line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
1
|
|
|
|
|
|
|
package Beekeeper::Service::LogTail::Worker; |
2
|
|
|
|
|
|
|
|
3
|
1
|
|
|
1
|
|
977
|
use strict; |
|
1
|
|
|
|
|
5
|
|
|
1
|
|
|
|
|
29
|
|
4
|
1
|
|
|
1
|
|
5
|
use warnings; |
|
1
|
|
|
|
|
2
|
|
|
1
|
|
|
|
|
41
|
|
5
|
|
|
|
|
|
|
|
6
|
|
|
|
|
|
|
our $VERSION = '0.10'; |
7
|
|
|
|
|
|
|
|
8
|
1
|
|
|
1
|
|
852
|
use Beekeeper::Worker ':log'; |
|
1
|
|
|
|
|
4
|
|
|
1
|
|
|
|
|
158
|
|
9
|
1
|
|
|
1
|
|
11
|
use base 'Beekeeper::Worker'; |
|
1
|
|
|
|
|
2
|
|
|
1
|
|
|
|
|
67
|
|
10
|
|
|
|
|
|
|
|
11
|
1
|
|
|
1
|
|
11
|
use Beekeeper::Logger ':log_levels'; |
|
1
|
|
|
|
|
2
|
|
|
1
|
|
|
|
|
126
|
|
12
|
1
|
|
|
1
|
|
6
|
use Scalar::Util 'weaken'; |
|
1
|
|
|
|
|
2
|
|
|
1
|
|
|
|
|
40
|
|
13
|
1
|
|
|
1
|
|
5
|
use JSON::XS; |
|
1
|
|
|
|
|
2
|
|
|
1
|
|
|
|
|
1394
|
|
14
|
|
|
|
|
|
|
|
15
|
|
|
|
|
|
|
my @Log_buffer; |
16
|
|
|
|
|
|
|
|
17
|
|
|
|
|
|
|
|
18
|
|
|
|
|
|
|
sub authorize_request { |
19
|
0
|
|
|
0
|
1
|
|
my ($self, $req) = @_; |
20
|
|
|
|
|
|
|
|
21
|
0
|
0
|
|
|
|
|
return unless $self->__has_authorization_token('BKPR_ADMIN'); |
22
|
|
|
|
|
|
|
|
23
|
0
|
|
|
|
|
|
return BKPR_REQUEST_AUTHORIZED; |
24
|
|
|
|
|
|
|
} |
25
|
|
|
|
|
|
|
|
26
|
|
|
|
|
|
|
sub on_startup { |
27
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
28
|
0
|
|
|
|
|
|
weaken $self; |
29
|
|
|
|
|
|
|
|
30
|
0
|
|
0
|
|
|
|
$self->{max_entries} = $self->{config}->{buffer_entries} || 20000; |
31
|
0
|
|
0
|
|
|
|
$self->{log_level} = $self->{config}->{log_level} || LOG_DEBUG; |
32
|
|
|
|
|
|
|
|
33
|
0
|
|
|
|
|
|
$self->_connect_to_all_brokers; |
34
|
|
|
|
|
|
|
|
35
|
0
|
|
|
|
|
|
$self->accept_remote_calls( |
36
|
|
|
|
|
|
|
'_bkpr.logtail.tail' => 'tail', |
37
|
|
|
|
|
|
|
); |
38
|
|
|
|
|
|
|
|
39
|
|
|
|
|
|
|
# Ping backend brokers to avoid disconnections due to inactivity |
40
|
|
|
|
|
|
|
$self->{ping_timer} = AnyEvent->timer( |
41
|
|
|
|
|
|
|
after => 60 * rand(), |
42
|
|
|
|
|
|
|
interval => 60, |
43
|
0
|
|
|
0
|
|
|
cb => sub { $self->_ping_backend_brokers }, |
44
|
0
|
|
|
|
|
|
); |
45
|
|
|
|
|
|
|
|
46
|
0
|
|
|
|
|
|
log_info "Ready"; |
47
|
|
|
|
|
|
|
} |
48
|
|
|
|
|
|
|
|
49
|
|
|
|
|
|
|
sub _connect_to_all_brokers { |
50
|
0
|
|
|
0
|
|
|
my $self = shift; |
51
|
0
|
|
|
|
|
|
weaken $self; |
52
|
|
|
|
|
|
|
|
53
|
0
|
|
|
|
|
|
my $own_bus = $self->{_BUS}; |
54
|
0
|
|
|
|
|
|
my $group_config = Beekeeper::Config->get_bus_group_config( bus_id => $own_bus->bus_id ); |
55
|
|
|
|
|
|
|
|
56
|
0
|
|
|
|
|
|
$self->{_BUS_GROUP} = []; |
57
|
|
|
|
|
|
|
|
58
|
0
|
|
|
|
|
|
foreach my $config (@$group_config) { |
59
|
|
|
|
|
|
|
|
60
|
0
|
|
|
|
|
|
my $bus_id = $config->{'bus_id'}; |
61
|
|
|
|
|
|
|
|
62
|
0
|
0
|
|
|
|
|
if ($bus_id eq $own_bus->bus_id) { |
63
|
|
|
|
|
|
|
# Already connected to our own bus |
64
|
0
|
|
|
|
|
|
$self->_collect_log($own_bus); |
65
|
0
|
|
|
|
|
|
next; |
66
|
|
|
|
|
|
|
} |
67
|
|
|
|
|
|
|
|
68
|
0
|
|
|
|
|
|
my $bus; $bus = Beekeeper::MQTT->new( |
69
|
|
|
|
|
|
|
%$config, |
70
|
|
|
|
|
|
|
bus_id => $bus_id, |
71
|
|
|
|
|
|
|
timeout => 300, |
72
|
|
|
|
|
|
|
on_error => sub { |
73
|
|
|
|
|
|
|
# Reconnect |
74
|
0
|
|
0
|
0
|
|
|
my $errmsg = $_[0] || ""; $errmsg =~ s/\s+/ /sg; |
|
0
|
|
|
|
|
|
|
75
|
0
|
|
|
|
|
|
log_error "Connection to $bus_id failed: $errmsg"; |
76
|
0
|
|
|
|
|
|
my $delay = $self->{connect_err}->{$bus_id}++; |
77
|
|
|
|
|
|
|
$self->{reconnect_tmr}->{$bus_id} = AnyEvent->timer( |
78
|
|
|
|
|
|
|
after => ($delay < 10 ? $delay * 3 : 30), |
79
|
|
|
|
|
|
|
cb => sub { |
80
|
|
|
|
|
|
|
$bus->connect( |
81
|
|
|
|
|
|
|
on_connack => sub { |
82
|
|
|
|
|
|
|
# Setup subscriptions |
83
|
0
|
|
|
|
|
|
log_warn "Reconnected to $bus_id"; |
84
|
0
|
|
|
|
|
|
$self->_collect_log($bus); |
85
|
|
|
|
|
|
|
} |
86
|
0
|
|
|
|
|
|
); |
87
|
|
|
|
|
|
|
}, |
88
|
0
|
0
|
|
|
|
|
); |
89
|
|
|
|
|
|
|
}, |
90
|
0
|
|
|
|
|
|
); |
91
|
|
|
|
|
|
|
|
92
|
0
|
|
|
|
|
|
push @{$self->{_BUS_GROUP}}, $bus; |
|
0
|
|
|
|
|
|
|
93
|
|
|
|
|
|
|
|
94
|
|
|
|
|
|
|
$bus->connect( |
95
|
|
|
|
|
|
|
on_connack => sub { |
96
|
|
|
|
|
|
|
# Setup subscriptions |
97
|
0
|
|
|
0
|
|
|
$self->_collect_log($bus); |
98
|
|
|
|
|
|
|
} |
99
|
0
|
|
|
|
|
|
); |
100
|
|
|
|
|
|
|
} |
101
|
|
|
|
|
|
|
} |
102
|
|
|
|
|
|
|
|
103
|
|
|
|
|
|
|
sub _collect_log { |
104
|
0
|
|
|
0
|
|
|
my ($self, $bus) = @_; |
105
|
|
|
|
|
|
|
|
106
|
|
|
|
|
|
|
# Default logger logs to topic log/$level/$service |
107
|
|
|
|
|
|
|
|
108
|
0
|
|
|
|
|
|
my $max_entries = $self->{max_entries}; |
109
|
0
|
|
|
|
|
|
my $log_level = $self->{log_level}; |
110
|
0
|
|
|
|
|
|
my $worker = $self->{_WORKER}; |
111
|
|
|
|
|
|
|
|
112
|
0
|
|
|
|
|
|
foreach my $level (1..$log_level) { |
113
|
|
|
|
|
|
|
|
114
|
0
|
|
|
|
|
|
my $topic = "log/$level/#"; |
115
|
0
|
|
|
|
|
|
my $req; |
116
|
|
|
|
|
|
|
|
117
|
|
|
|
|
|
|
$bus->subscribe( |
118
|
|
|
|
|
|
|
topic => $topic, |
119
|
|
|
|
|
|
|
on_publish => sub { |
120
|
|
|
|
|
|
|
# my ($payload_ref, $mqtt_properties) = @_; |
121
|
|
|
|
|
|
|
|
122
|
0
|
|
|
0
|
|
|
$req = decode_json( ${$_[0]} ); |
|
0
|
|
|
|
|
|
|
123
|
|
|
|
|
|
|
|
124
|
0
|
|
|
|
|
|
push @Log_buffer, $req->{params}; |
125
|
|
|
|
|
|
|
|
126
|
0
|
0
|
|
|
|
|
shift @Log_buffer if (@Log_buffer > $max_entries); |
127
|
|
|
|
|
|
|
|
128
|
|
|
|
|
|
|
# Track number of collected log entries |
129
|
0
|
|
|
|
|
|
$worker->{notif_count}++; |
130
|
|
|
|
|
|
|
}, |
131
|
|
|
|
|
|
|
on_suback => sub { |
132
|
0
|
|
|
0
|
|
|
my ($success, $prop) = @_; |
133
|
0
|
0
|
|
|
|
|
die "Could not subscribe to log topic '$topic'" unless $success; |
134
|
|
|
|
|
|
|
}, |
135
|
0
|
|
|
|
|
|
); |
136
|
|
|
|
|
|
|
} |
137
|
|
|
|
|
|
|
} |
138
|
|
|
|
|
|
|
|
139
|
|
|
|
|
|
|
sub _ping_backend_brokers { |
140
|
0
|
|
|
0
|
|
|
my $self = shift; |
141
|
|
|
|
|
|
|
|
142
|
0
|
|
|
|
|
|
foreach my $bus (@{$self->{_BUS_GROUP}}) { |
|
0
|
|
|
|
|
|
|
143
|
|
|
|
|
|
|
|
144
|
0
|
0
|
|
|
|
|
next unless $bus->{is_connected}; |
145
|
0
|
|
|
|
|
|
$bus->pingreq; |
146
|
|
|
|
|
|
|
} |
147
|
|
|
|
|
|
|
} |
148
|
|
|
|
|
|
|
|
149
|
|
|
|
|
|
|
sub on_shutdown { |
150
|
0
|
|
|
0
|
1
|
|
my ($self, %args) = @_; |
151
|
|
|
|
|
|
|
|
152
|
0
|
|
|
|
|
|
foreach my $bus (@{$self->{_BUS_GROUP}}) { |
|
0
|
|
|
|
|
|
|
153
|
|
|
|
|
|
|
|
154
|
0
|
0
|
|
|
|
|
next unless $bus->{is_connected}; |
155
|
0
|
|
|
|
|
|
$bus->disconnect; |
156
|
|
|
|
|
|
|
} |
157
|
|
|
|
|
|
|
|
158
|
0
|
|
|
|
|
|
log_info "Stopped"; |
159
|
|
|
|
|
|
|
} |
160
|
|
|
|
|
|
|
|
161
|
|
|
|
|
|
|
sub tail { |
162
|
0
|
|
|
0
|
0
|
|
my ($self, $params) = @_; |
163
|
|
|
|
|
|
|
|
164
|
0
|
|
|
|
|
|
foreach ('count','level','after') { |
165
|
0
|
0
|
|
|
|
|
next unless defined $params->{$_}; |
166
|
0
|
0
|
|
|
|
|
unless ($params->{$_} =~ m/^\d+(\.\d+)?$/) { |
167
|
0
|
|
|
|
|
|
die "Invalid parameter $_"; |
168
|
|
|
|
|
|
|
} |
169
|
|
|
|
|
|
|
} |
170
|
|
|
|
|
|
|
|
171
|
0
|
|
|
|
|
|
foreach ('host','pool','service','message') { |
172
|
0
|
0
|
|
|
|
|
next unless defined $params->{$_}; |
173
|
|
|
|
|
|
|
# Allow simple regexes |
174
|
0
|
0
|
|
|
|
|
unless ($params->{$_} =~ m/^[\w .*+?:,()\-\[\]\\]+$/) { |
175
|
0
|
|
|
|
|
|
die "Invalid parameter $_"; |
176
|
|
|
|
|
|
|
} |
177
|
|
|
|
|
|
|
} |
178
|
|
|
|
|
|
|
|
179
|
0
|
|
0
|
|
|
|
my $count = $params->{count} || 10; |
180
|
0
|
|
|
|
|
|
my $after = $params->{after}; |
181
|
0
|
|
|
|
|
|
my $level = $params->{level}; |
182
|
|
|
|
|
|
|
|
183
|
|
|
|
|
|
|
# This will die when an invalid regex is provided, but that's fine |
184
|
0
|
0
|
|
|
|
|
my $host_re = defined $params->{host} ? qr/$params->{host}/i : undef; |
185
|
0
|
0
|
|
|
|
|
my $pool_re = defined $params->{pool} ? qr/$params->{pool}/i : undef; |
186
|
0
|
0
|
|
|
|
|
my $svc_re = defined $params->{service} ? qr/$params->{service}/i : undef; |
187
|
0
|
0
|
|
|
|
|
my $msg_re = defined $params->{message} ? qr/$params->{message}/i : undef; |
188
|
|
|
|
|
|
|
|
189
|
0
|
|
|
|
|
|
my ($entry, @filtered); |
190
|
|
|
|
|
|
|
|
191
|
0
|
|
|
|
|
|
for (my $i = @Log_buffer - 1; $i >= 0; $i--) { |
192
|
|
|
|
|
|
|
|
193
|
0
|
|
|
|
|
|
$entry = $Log_buffer[$i]; |
194
|
|
|
|
|
|
|
|
195
|
|
|
|
|
|
|
next if (defined $level && $entry->{level} > $level ) || |
196
|
|
|
|
|
|
|
(defined $after && $entry->{tstamp} <= $after ) || |
197
|
|
|
|
|
|
|
(defined $host_re && $entry->{host} !~ $host_re ) || |
198
|
|
|
|
|
|
|
(defined $pool_re && $entry->{pool} !~ $pool_re ) || |
199
|
|
|
|
|
|
|
(defined $svc_re && $entry->{service} !~ $svc_re ) || |
200
|
0
|
0
|
0
|
|
|
|
(defined $msg_re && $entry->{message} !~ $msg_re ); |
|
|
|
0
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
|
0
|
|
|
|
|
201
|
|
|
|
|
|
|
|
202
|
0
|
|
|
|
|
|
unshift @filtered, $entry; |
203
|
|
|
|
|
|
|
|
204
|
0
|
0
|
|
|
|
|
last if (@filtered >= $count); |
205
|
|
|
|
|
|
|
} |
206
|
|
|
|
|
|
|
|
207
|
0
|
|
|
|
|
|
return \@filtered; |
208
|
|
|
|
|
|
|
} |
209
|
|
|
|
|
|
|
|
210
|
|
|
|
|
|
|
1; |
211
|
|
|
|
|
|
|
|
212
|
|
|
|
|
|
|
__END__ |