line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
1
|
|
|
|
|
|
|
package Beekeeper::Service::LogTail::Worker; |
2
|
|
|
|
|
|
|
|
3
|
1
|
|
|
1
|
|
1006
|
use strict; |
|
1
|
|
|
|
|
3
|
|
|
1
|
|
|
|
|
31
|
|
4
|
1
|
|
|
1
|
|
5
|
use warnings; |
|
1
|
|
|
|
|
2
|
|
|
1
|
|
|
|
|
41
|
|
5
|
|
|
|
|
|
|
|
6
|
|
|
|
|
|
|
our $VERSION = '0.08'; |
7
|
|
|
|
|
|
|
|
8
|
1
|
|
|
1
|
|
6
|
use Beekeeper::Worker ':log'; |
|
1
|
|
|
|
|
1
|
|
|
1
|
|
|
|
|
162
|
|
9
|
1
|
|
|
1
|
|
8
|
use base 'Beekeeper::Worker'; |
|
1
|
|
|
|
|
2
|
|
|
1
|
|
|
|
|
72
|
|
10
|
|
|
|
|
|
|
|
11
|
1
|
|
|
1
|
|
6
|
use Beekeeper::Logger ':log_levels'; |
|
1
|
|
|
|
|
2
|
|
|
1
|
|
|
|
|
147
|
|
12
|
1
|
|
|
1
|
|
8
|
use Scalar::Util 'weaken'; |
|
1
|
|
|
|
|
2
|
|
|
1
|
|
|
|
|
48
|
|
13
|
1
|
|
|
1
|
|
6
|
use JSON::XS; |
|
1
|
|
|
|
|
2
|
|
|
1
|
|
|
|
|
1353
|
|
14
|
|
|
|
|
|
|
|
15
|
|
|
|
|
|
|
my @Log_buffer; |
16
|
|
|
|
|
|
|
|
17
|
|
|
|
|
|
|
|
18
|
|
|
|
|
|
|
# Authorization hook for incoming requests.
#
# Grants access only when the caller holds the 'BKPR_ADMIN' authorization
# token, as reported by $self->__has_authorization_token.
#
# Parameters:
#   $self - the worker instance
#   $req  - the incoming request (not inspected here)
#
# Returns BKPR_REQUEST_AUTHORIZED when the token is present; otherwise an
# empty return (undef in scalar context, empty list in list context).
sub authorize_request {
    my ($self, $req) = @_;

    if ($self->__has_authorization_token('BKPR_ADMIN')) {
        return BKPR_REQUEST_AUTHORIZED;
    }

    return;
}
25
|
|
|
|
|
|
|
|
26
|
|
|
|
|
|
|
# Worker startup hook.
#
# Reads the buffer size and maximum log level from the worker config
# (falling back to 20000 entries and LOG_DEBUG when the configured value
# is missing or false), starts collecting log entries from every broker of
# the bus group, and exposes the '_bkpr.logtail.tail' remote call, which is
# served by the tail() method.
sub on_startup {
    my ($self) = @_;

    # Note that '||' also replaces an explicit 0 with the default
    $self->{max_entries} = $self->{config}{buffer_entries} || 20000;
    $self->{log_level}   = $self->{config}{log_level}      || LOG_DEBUG;

    $self->_connect_to_all_brokers;

    $self->accept_remote_calls( '_bkpr.logtail.tail' => 'tail' );

    log_info("Ready");
}
40
|
|
|
|
|
|
|
|
41
|
|
|
|
|
|
|
# Start collecting logs from every broker listed in the config of the bus
# group this worker's own bus belongs to.
#
# The already established connection ($self->{_BUS}) is reused for our own
# bus. For every other bus id of the group a new Beekeeper::MQTT connection
# is created, stored in $self->{_BUS_GROUP} and connected. Log subscriptions
# are set up from the on_connect callback of each of those connections.
#
# When a connection reports an error a reconnection is scheduled with a
# delay that grows 3 seconds per recorded error of that bus (0, 3, 6, ...)
# and is fixed at 30 seconds from the tenth error on. The error counter
# kept in $self->{connect_err} is never reset by this code.
sub _connect_to_all_brokers {
    my ($self) = @_;

    # The callbacks created below close over $self, so make this reference
    # a weak one (presumably to avoid keeping the worker alive through them)
    weaken($self);

    my $own_bus      = $self->{_BUS};
    my $group_config = Beekeeper::Config->get_bus_group_config( bus_id => $own_bus->bus_id );

    $self->{_BUS_GROUP} = [];

    BUS: for my $bus_config (@$group_config) {

        my $bus_id = $bus_config->{'bus_id'};

        if ($bus_id eq $own_bus->bus_id) {
            # No new connection is needed for our own bus
            $self->_collect_log($own_bus);
            next BUS;
        }

        # Declared ahead of its assignment because both callbacks refer to it
        my $bus;

        # Setup subscriptions once connected
        my $setup_subscriptions = sub {
            $self->_collect_log($bus);
        };

        # Log the failure and retry later
        my $schedule_reconnect = sub {
            my $errmsg = $_[0] || "";
            $errmsg =~ s/\s+/ /sg;

            log_error("Connection to $bus_id failed: $errmsg");

            # Post-increment: yields the number of previous errors of this bus
            my $attempts = $self->{connect_err}->{$bus_id}++;
            my $wait     = $attempts < 10 ? $attempts * 3 : 30;

            $self->{reconnect_tmr}->{$bus_id} = AnyEvent->timer(
                after => $wait,
                cb    => sub { $bus->connect },
            );
        };

        $bus = Beekeeper::MQTT->new(
            %$bus_config,
            bus_id     => $bus_id,
            timeout    => 300,
            on_connect => $setup_subscriptions,
            on_error   => $schedule_reconnect,
        );

        push @{ $self->{_BUS_GROUP} }, $bus;

        $bus->connect;
    }
}
85
|
|
|
|
|
|
|
|
86
|
|
|
|
|
|
|
# Subscribe, on the given bus connection, to the log topics of every level
# from 1 up to $self->{log_level}, and store received entries into the
# file-level @Log_buffer.
#
# The default logger publishes to topic log/$level/$service, hence one
# wildcard subscription "log/$level/#" is made per level.
#
# Parameters:
#   $self - the worker instance
#   $bus  - bus connection object providing subscribe()
#
# Dies (from the suback callback) if any subscription is rejected.
sub _collect_log {
    my ($self, $bus) = @_;

    my $max_entries = $self->{max_entries};
    my $worker      = $self->{_WORKER};

    for my $level (1 .. $self->{log_level}) {

        my $topic = "log/$level/#";

        # Shared by all invocations of the callback below
        my $req;

        # Receives ($payload_ref, $mqtt_properties) and buffers the 'params'
        # member of the decoded JSON payload
        my $store_entry = sub {
            $req = decode_json( ${ $_[0] } );

            push @Log_buffer, $req->{params};

            # Discard the oldest entry once the buffer grows past its limit
            if (@Log_buffer > $max_entries) {
                shift @Log_buffer;
            }

            # Track number of collected log entries
            $worker->{notif_count}++;
        };

        my $check_suback = sub {
            my ($success) = @_;
            $success or die "Could not subscribe to log topic '$topic'";
        };

        $bus->subscribe(
            topic      => $topic,
            on_publish => $store_entry,
            on_suback  => $check_suback,
        );
    }
}
121
|
|
|
|
|
|
|
|
122
|
|
|
|
|
|
|
# Worker shutdown hook.
#
# Cancels any pending reconnection attempt and disconnects from every
# additional broker of the bus group that is currently connected.
#
# Parameters:
#   $self - the worker instance
#   %args - shutdown arguments (not used here)
sub on_shutdown {
    my ($self, %args) = @_;

    # Buses waiting for a reconnection are not connected and are skipped by
    # the loop below, but their timers (stored in $self->{reconnect_tmr} by
    # the connection error handler) would still fire and reconnect them.
    # Dropping the AnyEvent watchers cancels these timers.
    delete $self->{reconnect_tmr};

    foreach my $bus (@{$self->{_BUS_GROUP}}) {

        next unless ($bus->{is_connected});
        $bus->disconnect;
    }

    log_info("Stopped");
}
133
|
|
|
|
|
|
|
|
134
|
|
|
|
|
|
|
# Handler of the '_bkpr.logtail.tail' remote call: return the most recent
# buffered log entries matching the requested filters.
#
# Parameters ($params is a hashref, every key is optional):
#   count   - maximum number of entries to return (defaults to 10, also
#             when 0 is given)
#   level   - skip entries whose 'level' is numerically greater than this
#   after   - skip entries whose 'tstamp' is not greater than this
#   host    - regex, matched case-insensitively against the entry 'host'
#   pool    - same, against the entry 'pool'
#   service - same, against the entry 'service'
#   message - same, against the entry 'message'
#
# Returns an arrayref with at most 'count' entries, taken from the newest
# end of the buffer and ordered from oldest to newest.
#
# Dies with "Invalid parameter <name>" when a numeric parameter is not a
# plain non-negative decimal number, or when a regex parameter contains
# characters outside of the allowed set. Also dies when a regex parameter
# does not compile.
sub tail {
    my ($self, $params) = @_;

    foreach my $name ('count','level','after') {
        my $value = $params->{$name};
        next unless defined $value;
        # \A and \z are used as '$' would also accept a trailing newline
        unless ($value =~ m/\A\d+(?:\.\d+)?\z/) {
            die "Invalid parameter $name";
        }
    }

    foreach my $name ('host','pool','service','message') {
        my $value = $params->{$name};
        next unless defined $value;
        # Allow simple regexes
        unless ($value =~ m/\A[\w .*+?:,()\-\[\]\\]+\z/) {
            die "Invalid parameter $name";
        }
    }

    my $count = $params->{count} || 10;
    my $after = $params->{after};
    my $level = $params->{level};

    # This will die when an invalid regex is provided, but that's fine
    my $host_re = defined $params->{host}    ? qr/$params->{host}/i    : undef;
    my $pool_re = defined $params->{pool}    ? qr/$params->{pool}/i    : undef;
    my $svc_re  = defined $params->{service} ? qr/$params->{service}/i : undef;
    my $msg_re  = defined $params->{message} ? qr/$params->{message}/i : undef;

    my ($entry, @filtered);

    # Walk the buffer from its newest entry backwards, in order to stop
    # as soon as enough matching entries were found
    for (my $i = @Log_buffer - 1; $i >= 0; $i--) {

        $entry = $Log_buffer[$i];

        next if (defined $level   && $entry->{level}   >  $level  ) ||
                (defined $after   && $entry->{tstamp}  <= $after  ) ||
                (defined $host_re && $entry->{host}    !~ $host_re) ||
                (defined $pool_re && $entry->{pool}    !~ $pool_re) ||
                (defined $svc_re  && $entry->{service} !~ $svc_re ) ||
                (defined $msg_re  && $entry->{message} !~ $msg_re );

        push @filtered, $entry;

        last if (@filtered >= $count);
    }

    # Entries were collected newest first, return them in buffer order
    @filtered = reverse @filtered;

    return \@filtered;
}
182
|
|
|
|
|
|
|
|
183
|
|
|
|
|
|
|
1; |
184
|
|
|
|
|
|
|
|
185
|
|
|
|
|
|
|
__END__ |