line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
1
|
|
|
|
|
|
|
# ABSTRACT: a Perl port of Etsy's statsd *server* |
2
|
|
|
|
|
|
|
|
3
|
|
|
|
|
|
|
package Net::Statsd::Server; |
4
|
|
|
|
|
|
|
|
5
|
|
|
|
|
|
|
# Use statements {{{ |
6
|
|
|
|
|
|
|
|
7
|
3
|
|
|
3
|
|
96401
|
use strict; |
|
3
|
|
|
|
|
6
|
|
|
3
|
|
|
|
|
563
|
|
8
|
|
|
|
|
|
|
#se warnings; |
9
|
|
|
|
|
|
|
|
10
|
3
|
|
|
3
|
|
4187
|
use JSON::XS (); |
|
3
|
|
|
|
|
33618
|
|
|
3
|
|
|
|
|
168
|
|
11
|
3
|
|
|
3
|
|
3616
|
use Socket qw(SOL_SOCKET SO_RCVBUF); |
|
3
|
|
|
|
|
15771
|
|
|
3
|
|
|
|
|
1104
|
|
12
|
3
|
|
|
3
|
|
3944
|
use Time::HiRes (); |
|
3
|
|
|
|
|
8285
|
|
|
3
|
|
|
|
|
93
|
|
13
|
|
|
|
|
|
|
|
14
|
3
|
|
|
3
|
|
5033
|
use AnyEvent; |
|
3
|
|
|
|
|
20500
|
|
|
3
|
|
|
|
|
217
|
|
15
|
3
|
|
|
3
|
|
4662
|
use AnyEvent::Handle; |
|
3
|
|
|
|
|
61803
|
|
|
3
|
|
|
|
|
138
|
|
16
|
3
|
|
|
3
|
|
3703
|
use AnyEvent::Handle::UDP; |
|
3
|
|
|
|
|
399019
|
|
|
3
|
|
|
|
|
204
|
|
17
|
3
|
|
|
3
|
|
4012
|
use AnyEvent::Log; |
|
3
|
|
|
|
|
43786
|
|
|
3
|
|
|
|
|
152
|
|
18
|
3
|
|
|
3
|
|
35
|
use AnyEvent::Socket; |
|
3
|
|
|
|
|
6
|
|
|
3
|
|
|
|
|
453
|
|
19
|
|
|
|
|
|
|
|
20
|
3
|
|
|
3
|
|
2292
|
use Net::Statsd::Server::Backend; |
|
3
|
|
|
|
|
14
|
|
|
3
|
|
|
|
|
84
|
|
21
|
3
|
|
|
3
|
|
1871
|
use Net::Statsd::Server::Metrics; |
|
3
|
|
|
|
|
8
|
|
|
3
|
|
|
|
|
259
|
|
22
|
|
|
|
|
|
|
|
23
|
|
|
|
|
|
|
# }}} |
24
|
|
|
|
|
|
|
|
25
|
|
|
|
|
|
|
# Constants and global variables {{{ |
26
|
|
|
|
|
|
|
|
27
|
|
|
|
|
|
|
use constant { |
28
|
3
|
|
|
|
|
16326
|
DEBUG => 0, |
29
|
|
|
|
|
|
|
DEFAULT_CONFIG_FILE => 'localConfig.js', |
30
|
|
|
|
|
|
|
DEFAULT_FLUSH_INTERVAL => 10_000, |
31
|
|
|
|
|
|
|
DEFAULT_LOG_LEVEL => 'info', |
32
|
|
|
|
|
|
|
RECEIVE_BUFFER_MB => 8, # 0 = setsockopt disabled |
33
|
3
|
|
|
3
|
|
25
|
}; |
|
3
|
|
|
|
|
5
|
|
34
|
|
|
|
|
|
|
|
35
|
|
|
|
|
|
|
our $VERSION = '0.17'; |
36
|
|
|
|
|
|
|
our $logger; |
37
|
|
|
|
|
|
|
|
38
|
|
|
|
|
|
|
# }}} |
39
|
|
|
|
|
|
|
|
40
|
|
|
|
|
|
|
sub new { |
41
|
1
|
|
|
1
|
0
|
171
|
my ($class, $opt) = @_; |
42
|
1
|
|
50
|
|
|
4
|
$opt ||= {}; |
43
|
1
|
|
33
|
|
|
6
|
$class = ref $class || $class; |
44
|
|
|
|
|
|
|
|
45
|
1
|
|
|
|
|
7
|
my $startup_time = time(); |
46
|
|
|
|
|
|
|
|
47
|
|
|
|
|
|
|
# Initialize data structures with defaults for statsd stats |
48
|
1
|
|
|
|
|
35
|
my $self = { |
49
|
|
|
|
|
|
|
|
50
|
|
|
|
|
|
|
startup_time => $startup_time, |
51
|
|
|
|
|
|
|
start_time_hi => [Time::HiRes::gettimeofday], |
52
|
|
|
|
|
|
|
|
53
|
|
|
|
|
|
|
server => undef, |
54
|
|
|
|
|
|
|
mgmtServer => undef, |
55
|
|
|
|
|
|
|
|
56
|
|
|
|
|
|
|
config_file => $opt->{config_file}, |
57
|
|
|
|
|
|
|
config => undef, |
58
|
|
|
|
|
|
|
stats => { |
59
|
|
|
|
|
|
|
messages => { |
60
|
|
|
|
|
|
|
"last_msg_seen" => $startup_time, |
61
|
|
|
|
|
|
|
"bad_lines_seen" => 0, |
62
|
|
|
|
|
|
|
} |
63
|
|
|
|
|
|
|
}, |
64
|
|
|
|
|
|
|
metrics => undef, |
65
|
|
|
|
|
|
|
|
66
|
|
|
|
|
|
|
debugInt => undef, |
67
|
|
|
|
|
|
|
flushInterval => undef, |
68
|
|
|
|
|
|
|
|
69
|
|
|
|
|
|
|
backends => [], |
70
|
|
|
|
|
|
|
logger => $logger, |
71
|
|
|
|
|
|
|
}; |
72
|
|
|
|
|
|
|
|
73
|
1
|
|
|
|
|
5
|
$self->{$_} = $opt->{$_} |
74
|
1
|
|
|
|
|
5
|
for keys %{ $opt }; |
75
|
|
|
|
|
|
|
|
76
|
1
|
|
|
|
|
4
|
bless $self, $class; |
77
|
|
|
|
|
|
|
} |
78
|
|
|
|
|
|
|
|
79
|
|
|
|
|
|
|
# Flatten JSON booleans to avoid calls to JSON::XS::bool |
80
|
|
|
|
|
|
|
# in the performance-critical code paths |
81
|
|
|
|
|
|
|
sub _flatten_bools { |
82
|
1
|
|
|
1
|
|
2
|
my ($self, $conf_hash) = @_; |
83
|
1
|
|
|
|
|
3
|
for (qw(dumpMessages debug)) { |
84
|
2
|
|
|
|
|
76
|
$conf_hash->{$_} = !! $conf_hash->{$_}; |
85
|
|
|
|
|
|
|
} |
86
|
1
|
|
|
|
|
5
|
return $conf_hash; |
87
|
|
|
|
|
|
|
} |
88
|
|
|
|
|
|
|
|
89
|
|
|
|
|
|
|
sub _json_emitter { |
90
|
0
|
|
|
0
|
|
0
|
my ($self) = @_; |
91
|
0
|
|
|
|
|
0
|
my $js = JSON::XS->new() |
92
|
|
|
|
|
|
|
->utf8(1) |
93
|
|
|
|
|
|
|
->shrink(1) |
94
|
|
|
|
|
|
|
->space_before(0) |
95
|
|
|
|
|
|
|
->space_after(1) |
96
|
|
|
|
|
|
|
->indent(0); |
97
|
0
|
|
|
|
|
0
|
return $js; |
98
|
|
|
|
|
|
|
} |
99
|
|
|
|
|
|
|
|
100
|
|
|
|
|
|
|
sub _start_time_hi { |
101
|
0
|
|
|
0
|
|
0
|
return $_[0]->{start_time_hi}; |
102
|
|
|
|
|
|
|
} |
103
|
|
|
|
|
|
|
|
104
|
|
|
|
|
|
|
sub config_defaults { |
105
|
|
|
|
|
|
|
return { |
106
|
1
|
|
|
1
|
0
|
13
|
"debug" => 0, |
107
|
|
|
|
|
|
|
"debugInterval" => 10000, # ms |
108
|
|
|
|
|
|
|
"graphitePort" => 2003, |
109
|
|
|
|
|
|
|
"port" => 8125, |
110
|
|
|
|
|
|
|
"address" => "0.0.0.0", |
111
|
|
|
|
|
|
|
"mgmt_port" => 8126, |
112
|
|
|
|
|
|
|
"mgmt_address" => "0.0.0.0", |
113
|
|
|
|
|
|
|
"flushInterval" => DEFAULT_FLUSH_INTERVAL, # ms |
114
|
|
|
|
|
|
|
#"keyFlush" => { |
115
|
|
|
|
|
|
|
# "interval" => 10, # s |
116
|
|
|
|
|
|
|
# "percent" => 100, |
117
|
|
|
|
|
|
|
# "log" => "", |
118
|
|
|
|
|
|
|
#}, |
119
|
|
|
|
|
|
|
"log" => { |
120
|
|
|
|
|
|
|
"backend" => "stdout", |
121
|
|
|
|
|
|
|
"level" => "LOG_INFO", |
122
|
|
|
|
|
|
|
}, |
123
|
|
|
|
|
|
|
|
124
|
|
|
|
|
|
|
"prefixStats" => "statsd", |
125
|
|
|
|
|
|
|
"dumpMessages" => 0, |
126
|
|
|
|
|
|
|
|
127
|
|
|
|
|
|
|
"deleteIdleStats" => 0, |
128
|
|
|
|
|
|
|
#"deleteCounters" => 0, |
129
|
|
|
|
|
|
|
#"deleteGauges" => 0, |
130
|
|
|
|
|
|
|
#"deleteSets" => 0, |
131
|
|
|
|
|
|
|
#"deleteTimers" => 0, |
132
|
|
|
|
|
|
|
|
133
|
|
|
|
|
|
|
"percentThreshold" => [ 90 ], |
134
|
|
|
|
|
|
|
|
135
|
|
|
|
|
|
|
"backends" => [ |
136
|
|
|
|
|
|
|
"Console", |
137
|
|
|
|
|
|
|
], |
138
|
|
|
|
|
|
|
}; |
139
|
|
|
|
|
|
|
} |
140
|
|
|
|
|
|
|
|
141
|
|
|
|
|
|
|
sub config { |
142
|
1
|
|
|
1
|
0
|
6
|
my ($self, $config_file) = @_; |
143
|
|
|
|
|
|
|
|
144
|
1
|
50
|
33
|
|
|
10
|
if (exists $self->{config} && defined $self->{config}) { |
145
|
0
|
|
|
|
|
0
|
return $self->{config}; |
146
|
|
|
|
|
|
|
} |
147
|
|
|
|
|
|
|
|
148
|
1
|
|
33
|
|
|
7
|
$config_file ||= $self->config_file(); |
149
|
|
|
|
|
|
|
|
150
|
1
|
50
|
|
|
|
19
|
if (! -e $config_file) { |
151
|
0
|
|
|
|
|
0
|
return; |
152
|
|
|
|
|
|
|
} |
153
|
|
|
|
|
|
|
|
154
|
1
|
|
|
|
|
4
|
my $defaults = $self->config_defaults(); |
155
|
|
|
|
|
|
|
|
156
|
1
|
50
|
|
|
|
30
|
open my $conf_fh, '<', $config_file |
157
|
|
|
|
|
|
|
or return $defaults; |
158
|
|
|
|
|
|
|
|
159
|
1
|
|
|
|
|
22
|
my $conf_json = join("", <$conf_fh>); |
160
|
1
|
|
|
|
|
9
|
close $conf_fh; |
161
|
|
|
|
|
|
|
|
162
|
1
|
|
|
|
|
17
|
my $json = JSON::XS->new->relaxed->utf8; |
163
|
1
|
|
|
|
|
16
|
my $conf_hash = $json->decode($conf_json); |
164
|
|
|
|
|
|
|
|
165
|
1
|
|
|
|
|
3
|
$conf_hash = $self->_flatten_bools($conf_hash); |
166
|
|
|
|
|
|
|
|
167
|
|
|
|
|
|
|
# Poor man's Hash::Merge |
168
|
1
|
|
|
|
|
2
|
for (keys %{ $defaults }) { |
|
1
|
|
|
|
|
4
|
|
169
|
14
|
100
|
|
|
|
24
|
if (! exists $conf_hash->{$_}) { |
170
|
4
|
|
|
|
|
8
|
$conf_hash->{$_} = $defaults->{$_}; |
171
|
|
|
|
|
|
|
} |
172
|
|
|
|
|
|
|
} |
173
|
|
|
|
|
|
|
|
174
|
1
|
|
|
|
|
9
|
return $self->{config} = $conf_hash; |
175
|
|
|
|
|
|
|
} |
176
|
|
|
|
|
|
|
|
177
|
|
|
|
|
|
|
sub clear_metrics { |
178
|
0
|
|
|
0
|
0
|
0
|
my ($self) = @_; |
179
|
|
|
|
|
|
|
|
180
|
0
|
|
|
|
|
0
|
my $conf = $self->config; |
181
|
|
|
|
|
|
|
|
182
|
0
|
|
|
|
|
0
|
my $del_counters = $conf->{deleteCounters}; |
183
|
0
|
|
|
|
|
0
|
my $del_gauges = $conf->{deleteGauges}; |
184
|
0
|
|
|
|
|
0
|
my $del_sets = $conf->{deleteSets}; |
185
|
0
|
|
|
|
|
0
|
my $del_timers = $conf->{deleteTimers}; |
186
|
|
|
|
|
|
|
|
187
|
|
|
|
|
|
|
# Metrics that are not seen in the interval won't |
188
|
|
|
|
|
|
|
# be sent anymore. Enable this with 'deleteIdleStats' |
189
|
0
|
|
|
|
|
0
|
my $del_idle = _defined_or($conf->{deleteIdleStats}, 0); |
190
|
|
|
|
|
|
|
|
191
|
0
|
0
|
|
|
|
0
|
if ($del_idle) { |
192
|
0
|
|
|
|
|
0
|
$del_counters = _defined_or($del_counters, 1); |
193
|
0
|
|
|
|
|
0
|
$del_gauges = _defined_or($del_gauges, 1); |
194
|
0
|
|
|
|
|
0
|
$del_timers = _defined_or($del_timers, 1); |
195
|
0
|
|
|
|
|
0
|
$del_sets = _defined_or($del_sets, 1); |
196
|
|
|
|
|
|
|
} |
197
|
|
|
|
|
|
|
|
198
|
|
|
|
|
|
|
# Whether to just reset them to zero or to wipe them |
199
|
0
|
|
|
|
|
0
|
my $metrics = $self->{metrics}; |
200
|
0
|
0
|
|
|
|
0
|
if ($del_counters) { |
201
|
0
|
|
|
|
|
0
|
$metrics->{counters} = {}; |
202
|
0
|
|
|
|
|
0
|
$metrics->{counter_rates} = {}; |
203
|
|
|
|
|
|
|
} |
204
|
|
|
|
|
|
|
else { |
205
|
0
|
|
|
|
|
0
|
my $counters = $metrics->{counters}; |
206
|
0
|
|
|
|
|
0
|
my $counter_rates = $metrics->{counter_rates}; |
207
|
0
|
|
|
|
|
0
|
$_ = 0 for |
|
0
|
|
|
|
|
0
|
|
208
|
0
|
|
|
|
|
0
|
values %{ $counters }, |
209
|
|
|
|
|
|
|
values %{ $counter_rates }; |
210
|
|
|
|
|
|
|
} |
211
|
|
|
|
|
|
|
|
212
|
0
|
0
|
|
|
|
0
|
if ($del_timers) { |
213
|
0
|
|
|
|
|
0
|
$metrics->{timers} = {}; |
214
|
0
|
|
|
|
|
0
|
$metrics->{timer_data} = {}; |
215
|
|
|
|
|
|
|
} |
216
|
|
|
|
|
|
|
else { |
217
|
0
|
|
|
|
|
0
|
my $timers = $metrics->{timers}; |
218
|
0
|
|
|
|
|
0
|
my $timer_data = $metrics->{timer_data}; |
219
|
0
|
|
|
|
|
0
|
$_ = [] for |
|
0
|
|
|
|
|
0
|
|
220
|
0
|
|
|
|
|
0
|
values %{ $timers }, |
221
|
|
|
|
|
|
|
values %{ $timer_data }; |
222
|
|
|
|
|
|
|
} |
223
|
|
|
|
|
|
|
|
224
|
0
|
0
|
|
|
|
0
|
if ($del_gauges) { |
225
|
0
|
|
|
|
|
0
|
$metrics->{gauges} = {}; |
226
|
|
|
|
|
|
|
} |
227
|
|
|
|
|
|
|
|
228
|
0
|
0
|
|
|
|
0
|
if ($del_sets) { |
229
|
0
|
|
|
|
|
0
|
$metrics->{sets} = {}; |
230
|
|
|
|
|
|
|
} |
231
|
|
|
|
|
|
|
else { |
232
|
0
|
|
|
|
|
0
|
my $sets = $metrics->{sets}; |
233
|
0
|
|
|
|
|
0
|
$_ = {} for values %{ $sets }; |
|
0
|
|
|
|
|
0
|
|
234
|
|
|
|
|
|
|
} |
235
|
|
|
|
|
|
|
|
236
|
0
|
|
|
|
|
0
|
return; |
237
|
|
|
|
|
|
|
} |
238
|
|
|
|
|
|
|
|
239
|
|
|
|
|
|
|
sub config_file { |
240
|
1
|
|
|
1
|
0
|
4
|
_defined_or($_[0]->{config_file}, DEFAULT_CONFIG_FILE); |
241
|
|
|
|
|
|
|
} |
242
|
|
|
|
|
|
|
|
243
|
|
|
|
|
|
|
sub flush_metrics { |
244
|
0
|
|
|
0
|
0
|
0
|
my ($self) = @_; |
245
|
0
|
|
|
|
|
0
|
my $flush_start_time = time; |
246
|
0
|
|
|
|
|
0
|
$logger->(notice => "flushing metrics"); |
247
|
0
|
|
|
|
|
0
|
my $flush_interval = $self->config->{flushInterval}; |
248
|
0
|
|
|
|
|
0
|
my $metrics = $self->metrics->process($flush_interval); |
249
|
|
|
|
|
|
|
$self->foreach_backend(sub { |
250
|
0
|
|
|
0
|
|
0
|
$_[0]->flush($flush_start_time, $metrics); |
251
|
0
|
|
|
|
|
0
|
}); |
252
|
0
|
|
|
|
|
0
|
$self->clear_metrics(); |
253
|
0
|
|
|
|
|
0
|
return; |
254
|
|
|
|
|
|
|
} |
255
|
|
|
|
|
|
|
|
256
|
|
|
|
|
|
|
# This is the performance-critical section of Net::Statsd::Server. |
257
|
|
|
|
|
|
|
# Everything below has been optimised for performance rather than |
258
|
|
|
|
|
|
|
# legibility or transparency. Be careful. |
259
|
|
|
|
|
|
|
|
260
|
|
|
|
|
|
|
sub handle_client_packet { |
261
|
0
|
|
|
0
|
0
|
0
|
my ($self, $request) = @_; |
262
|
|
|
|
|
|
|
|
263
|
0
|
|
|
|
|
0
|
my $config = $self->{config}; |
264
|
0
|
|
|
|
|
0
|
my $metrics = $self->{metrics}; |
265
|
0
|
|
|
|
|
0
|
my $counters = $metrics->{counters}; |
266
|
0
|
|
|
|
|
0
|
my $stats = $self->{stats}; |
267
|
0
|
|
|
|
|
0
|
my $g_pref = $config->{prefixStats}; |
268
|
|
|
|
|
|
|
|
269
|
0
|
|
|
|
|
0
|
$counters->{"${g_pref}.packets_received"}++; |
270
|
|
|
|
|
|
|
|
271
|
|
|
|
|
|
|
# TODO backendEvents.emit('packet', msg, rinfo); |
272
|
|
|
|
|
|
|
|
273
|
0
|
|
|
|
|
0
|
my @metrics = split("\n", $request); |
274
|
|
|
|
|
|
|
|
275
|
0
|
|
|
|
|
0
|
my $dump_messages = $config->{dumpMessages}; |
276
|
0
|
|
0
|
|
|
0
|
my $must_count_keys = exists $config->{keyFlush} |
277
|
|
|
|
|
|
|
&& $config->{keyFlush}->{interval}; |
278
|
|
|
|
|
|
|
|
279
|
0
|
|
|
|
|
0
|
for my $m (@metrics) { |
280
|
|
|
|
|
|
|
|
281
|
0
|
0
|
|
|
|
0
|
$logger->(debug => $m) if $dump_messages; |
282
|
|
|
|
|
|
|
|
283
|
0
|
|
|
|
|
0
|
my @bits = split(":", $m); |
284
|
0
|
|
|
|
|
0
|
my $key = shift @bits; |
285
|
|
|
|
|
|
|
|
286
|
0
|
|
|
|
|
0
|
$key =~ y{/ }{_-}s; |
287
|
0
|
|
|
|
|
0
|
$key =~ y{a-zA-Z0-9_\-\.}{}cd; |
288
|
|
|
|
|
|
|
|
289
|
|
|
|
|
|
|
# Not very clear here. Etsy's code was doing this differently |
290
|
0
|
0
|
|
|
|
0
|
if ($must_count_keys) { |
291
|
0
|
|
|
|
|
0
|
my $key_counter = $metrics->{keyCounter}; |
292
|
0
|
|
|
|
|
0
|
$key_counter->{$key}++; |
293
|
|
|
|
|
|
|
} |
294
|
|
|
|
|
|
|
|
295
|
0
|
0
|
|
|
|
0
|
push @bits, "1" if 0 == @bits; |
296
|
|
|
|
|
|
|
|
297
|
0
|
|
|
|
|
0
|
for my $i (0..$#bits) { |
298
|
|
|
|
|
|
|
|
299
|
0
|
|
|
|
|
0
|
my $sample_rate = 1; |
300
|
0
|
|
|
|
|
0
|
my @fields = split(/\|/, $bits[$i]); |
301
|
|
|
|
|
|
|
|
302
|
0
|
0
|
0
|
|
|
0
|
if (! defined $fields[1] || $fields[1] eq "") { |
303
|
0
|
|
|
|
|
0
|
$logger->(warn => "Bad line: $bits[$i] in msg \"$m\""); |
304
|
0
|
|
|
|
|
0
|
$counters->{"${g_pref}.bad_lines_seen"}++; |
305
|
0
|
|
|
|
|
0
|
$stats->{"messages"}->{"bad_lines_seen"}++; |
306
|
0
|
|
|
|
|
0
|
next; |
307
|
|
|
|
|
|
|
} |
308
|
|
|
|
|
|
|
|
309
|
0
|
|
0
|
|
|
0
|
my $value = $fields[0] || 0; |
310
|
0
|
|
|
|
|
0
|
my $unit = $fields[1]; |
311
|
0
|
|
|
|
|
0
|
for ($unit) { |
312
|
0
|
|
|
|
|
0
|
s{^\s*}{}; |
313
|
0
|
|
|
|
|
0
|
s{\s*$}{}; |
314
|
|
|
|
|
|
|
} |
315
|
|
|
|
|
|
|
|
316
|
|
|
|
|
|
|
# Timers |
317
|
0
|
0
|
|
|
|
0
|
if ($unit eq "ms") { |
|
|
0
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
318
|
0
|
|
|
|
|
0
|
my $timers = $metrics->{timers}; |
319
|
0
|
|
0
|
|
|
0
|
$timers->{$key} ||= []; |
320
|
0
|
|
|
|
|
0
|
push @{ $timers->{$key} }, $value; |
|
0
|
|
|
|
|
0
|
|
321
|
|
|
|
|
|
|
} |
322
|
|
|
|
|
|
|
|
323
|
|
|
|
|
|
|
# Gauges |
324
|
|
|
|
|
|
|
elsif ($unit eq "g") { |
325
|
0
|
|
|
|
|
0
|
my $gauges = $metrics->{gauges}; |
326
|
0
|
|
|
|
|
0
|
$gauges->{$key} = $value; |
327
|
|
|
|
|
|
|
} |
328
|
|
|
|
|
|
|
|
329
|
|
|
|
|
|
|
# Sets |
330
|
|
|
|
|
|
|
elsif ($unit eq "s") { |
331
|
|
|
|
|
|
|
# Treat set as a normal hash with undef keys |
332
|
|
|
|
|
|
|
# to minimize memory consumption *and* insertion speed |
333
|
0
|
|
|
|
|
0
|
my $sets = $metrics->{sets}; |
334
|
0
|
|
0
|
|
|
0
|
$sets->{$key} ||= {}; |
335
|
0
|
|
|
|
|
0
|
$sets->{$key}->{$value} = undef; |
336
|
|
|
|
|
|
|
} |
337
|
|
|
|
|
|
|
|
338
|
|
|
|
|
|
|
# Counters |
339
|
|
|
|
|
|
|
else { |
340
|
0
|
0
|
|
|
|
0
|
if (defined $fields[2]) { |
341
|
0
|
0
|
|
|
|
0
|
if ($fields[2] =~ m{^\@([\d\.]+)}) { |
342
|
0
|
|
|
|
|
0
|
$sample_rate = $1 + 0; |
343
|
|
|
|
|
|
|
} |
344
|
|
|
|
|
|
|
else { |
345
|
0
|
|
|
|
|
0
|
$logger->(warn => "Bad line: $bits[$i] in msg \"$m\"; has invalid sample rate"); |
346
|
0
|
|
|
|
|
0
|
$counters->{"${g_pref}.bad_lines_seen"}++; |
347
|
0
|
|
|
|
|
0
|
$stats->{"messages"}->{"bad_lines_seen"}++; |
348
|
0
|
|
|
|
|
0
|
next; |
349
|
|
|
|
|
|
|
} |
350
|
|
|
|
|
|
|
} |
351
|
0
|
|
0
|
|
|
0
|
$counters->{$key} ||= 0; |
352
|
0
|
|
0
|
|
|
0
|
$value ||= 1; |
353
|
0
|
|
|
|
|
0
|
$value /= $sample_rate; |
354
|
0
|
|
|
|
|
0
|
$counters->{$key} += $value; |
355
|
|
|
|
|
|
|
} |
356
|
|
|
|
|
|
|
} |
357
|
|
|
|
|
|
|
} |
358
|
|
|
|
|
|
|
|
359
|
0
|
|
|
|
|
0
|
$stats->{"messages"}->{"last_msg_seen"} = time(); |
360
|
|
|
|
|
|
|
} |
361
|
|
|
|
|
|
|
|
362
|
|
|
|
|
|
|
sub handle_manager_command { |
363
|
0
|
|
|
0
|
0
|
0
|
my ($self, $handle, $request) = @_; |
364
|
0
|
|
|
|
|
0
|
my @cmdline = split(" ", trim($request)); |
365
|
0
|
|
|
|
|
0
|
my $cmd = shift @cmdline; |
366
|
0
|
|
|
|
|
0
|
my $reply; |
367
|
|
|
|
|
|
|
|
368
|
0
|
|
|
|
|
0
|
$logger->(notice => "Received manager command '$cmd' (req=$request)"); |
369
|
|
|
|
|
|
|
|
370
|
0
|
0
|
|
|
|
0
|
if ($cmd eq "help") { |
|
|
0
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
371
|
0
|
|
|
|
|
0
|
$reply = ( |
372
|
|
|
|
|
|
|
"Commands: stats, counters, timers, gauges, delcounters, deltimers, delgauges, quit\015\012\015\012" |
373
|
|
|
|
|
|
|
); |
374
|
|
|
|
|
|
|
} |
375
|
|
|
|
|
|
|
elsif ($cmd eq "stats") { |
376
|
0
|
|
|
|
|
0
|
my $now = time; |
377
|
0
|
|
|
|
|
0
|
my $uptime = $now - $self->{startup_time}; |
378
|
0
|
|
|
|
|
0
|
$reply = "uptime: $uptime\n"; |
379
|
|
|
|
|
|
|
|
380
|
|
|
|
|
|
|
# Loop through the base stats |
381
|
0
|
|
|
|
|
0
|
my $stats = $self->stats; |
382
|
|
|
|
|
|
|
|
383
|
0
|
|
|
|
|
0
|
for my $group (keys %{$stats}) { |
|
0
|
|
|
|
|
0
|
|
384
|
0
|
|
|
|
|
0
|
for my $metric (keys %{$stats->{$group}}) { |
|
0
|
|
|
|
|
0
|
|
385
|
0
|
|
|
|
|
0
|
my $val = $stats->{$group}->{$metric}; |
386
|
0
|
0
|
|
|
|
0
|
my $delta = $metric =~ m{^last_} |
387
|
|
|
|
|
|
|
? $now - $val |
388
|
|
|
|
|
|
|
: $val; |
389
|
0
|
|
|
|
|
0
|
$reply .= "${group}.${metric}: ${delta}\n"; |
390
|
|
|
|
|
|
|
} |
391
|
|
|
|
|
|
|
} |
392
|
|
|
|
|
|
|
|
393
|
|
|
|
|
|
|
$self->foreach_backend(sub { |
394
|
0
|
|
|
0
|
|
0
|
my $backend_status = $_[0]->status; |
395
|
0
|
0
|
0
|
|
|
0
|
if ($backend_status && ref $backend_status eq "HASH") { |
396
|
0
|
|
|
|
|
0
|
for (keys %{ $backend_status }) { |
|
0
|
|
|
|
|
0
|
|
397
|
0
|
|
|
|
|
0
|
$reply .= sprintf("%s.%s: %s\n", |
398
|
|
|
|
|
|
|
lc($_[0]->name), |
399
|
|
|
|
|
|
|
$_ => $backend_status->{$_} |
400
|
|
|
|
|
|
|
); |
401
|
|
|
|
|
|
|
} |
402
|
|
|
|
|
|
|
} |
403
|
0
|
|
|
|
|
0
|
}); |
404
|
|
|
|
|
|
|
|
405
|
0
|
|
|
|
|
0
|
$reply .= "END\n\n"; |
406
|
|
|
|
|
|
|
} |
407
|
|
|
|
|
|
|
elsif ($cmd eq "counters") { |
408
|
0
|
|
|
|
|
0
|
my $counters = $self->{metrics}->{counters}; |
409
|
0
|
|
|
|
|
0
|
$reply = $self->_json_emitter()->encode($counters); |
410
|
0
|
|
|
|
|
0
|
$reply .= "\nEND\n\n"; |
411
|
|
|
|
|
|
|
} |
412
|
|
|
|
|
|
|
elsif ($cmd eq "timers") { |
413
|
0
|
|
|
|
|
0
|
my $timers = $self->{metrics}->{timers}; |
414
|
0
|
|
|
|
|
0
|
$reply = $self->_json_emitter()->encode($timers); |
415
|
0
|
|
|
|
|
0
|
$reply .= "\nEND\n\n"; |
416
|
|
|
|
|
|
|
} |
417
|
|
|
|
|
|
|
elsif ($cmd eq "gauges") { |
418
|
0
|
|
|
|
|
0
|
my $gauges = $self->{metrics}->{gauges}; |
419
|
0
|
|
|
|
|
0
|
$reply = $self->_json_emitter()->encode($gauges); |
420
|
0
|
|
|
|
|
0
|
$reply .= "\nEND\n\n"; |
421
|
|
|
|
|
|
|
} |
422
|
|
|
|
|
|
|
elsif ($cmd eq "sets") { |
423
|
0
|
|
|
|
|
0
|
my $sets = $self->{metrics}->{sets}; |
424
|
0
|
|
|
|
|
0
|
my $sets_as_lists = {}; |
425
|
|
|
|
|
|
|
# FIXME Not really happy about this... |
426
|
|
|
|
|
|
|
# if you have huge sets, it's going to suck. |
427
|
|
|
|
|
|
|
# If you have huge sets, probably statsd is not for you anyway. |
428
|
0
|
|
|
|
|
0
|
for my $set (keys %{$sets}) { |
|
0
|
|
|
|
|
0
|
|
429
|
0
|
|
|
|
|
0
|
$sets_as_lists->{$set} = [ keys %{ $sets->{$set} } ]; |
|
0
|
|
|
|
|
0
|
|
430
|
|
|
|
|
|
|
} |
431
|
0
|
|
|
|
|
0
|
$reply = $self->_json_emitter()->encode($sets_as_lists); |
432
|
0
|
|
|
|
|
0
|
$reply .= "\nEND\n\n"; |
433
|
|
|
|
|
|
|
} |
434
|
|
|
|
|
|
|
elsif ($cmd eq "delcounters") { |
435
|
0
|
|
|
|
|
0
|
my $counters = $self->{metrics}->{counters}; |
436
|
0
|
|
|
|
|
0
|
for my $name (@cmdline) { |
437
|
0
|
|
|
|
|
0
|
delete $counters->{$name}; |
438
|
0
|
|
|
|
|
0
|
$reply .= "deleted: $name\n"; |
439
|
|
|
|
|
|
|
} |
440
|
0
|
|
|
|
|
0
|
$reply .= "\nEND\n\n"; |
441
|
|
|
|
|
|
|
} |
442
|
|
|
|
|
|
|
elsif ($cmd eq "deltimers") { |
443
|
0
|
|
|
|
|
0
|
my $timers = $self->{metrics}->{timers}; |
444
|
0
|
|
|
|
|
0
|
for my $name (@cmdline) { |
445
|
0
|
|
|
|
|
0
|
delete $timers->{$name}; |
446
|
0
|
|
|
|
|
0
|
$reply .= "deleted: $name\n"; |
447
|
|
|
|
|
|
|
} |
448
|
0
|
|
|
|
|
0
|
$reply .= "\nEND\n\n"; |
449
|
|
|
|
|
|
|
} |
450
|
|
|
|
|
|
|
elsif ($cmd eq "delgauges") { |
451
|
0
|
|
|
|
|
0
|
my $gauges = $self->{metrics}->{gauges}; |
452
|
0
|
|
|
|
|
0
|
for my $name (@cmdline) { |
453
|
0
|
|
|
|
|
0
|
delete $gauges->{$name}; |
454
|
0
|
|
|
|
|
0
|
$reply .= "deleted: $name\n"; |
455
|
|
|
|
|
|
|
} |
456
|
0
|
|
|
|
|
0
|
$reply .= "\nEND\n\n"; |
457
|
|
|
|
|
|
|
} |
458
|
|
|
|
|
|
|
elsif ($cmd eq "quit") { |
459
|
0
|
|
|
|
|
0
|
undef $reply; |
460
|
0
|
|
|
|
|
0
|
$handle->destroy(); |
461
|
|
|
|
|
|
|
} |
462
|
|
|
|
|
|
|
else { |
463
|
0
|
|
|
|
|
0
|
$reply = "ERROR\n"; |
464
|
|
|
|
|
|
|
} |
465
|
0
|
|
|
|
|
0
|
return $reply; |
466
|
|
|
|
|
|
|
} |
467
|
|
|
|
|
|
|
|
468
|
|
|
|
|
|
|
sub handle_manager_connection { |
469
|
0
|
|
|
0
|
0
|
0
|
my ($self, $handle, $line) = @_; |
470
|
|
|
|
|
|
|
#$logger->(notice => "Received mgmt command [$line]"); |
471
|
0
|
0
|
|
|
|
0
|
if (my $reply = $self->handle_manager_command($handle, $line)) { |
472
|
0
|
|
|
|
|
0
|
$logger->(notice => "Sending mgmt reply [$reply]"); |
473
|
0
|
|
|
|
|
0
|
$handle->push_write($reply); |
474
|
|
|
|
|
|
|
# Accept a new command on the same connection |
475
|
|
|
|
|
|
|
$handle->push_read(line => sub { |
476
|
0
|
|
|
0
|
|
0
|
handle_manager_connection($self, @_) |
477
|
0
|
|
|
|
|
0
|
}); |
478
|
|
|
|
|
|
|
} |
479
|
|
|
|
|
|
|
else { |
480
|
0
|
|
|
|
|
0
|
$logger->(notice => "Shutting down socket"); |
481
|
0
|
|
|
|
|
0
|
$handle->push_write("\n"); |
482
|
0
|
|
|
|
|
0
|
$handle->destroy; |
483
|
|
|
|
|
|
|
} |
484
|
|
|
|
|
|
|
} |
485
|
|
|
|
|
|
|
|
486
|
|
|
|
|
|
|
sub init_backends { |
487
|
0
|
|
|
0
|
0
|
0
|
my ($self) = @_; |
488
|
0
|
|
|
|
|
0
|
my $backends = $self->config->{backends}; |
489
|
0
|
0
|
0
|
|
|
0
|
if (! $backends or ref $backends ne 'ARRAY') { |
490
|
0
|
|
|
|
|
0
|
die "At least one backend is needed in your configuration"; |
491
|
|
|
|
|
|
|
} |
492
|
0
|
|
|
|
|
0
|
for my $backend (@{ $backends }) { |
|
0
|
|
|
|
|
0
|
|
493
|
|
|
|
|
|
|
|
494
|
|
|
|
|
|
|
# Nodejs statsd expects a relative path |
495
|
0
|
0
|
|
|
|
0
|
if ($backend =~ m{^ \./backends/ (.+) $}x) { |
496
|
0
|
|
|
|
|
0
|
$backend = $1; |
497
|
|
|
|
|
|
|
} |
498
|
|
|
|
|
|
|
|
499
|
0
|
|
|
|
|
0
|
my $pkg = $backend; |
500
|
0
|
0
|
|
|
|
0
|
if ($backend =~ m{^ (\w+) $}x) { |
501
|
0
|
|
|
|
|
0
|
$pkg = ucfirst lc $pkg; |
502
|
0
|
|
|
|
|
0
|
$pkg = "Net::Statsd::Server::Backend::${pkg}"; |
503
|
|
|
|
|
|
|
} |
504
|
0
|
|
|
|
|
0
|
my $mod = $pkg; |
505
|
0
|
|
|
|
|
0
|
$mod =~ s{::}{/}g; |
506
|
0
|
|
|
|
|
0
|
$mod .= ".pm"; |
507
|
|
|
|
|
|
|
eval { |
508
|
0
|
|
|
|
|
0
|
require $mod ; 1 |
|
0
|
|
|
|
|
0
|
|
509
|
0
|
0
|
|
|
|
0
|
} or do { |
510
|
0
|
|
|
|
|
0
|
$logger->(error=>"Backend ${backend} failed to load: $@"); |
511
|
0
|
|
|
|
|
0
|
next; |
512
|
|
|
|
|
|
|
}; |
513
|
0
|
|
|
|
|
0
|
$self->register_backend($pkg); |
514
|
|
|
|
|
|
|
} |
515
|
|
|
|
|
|
|
} |
516
|
|
|
|
|
|
|
|
517
|
|
|
|
|
|
|
sub init_logger { |
518
|
0
|
|
|
0
|
0
|
0
|
my ($self, $config) = @_; |
519
|
|
|
|
|
|
|
|
520
|
0
|
|
0
|
|
|
0
|
$config ||= {}; |
521
|
|
|
|
|
|
|
|
522
|
0
|
|
0
|
|
|
0
|
my $backend = $config->{backend} || 'stdout'; |
523
|
0
|
|
0
|
|
|
0
|
my $level = lc($config->{level} || 'LOG_INFO'); |
524
|
0
|
|
|
|
|
0
|
$level =~ s{^log_}{}; |
525
|
|
|
|
|
|
|
|
526
|
0
|
0
|
|
|
|
0
|
if ($backend eq 'stdout') { |
|
|
0
|
|
|
|
|
|
527
|
0
|
|
|
|
|
0
|
$AnyEvent::Log::FILTER->level($level); |
528
|
|
|
|
|
|
|
} |
529
|
|
|
|
|
|
|
elsif ($backend eq 'syslog') { |
530
|
|
|
|
|
|
|
# Syslog logging works commenting out the FILTER->level line |
531
|
0
|
|
|
|
|
0
|
$AnyEvent::Log::COLLECT->attach( |
532
|
|
|
|
|
|
|
AnyEvent::Log::Ctx->new( |
533
|
|
|
|
|
|
|
level => $level, |
534
|
|
|
|
|
|
|
log_to_syslog => "user", |
535
|
|
|
|
|
|
|
) |
536
|
|
|
|
|
|
|
); |
537
|
|
|
|
|
|
|
} |
538
|
0
|
|
0
|
0
|
|
0
|
$logger ||= sub { AE::log(shift(@_), shift(@_)) }; |
|
0
|
|
|
|
|
0
|
|
539
|
|
|
|
|
|
|
} |
540
|
|
|
|
|
|
|
|
541
|
|
|
|
|
|
|
sub logger { |
542
|
0
|
|
|
0
|
0
|
0
|
return $logger; |
543
|
|
|
|
|
|
|
} |
544
|
|
|
|
|
|
|
|
545
|
|
|
|
|
|
|
sub metrics { |
546
|
0
|
|
|
0
|
0
|
0
|
$_[0]->{metrics}; |
547
|
|
|
|
|
|
|
} |
548
|
|
|
|
|
|
|
|
549
|
|
|
|
|
|
|
sub register_backend { |
550
|
0
|
|
|
0
|
0
|
0
|
my ($self, $backend) = @_; |
551
|
0
|
|
0
|
|
|
0
|
$self->{backends} ||= []; |
552
|
0
|
|
|
|
|
0
|
my $backend_instance = $backend->new( |
553
|
|
|
|
|
|
|
$self->_start_time_hi, $self->config, |
554
|
|
|
|
|
|
|
); |
555
|
0
|
|
|
|
|
0
|
$logger->(notice => "Initializing $backend backend"); |
556
|
0
|
|
|
|
|
0
|
push @{ $self->{backends} }, $backend_instance; |
|
0
|
|
|
|
|
0
|
|
557
|
|
|
|
|
|
|
} |
558
|
|
|
|
|
|
|
|
559
|
|
|
|
|
|
|
sub foreach_backend { |
560
|
0
|
|
|
0
|
0
|
0
|
my ($self, $callback) = @_; |
561
|
0
|
|
0
|
|
|
0
|
my $backends = $self->{backends} || []; |
562
|
0
|
|
|
|
|
0
|
for my $obj (@{ $backends }) { |
|
0
|
|
|
|
|
0
|
|
563
|
|
|
|
|
|
|
eval { |
564
|
0
|
|
|
|
|
0
|
$callback->($obj); 1; |
|
0
|
|
|
|
|
0
|
|
565
|
0
|
0
|
|
|
|
0
|
} or do { |
566
|
0
|
|
|
|
|
0
|
$logger->(error => "Failed callback on $obj backend: $@"); |
567
|
|
|
|
|
|
|
}; |
568
|
|
|
|
|
|
|
} |
569
|
|
|
|
|
|
|
} |
570
|
|
|
|
|
|
|
|
571
|
|
|
|
|
|
|
sub reload_config { |
572
|
0
|
|
|
0
|
0
|
0
|
my ($self) = @_; |
573
|
0
|
|
|
|
|
0
|
delete $self->{config}; |
574
|
0
|
|
|
|
|
0
|
$logger->(warn => "Received SIGHUP: reloading configuration"); |
575
|
0
|
|
|
|
|
0
|
return $self->{config} = $self->config(); |
576
|
|
|
|
|
|
|
} |
577
|
|
|
|
|
|
|
|
578
|
|
|
|
|
|
|
sub setup_flush_timer { |
579
|
0
|
|
|
0
|
0
|
0
|
my ($self) = @_; |
580
|
|
|
|
|
|
|
|
581
|
0
|
|
0
|
|
|
0
|
my $flush_interval = $self->config->{flushInterval} |
582
|
|
|
|
|
|
|
|| DEFAULT_FLUSH_INTERVAL; |
583
|
|
|
|
|
|
|
|
584
|
0
|
|
|
|
|
0
|
$flush_interval = $flush_interval / 1000; |
585
|
0
|
|
|
|
|
0
|
$logger->(notice => "metrics flush will happen every ${flush_interval}s"); |
586
|
|
|
|
|
|
|
|
587
|
|
|
|
|
|
|
my $flush_t = AE::timer $flush_interval, $flush_interval, sub { |
588
|
0
|
|
|
0
|
|
0
|
$self->flush_metrics |
589
|
0
|
|
|
|
|
0
|
}; |
590
|
|
|
|
|
|
|
|
591
|
0
|
|
|
|
|
0
|
return $flush_t; |
592
|
|
|
|
|
|
|
} |
593
|
|
|
|
|
|
|
|
594
|
1
|
50
|
|
1
|
|
5
|
sub _defined_or { defined $_[0] ? $_[0] : $_[1] } |
595
|
|
|
|
|
|
|
|
596
|
|
|
|
|
|
|
sub setup_keyflush_timer { |
597
|
0
|
|
|
0
|
0
|
0
|
my ($self) = @_; |
598
|
|
|
|
|
|
|
|
599
|
0
|
|
|
|
|
0
|
my $conf_kf = $self->config->{keyFlush}; |
600
|
0
|
|
|
|
|
0
|
my $kf_interval = _defined_or($conf_kf->{interval}, 0); |
601
|
0
|
0
|
|
|
|
0
|
return if $kf_interval <= 0; |
602
|
|
|
|
|
|
|
|
603
|
|
|
|
|
|
|
# Always milliseconds in the config! |
604
|
0
|
|
|
|
|
0
|
$kf_interval /= 1000; |
605
|
|
|
|
|
|
|
|
606
|
0
|
|
|
|
|
0
|
my $kf_pct = _defined_or($conf_kf->{percent}, 100); |
607
|
0
|
|
|
|
|
0
|
my $kf_log = $conf_kf->{log}; |
608
|
|
|
|
|
|
|
|
609
|
0
|
|
0
|
|
|
0
|
$logger->(notice => "flushing top ${kf_pct}% keys to " |
610
|
|
|
|
|
|
|
. ($kf_log || "stdout") |
611
|
|
|
|
|
|
|
. " every ${kf_interval}s" |
612
|
|
|
|
|
|
|
); |
613
|
|
|
|
|
|
|
|
614
|
|
|
|
|
|
|
my $kf_timer = AE::timer $kf_interval, $kf_interval, sub { |
615
|
0
|
|
|
0
|
|
0
|
$self->flush_top_keys() |
616
|
0
|
|
|
|
|
0
|
}; |
617
|
|
|
|
|
|
|
|
618
|
0
|
|
|
|
|
0
|
return $kf_timer; |
619
|
|
|
|
|
|
|
} |
620
|
|
|
|
|
|
|
|
621
|
|
|
|
|
|
|
sub flush_top_keys { |
622
|
0
|
|
|
0
|
0
|
0
|
my ($self) = @_; |
623
|
|
|
|
|
|
|
|
624
|
0
|
|
|
|
|
0
|
my $conf_kf = _defined_or($self->config->{keyFlush}, {}); |
625
|
0
|
|
|
|
|
0
|
my $kf_interval = _defined_or($conf_kf->{interval}, 0); |
626
|
0
|
|
|
|
|
0
|
$kf_interval /= 1000; |
627
|
|
|
|
|
|
|
|
628
|
0
|
|
0
|
|
|
0
|
my $kf_pct = $conf_kf->{percent} || 100; |
629
|
0
|
|
|
|
|
0
|
my $kf_log = $conf_kf->{log}; |
630
|
|
|
|
|
|
|
|
631
|
0
|
|
|
|
|
0
|
my @sorted_keys; |
632
|
0
|
|
|
|
|
0
|
my $key_counter = $self->metrics->{keyCounter}; |
633
|
0
|
|
|
|
|
0
|
while (my ($k, $v) = each %{ $key_counter }) { |
|
0
|
|
|
|
|
0
|
|
634
|
0
|
|
|
|
|
0
|
push @sorted_keys, [ $k, $v ]; |
635
|
|
|
|
|
|
|
} |
636
|
|
|
|
|
|
|
|
637
|
0
|
|
|
|
|
0
|
@sorted_keys = sort { $b->[1] <=> $a->[1] } @sorted_keys; |
|
0
|
|
|
|
|
0
|
|
638
|
|
|
|
|
|
|
|
639
|
0
|
|
|
|
|
0
|
my @time = localtime; |
640
|
0
|
|
|
|
|
0
|
my $time_str = sprintf "%04d-%02d-%02d %02d:%02d:%02d", |
641
|
|
|
|
|
|
|
$time[5] + 1900, $time[4] + 1, $time[3], |
642
|
|
|
|
|
|
|
$time[2], $time[1], $time[0]; |
643
|
|
|
|
|
|
|
|
644
|
0
|
|
|
|
|
0
|
my $log_message = ""; |
645
|
|
|
|
|
|
|
|
646
|
|
|
|
|
|
|
# Only show the top keyFlush.percent keys |
647
|
0
|
|
|
|
|
0
|
my $top_pct_limit = int(scalar(@sorted_keys) * $kf_pct / 100); |
648
|
0
|
|
|
|
|
0
|
for my $i (0 .. $top_pct_limit - 1) { |
649
|
0
|
|
|
|
|
0
|
$log_message .= sprintf "$time_str count=%d key=%s\n", |
650
|
|
|
|
|
|
|
$sorted_keys[$i][1], $sorted_keys[$i][0]; |
651
|
|
|
|
|
|
|
} |
652
|
|
|
|
|
|
|
|
653
|
0
|
0
|
|
|
|
0
|
if ($kf_log) { |
654
|
0
|
0
|
|
|
|
0
|
if (open my $log_fh, '>>', $kf_log) { |
655
|
0
|
|
|
|
|
0
|
$log_fh->printflush($log_message); |
656
|
0
|
|
|
|
|
0
|
$log_fh->close(); |
657
|
|
|
|
|
|
|
} |
658
|
|
|
|
|
|
|
} else { |
659
|
0
|
|
|
|
|
0
|
print $log_message; |
660
|
|
|
|
|
|
|
} |
661
|
|
|
|
|
|
|
|
662
|
|
|
|
|
|
|
# Clear the counters |
663
|
0
|
|
|
|
|
0
|
$self->metrics->{keyCounter} = {}; |
664
|
|
|
|
|
|
|
|
665
|
|
|
|
|
|
|
} |
666
|
|
|
|
|
|
|
|
667
|
|
|
|
|
|
|
sub init_metrics { |
668
|
0
|
|
|
0
|
0
|
0
|
my ($self) = @_; |
669
|
0
|
|
|
|
|
0
|
my $config = $self->config; |
670
|
0
|
|
|
|
|
0
|
$self->{metrics} = Net::Statsd::Server::Metrics->new($config); |
671
|
0
|
|
|
|
|
0
|
return $self->{metrics}; |
672
|
|
|
|
|
|
|
} |
673
|
|
|
|
|
|
|
|
674
|
|
|
|
|
|
|
sub start_server { |
675
|
0
|
|
|
0
|
0
|
0
|
my ($self, $config) = @_; |
676
|
|
|
|
|
|
|
|
677
|
0
|
0
|
|
|
|
0
|
if (! defined $config) { |
678
|
0
|
|
|
|
|
0
|
$config = $self->config(); |
679
|
|
|
|
|
|
|
} |
680
|
|
|
|
|
|
|
|
681
|
0
|
|
|
|
|
0
|
$self->init_logger($config->{log}); |
682
|
|
|
|
|
|
|
|
683
|
0
|
|
0
|
|
|
0
|
my $host = $config->{address} || '0.0.0.0'; |
684
|
0
|
|
0
|
|
|
0
|
my $port = $config->{port} || 8125; |
685
|
|
|
|
|
|
|
|
686
|
0
|
|
0
|
|
|
0
|
my $mgmt_host = $config->{mgmt_address} || '0.0.0.0'; |
687
|
0
|
|
0
|
|
|
0
|
my $mgmt_port = $config->{mgmt_port} || 8126; |
688
|
|
|
|
|
|
|
|
689
|
0
|
|
|
|
|
0
|
$self->init_backends(); |
690
|
0
|
|
|
|
|
0
|
$self->init_metrics(); |
691
|
|
|
|
|
|
|
|
692
|
|
|
|
|
|
|
# Statsd clients interface (UDP) |
693
|
|
|
|
|
|
|
$self->{server} = AnyEvent::Handle::UDP->new( |
694
|
|
|
|
|
|
|
bind => [$host, $port], |
695
|
|
|
|
|
|
|
on_recv => sub { |
696
|
0
|
|
|
0
|
|
0
|
my ($data, $ae_handle, $client_addr) = @_; |
697
|
|
|
|
|
|
|
#$logger->(debug => "Got data=$data self=$self"); |
698
|
0
|
|
|
|
|
0
|
my $reply = $self->handle_client_packet($data); |
699
|
0
|
|
|
|
|
0
|
$ae_handle->push_send($reply, $client_addr); |
700
|
|
|
|
|
|
|
}, |
701
|
0
|
|
|
|
|
0
|
); |
702
|
|
|
|
|
|
|
|
703
|
|
|
|
|
|
|
# Bump up SO_RCVBUF on UDP socket, to buffer up incoming |
704
|
|
|
|
|
|
|
# UDP packets, to avoid significant packet loss under load. |
705
|
|
|
|
|
|
|
# Read more: http://bit.ly/10eeFoE |
706
|
0
|
|
|
|
|
0
|
if (RECEIVE_BUFFER_MB > 0) { |
707
|
|
|
|
|
|
|
# On some systems this could fail (cpantesters reports) |
708
|
|
|
|
|
|
|
# Have it emit a warning instead of throwing an exception |
709
|
0
|
0
|
|
|
|
0
|
setsockopt($self->{server}->fh, SOL_SOCKET, |
710
|
|
|
|
|
|
|
SO_RCVBUF, RECEIVE_BUFFER_MB * 1048576) |
711
|
|
|
|
|
|
|
or warn "Couldn't set SO_RCVBUF: $!"; |
712
|
|
|
|
|
|
|
} |
713
|
|
|
|
|
|
|
|
714
|
|
|
|
|
|
|
# Management interface (TCP, for 'stats' command, etc...) |
715
|
|
|
|
|
|
|
$self->{mgmtServer} = AnyEvent::Socket::tcp_server $mgmt_host, $mgmt_port, sub { |
716
|
0
|
0
|
|
0
|
|
0
|
my ($fh, $host, $port) = @_ |
717
|
|
|
|
|
|
|
or die "Unable to connect: $!"; |
718
|
|
|
|
|
|
|
|
719
|
0
|
|
|
|
|
0
|
my $handle; $handle = AnyEvent::Handle->new( |
720
|
|
|
|
|
|
|
fh => $fh, |
721
|
|
|
|
|
|
|
on_error => sub { |
722
|
0
|
|
|
|
|
0
|
AE::log error => $_[2], |
723
|
|
|
|
|
|
|
$_[0]->destroy; |
724
|
|
|
|
|
|
|
}, |
725
|
|
|
|
|
|
|
on_eof => sub { |
726
|
0
|
|
|
|
|
0
|
$handle->destroy; |
727
|
0
|
|
|
|
|
0
|
AE::log info => "Done.", |
728
|
|
|
|
|
|
|
}, |
729
|
0
|
|
|
|
|
0
|
); |
730
|
|
|
|
|
|
|
|
731
|
|
|
|
|
|
|
$handle->push_read(line => sub { |
732
|
0
|
|
|
|
|
0
|
handle_manager_connection($self, @_) |
733
|
0
|
|
|
|
|
0
|
}); |
734
|
0
|
|
|
|
|
0
|
}; |
735
|
|
|
|
|
|
|
|
736
|
0
|
|
|
|
|
0
|
$logger->(notice => "statsd server started on ${host}:${port} (v${VERSION})"); |
737
|
0
|
|
|
|
|
0
|
$logger->(notice => "manager interface started on ${mgmt_host}:${mgmt_port}"); |
738
|
|
|
|
|
|
|
|
739
|
0
|
|
|
|
|
0
|
my $f_ti = $self->setup_flush_timer; |
740
|
0
|
|
|
|
|
0
|
my $kf_ti = $self->setup_keyflush_timer; |
741
|
|
|
|
|
|
|
|
742
|
|
|
|
|
|
|
# This will block waiting for |
743
|
|
|
|
|
|
|
# incoming connections (TCP) or packets (UDP) |
744
|
0
|
|
|
|
|
0
|
my $cv = AE::cv; |
745
|
0
|
|
|
|
|
0
|
$cv->recv(); |
746
|
|
|
|
|
|
|
} |
747
|
|
|
|
|
|
|
|
748
|
|
|
|
|
|
|
sub stats { |
749
|
0
|
|
|
0
|
0
|
0
|
$_[0]->{stats}; |
750
|
|
|
|
|
|
|
} |
751
|
|
|
|
|
|
|
|
752
|
|
|
|
|
|
|
sub trim { |
753
|
8
|
|
|
8
|
0
|
8581
|
my $s = shift; |
754
|
8
|
100
|
|
|
|
26
|
return unless defined $s; |
755
|
7
|
|
|
|
|
26
|
$s =~ s{^\s+}{}; |
756
|
7
|
|
|
|
|
22
|
$s =~ s{\s+$}{}; |
757
|
7
|
|
|
|
|
16
|
return $s; |
758
|
|
|
|
|
|
|
} |
759
|
|
|
|
|
|
|
|
760
|
|
|
|
|
|
|
1; |
761
|
|
|
|
|
|
|
|
762
|
|
|
|
|
|
|
__END__ |