File Coverage

blib/lib/Async/Redis/Telemetry.pm
Criterion Covered Total %
statement 21 118 17.8
branch 5 72 6.9
condition 8 27 29.6
subroutine 6 19 31.5
pod 0 12 0.0
total 40 248 16.1


line stmt bran cond sub pod time code
1             package Async::Redis::Telemetry;
2              
3 92     92   135818 use strict;
  92         152  
  92         3098  
4 92     92   336 use warnings;
  92         136  
  92         3743  
5 92     92   1204 use 5.018;
  92         256  
6              
7 92     92   3123 use Time::HiRes qw(time);
  92         1825  
  92         3573  
8              
9             # Commands with sensitive arguments that need redaction
10             our %REDACT_RULES = (
11             AUTH => sub {
12             my (@args) = @_;
13             if (@args == 1) {
14             # AUTH password
15             return ('[REDACTED]');
16             }
17             elsif (@args >= 2) {
18             # AUTH username password
19             return ($args[0], '[REDACTED]');
20             }
21             return @args;
22             },
23              
24             CONFIG => sub {
25             my (@args) = @_;
26             return @args unless @args >= 3;
27              
28             my $subcommand = uc($args[0] // '');
29             if ($subcommand eq 'SET') {
30             my $param = lc($args[1] // '');
31             if ($param =~ /^(requirepass|masterauth|masteruser|user)$/) {
32             return ($args[0], $args[1], '[REDACTED]');
33             }
34             }
35             return @args;
36             },
37              
38             MIGRATE => sub {
39             my (@args) = @_;
40             my @result;
41              
42             for (my $i = 0; $i <= $#args; $i++) {
43             my $arg = $args[$i];
44             my $uc_arg = uc($arg // '');
45              
46             if ($uc_arg eq 'AUTH' && defined $args[$i + 1]) {
47             push @result, $arg, '[REDACTED]';
48             $i++; # Skip password
49             }
50             elsif ($uc_arg eq 'AUTH2' && defined $args[$i + 1] && defined $args[$i + 2]) {
51             push @result, $arg, $args[$i + 1], '[REDACTED]';
52             $i += 2; # Skip username and password
53             }
54             else {
55             push @result, $arg;
56             }
57             }
58              
59             return @result;
60             },
61              
62             HELLO => sub {
63             my (@args) = @_;
64             my @result;
65              
66             for (my $i = 0; $i <= $#args; $i++) {
67             my $arg = $args[$i];
68             my $uc_arg = uc($arg // '');
69              
70             if ($uc_arg eq 'AUTH' && defined $args[$i + 1] && defined $args[$i + 2]) {
71             push @result, $arg, $args[$i + 1], '[REDACTED]';
72             $i += 2; # Skip username and password
73             }
74             else {
75             push @result, $arg;
76             }
77             }
78              
79             return @result;
80             },
81              
82             ACL => sub {
83             my (@args) = @_;
84             return @args unless @args >= 1;
85              
86             my $subcommand = uc($args[0] // '');
87             if ($subcommand eq 'SETUSER' && @args >= 3) {
88             # ACL SETUSER username ...rules...
89             # Redact any >password patterns
90             my @result = ($args[0], $args[1]);
91             for my $i (2 .. $#args) {
92             if ($args[$i] =~ /^>/) {
93             push @result, '>[REDACTED]';
94             }
95             else {
96             push @result, $args[$i];
97             }
98             }
99             return @result;
100             }
101             return @args;
102             },
103             );
104              
105             # Format command for logging with redaction
106             sub format_command_for_log {
107 0     0 0 0 my (@cmd) = @_;
108              
109 0 0       0 return '' unless @cmd;
110              
111 0   0     0 my $name = uc($cmd[0] // '');
112 0         0 my @args = @cmd[1 .. $#cmd];
113              
114 0 0       0 if (my $redactor = $REDACT_RULES{$name}) {
115 0         0 @args = $redactor->(@args);
116             }
117              
118 0         0 return join(' ', $name, @args);
119             }
120              
121             # Format command for OTel span (same redaction, optional args)
122             sub format_command_for_span {
123 3     3 0 4159 my ($include_args, $redact, @cmd) = @_;
124              
125 3 50       8 return '' unless @cmd;
126              
127 3   50     8 my $name = uc($cmd[0] // '');
128              
129 3 100       9 return $name unless $include_args;
130              
131 2         7 my @args = @cmd[1 .. $#cmd];
132              
133 2 100 66     14 if ($redact && (my $redactor = $REDACT_RULES{$name})) {
134 1         4 @args = $redactor->(@args);
135             }
136              
137 2         8 return join(' ', $name, @args);
138             }
139              
140             #
141             # OpenTelemetry Integration
142             #
143              
144             sub new {
145 1     1 0 143444 my ($class, %args) = @_;
146              
147             return bless {
148             tracer => $args{tracer}, # OTel tracer
149             meter => $args{meter}, # OTel meter
150             debug => $args{debug}, # Debug logger
151             include_args => $args{include_args} // 0,
152             redact => $args{redact} // 1,
153             host => $args{host} // 'localhost',
154             port => $args{port} // 6379,
155 1   50     23 database => $args{database} // 0,
      50        
      50        
      50        
      50        
156              
157             # Metrics (lazy-initialized)
158             _commands_counter => undef,
159             _commands_histogram => undef,
160             _connections_gauge => undef,
161             _errors_counter => undef,
162             _reconnects_counter => undef,
163             _pipeline_histogram => undef,
164             }, $class;
165             }
166              
167             # Initialize metrics (call after meter is set)
168             sub _init_metrics {
169 0     0     my ($self) = @_;
170              
171 0 0         return unless $self->{meter};
172 0 0         return if $self->{_metrics_initialized};
173              
174 0           my $meter = $self->{meter};
175              
176 0           $self->{_commands_counter} = $meter->create_counter(
177             name => 'redis.commands.total',
178             description => 'Total Redis commands executed',
179             unit => '1',
180             );
181              
182 0           $self->{_commands_histogram} = $meter->create_histogram(
183             name => 'redis.commands.duration',
184             description => 'Redis command latency',
185             unit => 'ms',
186             );
187              
188 0           $self->{_connections_gauge} = $meter->create_up_down_counter(
189             name => 'redis.connections.active',
190             description => 'Current active connections',
191             unit => '1',
192             );
193              
194 0           $self->{_errors_counter} = $meter->create_counter(
195             name => 'redis.errors.total',
196             description => 'Total Redis errors by type',
197             unit => '1',
198             );
199              
200 0           $self->{_reconnects_counter} = $meter->create_counter(
201             name => 'redis.reconnects.total',
202             description => 'Total reconnection attempts',
203             unit => '1',
204             );
205              
206 0           $self->{_pipeline_histogram} = $meter->create_histogram(
207             name => 'redis.pipeline.size',
208             description => 'Commands per pipeline',
209             unit => '1',
210             );
211              
212 0           $self->{_metrics_initialized} = 1;
213             }
214              
215             # Start a span for a command
216             sub start_command_span {
217 0     0 0   my ($self, @cmd) = @_;
218              
219 0   0       my $command_name = uc($cmd[0] // 'UNKNOWN');
220 0           my $span;
221              
222 0 0         if ($self->{tracer}) {
223 0           my $span_name = "redis.$command_name";
224              
225             my $statement = format_command_for_span(
226             $self->{include_args},
227             $self->{redact},
228             @cmd
229 0           );
230              
231             $span = $self->{tracer}->create_span(
232             name => $span_name,
233             kind => 'client',
234             attributes => {
235             'db.system' => 'redis',
236             'db.operation' => $command_name,
237             'db.statement' => $statement,
238             'net.peer.name' => $self->{host},
239             'net.peer.port' => $self->{port},
240             'db.redis.database_index' => $self->{database},
241             },
242 0           );
243             }
244              
245             # Always return a context for metrics tracking
246             return {
247 0           span => $span,
248             start_time => time(),
249             command => $command_name,
250             };
251             }
252              
253             # End a command span
254             sub end_command_span {
255 0     0 0   my ($self, $context, $error) = @_;
256              
257 0 0         return unless $context;
258              
259 0           my $elapsed_ms = (time() - $context->{start_time}) * 1000;
260              
261             # Record metrics (always, regardless of span)
262 0           $self->_record_command_metrics($context->{command}, $elapsed_ms, $error);
263              
264             # End span if present
265 0 0         if ($context->{span}) {
266 0 0         if ($error) {
267 0           $context->{span}->set_status('error', "$error");
268 0           $context->{span}->record_exception($error);
269             }
270 0           $context->{span}->end;
271             }
272             }
273              
274             # Record command metrics
275             sub _record_command_metrics {
276 0     0     my ($self, $command, $elapsed_ms, $error) = @_;
277              
278 0           $self->_init_metrics;
279              
280 0           my %labels = (command => $command);
281              
282 0 0         if ($self->{_commands_counter}) {
283 0           $self->{_commands_counter}->add(1, \%labels);
284             }
285              
286 0 0         if ($self->{_commands_histogram}) {
287 0           $self->{_commands_histogram}->record($elapsed_ms, \%labels);
288             }
289              
290 0 0 0       if ($error && $self->{_errors_counter}) {
291 0   0       my $error_type = ref($error) || 'unknown';
292 0           $error_type =~ s/.*:://; # Strip package prefix
293 0           $self->{_errors_counter}->add(1, { type => $error_type });
294             }
295             }
296              
297             # Record pipeline metrics
298             sub record_pipeline {
299 0     0 0   my ($self, $size, $elapsed_ms) = @_;
300              
301 0           $self->_init_metrics;
302              
303 0 0         if ($self->{_pipeline_histogram}) {
304 0           $self->{_pipeline_histogram}->record($size);
305             }
306              
307 0 0         if ($self->{_commands_histogram}) {
308 0           $self->{_commands_histogram}->record($elapsed_ms, { command => 'PIPELINE' });
309             }
310             }
311              
312             # Record connection event
313             sub record_connection {
314 0     0 0   my ($self, $delta) = @_;
315              
316 0           $self->_init_metrics;
317              
318 0 0         if ($self->{_connections_gauge}) {
319 0           $self->{_connections_gauge}->add($delta);
320             }
321             }
322              
323             # Record reconnection attempt
324             sub record_reconnect {
325 0     0 0   my ($self) = @_;
326              
327 0           $self->_init_metrics;
328              
329 0 0         if ($self->{_reconnects_counter}) {
330 0           $self->{_reconnects_counter}->add(1);
331             }
332             }
333              
334             #
335             # Debug Logging
336             #
337              
338             sub log_send {
339 0     0 0   my ($self, @cmd) = @_;
340              
341 0 0         return unless $self->{debug};
342              
343 0           my $formatted = format_command_for_log(@cmd);
344              
345 0 0         if (ref $self->{debug} eq 'CODE') {
346 0           $self->{debug}->('send', $formatted);
347             }
348             else {
349 0           warn "[REDIS SEND] $formatted\n";
350             }
351             }
352              
353             sub log_recv {
354 0     0 0   my ($self, $result, $elapsed_ms) = @_;
355              
356 0 0         return unless $self->{debug};
357              
358 0           my $summary = _summarize_result($result);
359 0           my $msg = sprintf("[REDIS RECV] %s (%.2fms)", $summary, $elapsed_ms);
360              
361 0 0         if (ref $self->{debug} eq 'CODE') {
362 0           $self->{debug}->('recv', $msg);
363             }
364             else {
365 0           warn "$msg\n";
366             }
367             }
368              
369             sub log_error {
370 0     0 0   my ($self, $error) = @_;
371              
372 0 0         return unless $self->{debug};
373              
374 0           my $msg = "[REDIS ERROR] $error";
375              
376 0 0         if (ref $self->{debug} eq 'CODE') {
377 0           $self->{debug}->('error', $msg);
378             }
379             else {
380 0           warn "$msg\n";
381             }
382             }
383              
384             sub log_event {
385 0     0 0   my ($self, $event, $details) = @_;
386              
387 0 0         return unless $self->{debug};
388              
389 0 0         my $msg = "[REDIS EVENT] $event" . ($details ? ": $details" : '');
390              
391 0 0         if (ref $self->{debug} eq 'CODE') {
392 0           $self->{debug}->('event', $msg);
393             }
394             else {
395 0           warn "$msg\n";
396             }
397             }
398              
399             # Summarize result without exposing values
400             sub _summarize_result {
401 0     0     my ($result) = @_;
402              
403 0 0         return 'nil' unless defined $result;
404              
405 0 0 0       if (ref $result eq 'ARRAY') {
    0          
    0          
    0          
406 0           return 'array[' . scalar(@$result) . ']';
407             }
408             elsif (ref $result eq 'HASH') {
409 0           return 'hash{' . scalar(keys %$result) . '}';
410             }
411             elsif (ref $result && $result->can('message')) {
412 0           return 'error: ' . $result->message;
413             }
414             elsif (length($result) > 100) {
415 0           return 'string[' . length($result) . ' bytes]';
416             }
417             else {
418             # Short strings are OK to show type
419 0 0         return 'OK' if $result eq 'OK';
420 0 0         return 'PONG' if $result eq 'PONG';
421 0 0         return 'QUEUED' if $result eq 'QUEUED';
422 0 0         return 'integer' if $result =~ /^-?\d+$/;
423 0           return 'string[' . length($result) . ']';
424             }
425             }
426              
427             1;
428              
429             __END__