File Coverage

blib/lib/Async/Redis/Telemetry.pm
Criterion Covered Total %
statement 11 118 9.3
branch 0 72 0.0
condition 0 27 0.0
subroutine 4 19 21.0
pod 0 12 0.0
total 15 248 6.0


line stmt bran cond sub pod time code
1             package Async::Redis::Telemetry;
2              
3 73     73   508 use strict;
  73         160  
  73         3383  
4 73     73   393 use warnings;
  73         138  
  73         4297  
5 73     73   1269 use 5.018;
  73         271  
6              
7 73     73   542 use Time::HiRes qw(time);
  73         140  
  73         783  
8              
9             our $VERSION = '0.001';
10              
11             # Commands with sensitive arguments that need redaction
12             our %REDACT_RULES = (
13             AUTH => sub {
14             my (@args) = @_;
15             if (@args == 1) {
16             # AUTH password
17             return ('[REDACTED]');
18             }
19             elsif (@args >= 2) {
20             # AUTH username password
21             return ($args[0], '[REDACTED]');
22             }
23             return @args;
24             },
25              
26             CONFIG => sub {
27             my (@args) = @_;
28             return @args unless @args >= 3;
29              
30             my $subcommand = uc($args[0] // '');
31             if ($subcommand eq 'SET') {
32             my $param = lc($args[1] // '');
33             if ($param =~ /^(requirepass|masterauth|masteruser|user)$/) {
34             return ($args[0], $args[1], '[REDACTED]');
35             }
36             }
37             return @args;
38             },
39              
40             MIGRATE => sub {
41             my (@args) = @_;
42             my @result;
43              
44             for (my $i = 0; $i <= $#args; $i++) {
45             my $arg = $args[$i];
46             my $uc_arg = uc($arg // '');
47              
48             if ($uc_arg eq 'AUTH' && defined $args[$i + 1]) {
49             push @result, $arg, '[REDACTED]';
50             $i++; # Skip password
51             }
52             elsif ($uc_arg eq 'AUTH2' && defined $args[$i + 1] && defined $args[$i + 2]) {
53             push @result, $arg, $args[$i + 1], '[REDACTED]';
54             $i += 2; # Skip username and password
55             }
56             else {
57             push @result, $arg;
58             }
59             }
60              
61             return @result;
62             },
63              
64             HELLO => sub {
65             my (@args) = @_;
66             my @result;
67              
68             for (my $i = 0; $i <= $#args; $i++) {
69             my $arg = $args[$i];
70             my $uc_arg = uc($arg // '');
71              
72             if ($uc_arg eq 'AUTH' && defined $args[$i + 1] && defined $args[$i + 2]) {
73             push @result, $arg, $args[$i + 1], '[REDACTED]';
74             $i += 2; # Skip username and password
75             }
76             else {
77             push @result, $arg;
78             }
79             }
80              
81             return @result;
82             },
83              
84             ACL => sub {
85             my (@args) = @_;
86             return @args unless @args >= 1;
87              
88             my $subcommand = uc($args[0] // '');
89             if ($subcommand eq 'SETUSER' && @args >= 3) {
90             # ACL SETUSER username ...rules...
91             # Redact any >password patterns
92             my @result = ($args[0], $args[1]);
93             for my $i (2 .. $#args) {
94             if ($args[$i] =~ /^>/) {
95             push @result, '>[REDACTED]';
96             }
97             else {
98             push @result, $args[$i];
99             }
100             }
101             return @result;
102             }
103             return @args;
104             },
105             );
106              
107             # Format command for logging with redaction
108             sub format_command_for_log {
109 0     0 0   my (@cmd) = @_;
110              
111 0 0         return '' unless @cmd;
112              
113 0   0       my $name = uc($cmd[0] // '');
114 0           my @args = @cmd[1 .. $#cmd];
115              
116 0 0         if (my $redactor = $REDACT_RULES{$name}) {
117 0           @args = $redactor->(@args);
118             }
119              
120 0           return join(' ', $name, @args);
121             }
122              
123             # Format command for OTel span (same redaction, optional args)
124             sub format_command_for_span {
125 0     0 0   my ($include_args, $redact, @cmd) = @_;
126              
127 0 0         return '' unless @cmd;
128              
129 0   0       my $name = uc($cmd[0] // '');
130              
131 0 0         return $name unless $include_args;
132              
133 0           my @args = @cmd[1 .. $#cmd];
134              
135 0 0 0       if ($redact && (my $redactor = $REDACT_RULES{$name})) {
136 0           @args = $redactor->(@args);
137             }
138              
139 0           return join(' ', $name, @args);
140             }
141              
142             #
143             # OpenTelemetry Integration
144             #
145              
146             sub new {
147 0     0 0   my ($class, %args) = @_;
148              
149             return bless {
150             tracer => $args{tracer}, # OTel tracer
151             meter => $args{meter}, # OTel meter
152             debug => $args{debug}, # Debug logger
153             include_args => $args{include_args} // 1,
154             redact => $args{redact} // 1,
155             host => $args{host} // 'localhost',
156             port => $args{port} // 6379,
157 0   0       database => $args{database} // 0,
      0        
      0        
      0        
      0        
158              
159             # Metrics (lazy-initialized)
160             _commands_counter => undef,
161             _commands_histogram => undef,
162             _connections_gauge => undef,
163             _errors_counter => undef,
164             _reconnects_counter => undef,
165             _pipeline_histogram => undef,
166             }, $class;
167             }
168              
169             # Initialize metrics (call after meter is set)
170             sub _init_metrics {
171 0     0     my ($self) = @_;
172              
173 0 0         return unless $self->{meter};
174 0 0         return if $self->{_metrics_initialized};
175              
176 0           my $meter = $self->{meter};
177              
178 0           $self->{_commands_counter} = $meter->create_counter(
179             name => 'redis.commands.total',
180             description => 'Total Redis commands executed',
181             unit => '1',
182             );
183              
184 0           $self->{_commands_histogram} = $meter->create_histogram(
185             name => 'redis.commands.duration',
186             description => 'Redis command latency',
187             unit => 'ms',
188             );
189              
190 0           $self->{_connections_gauge} = $meter->create_up_down_counter(
191             name => 'redis.connections.active',
192             description => 'Current active connections',
193             unit => '1',
194             );
195              
196 0           $self->{_errors_counter} = $meter->create_counter(
197             name => 'redis.errors.total',
198             description => 'Total Redis errors by type',
199             unit => '1',
200             );
201              
202 0           $self->{_reconnects_counter} = $meter->create_counter(
203             name => 'redis.reconnects.total',
204             description => 'Total reconnection attempts',
205             unit => '1',
206             );
207              
208 0           $self->{_pipeline_histogram} = $meter->create_histogram(
209             name => 'redis.pipeline.size',
210             description => 'Commands per pipeline',
211             unit => '1',
212             );
213              
214 0           $self->{_metrics_initialized} = 1;
215             }
216              
217             # Start a span for a command
218             sub start_command_span {
219 0     0 0   my ($self, @cmd) = @_;
220              
221 0   0       my $command_name = uc($cmd[0] // 'UNKNOWN');
222 0           my $span;
223              
224 0 0         if ($self->{tracer}) {
225 0           my $span_name = "redis.$command_name";
226              
227             my $statement = format_command_for_span(
228             $self->{include_args},
229             $self->{redact},
230             @cmd
231 0           );
232              
233             $span = $self->{tracer}->create_span(
234             name => $span_name,
235             kind => 'client',
236             attributes => {
237             'db.system' => 'redis',
238             'db.operation' => $command_name,
239             'db.statement' => $statement,
240             'net.peer.name' => $self->{host},
241             'net.peer.port' => $self->{port},
242             'db.redis.database_index' => $self->{database},
243             },
244 0           );
245             }
246              
247             # Always return a context for metrics tracking
248             return {
249 0           span => $span,
250             start_time => time(),
251             command => $command_name,
252             };
253             }
254              
255             # End a command span
256             sub end_command_span {
257 0     0 0   my ($self, $context, $error) = @_;
258              
259 0 0         return unless $context;
260              
261 0           my $elapsed_ms = (time() - $context->{start_time}) * 1000;
262              
263             # Record metrics (always, regardless of span)
264 0           $self->_record_command_metrics($context->{command}, $elapsed_ms, $error);
265              
266             # End span if present
267 0 0         if ($context->{span}) {
268 0 0         if ($error) {
269 0           $context->{span}->set_status('error', "$error");
270 0           $context->{span}->record_exception($error);
271             }
272 0           $context->{span}->end;
273             }
274             }
275              
276             # Record command metrics
277             sub _record_command_metrics {
278 0     0     my ($self, $command, $elapsed_ms, $error) = @_;
279              
280 0           $self->_init_metrics;
281              
282 0           my %labels = (command => $command);
283              
284 0 0         if ($self->{_commands_counter}) {
285 0           $self->{_commands_counter}->add(1, \%labels);
286             }
287              
288 0 0         if ($self->{_commands_histogram}) {
289 0           $self->{_commands_histogram}->record($elapsed_ms, \%labels);
290             }
291              
292 0 0 0       if ($error && $self->{_errors_counter}) {
293 0   0       my $error_type = ref($error) || 'unknown';
294 0           $error_type =~ s/.*:://; # Strip package prefix
295 0           $self->{_errors_counter}->add(1, { type => $error_type });
296             }
297             }
298              
299             # Record pipeline metrics
300             sub record_pipeline {
301 0     0 0   my ($self, $size, $elapsed_ms) = @_;
302              
303 0           $self->_init_metrics;
304              
305 0 0         if ($self->{_pipeline_histogram}) {
306 0           $self->{_pipeline_histogram}->record($size);
307             }
308              
309 0 0         if ($self->{_commands_histogram}) {
310 0           $self->{_commands_histogram}->record($elapsed_ms, { command => 'PIPELINE' });
311             }
312             }
313              
314             # Record connection event
315             sub record_connection {
316 0     0 0   my ($self, $delta) = @_;
317              
318 0           $self->_init_metrics;
319              
320 0 0         if ($self->{_connections_gauge}) {
321 0           $self->{_connections_gauge}->add($delta);
322             }
323             }
324              
325             # Record reconnection attempt
326             sub record_reconnect {
327 0     0 0   my ($self) = @_;
328              
329 0           $self->_init_metrics;
330              
331 0 0         if ($self->{_reconnects_counter}) {
332 0           $self->{_reconnects_counter}->add(1);
333             }
334             }
335              
336             #
337             # Debug Logging
338             #
339              
340             sub log_send {
341 0     0 0   my ($self, @cmd) = @_;
342              
343 0 0         return unless $self->{debug};
344              
345 0           my $formatted = format_command_for_log(@cmd);
346              
347 0 0         if (ref $self->{debug} eq 'CODE') {
348 0           $self->{debug}->('send', $formatted);
349             }
350             else {
351 0           warn "[REDIS SEND] $formatted\n";
352             }
353             }
354              
355             sub log_recv {
356 0     0 0   my ($self, $result, $elapsed_ms) = @_;
357              
358 0 0         return unless $self->{debug};
359              
360 0           my $summary = _summarize_result($result);
361 0           my $msg = sprintf("[REDIS RECV] %s (%.2fms)", $summary, $elapsed_ms);
362              
363 0 0         if (ref $self->{debug} eq 'CODE') {
364 0           $self->{debug}->('recv', $msg);
365             }
366             else {
367 0           warn "$msg\n";
368             }
369             }
370              
371             sub log_error {
372 0     0 0   my ($self, $error) = @_;
373              
374 0 0         return unless $self->{debug};
375              
376 0           my $msg = "[REDIS ERROR] $error";
377              
378 0 0         if (ref $self->{debug} eq 'CODE') {
379 0           $self->{debug}->('error', $msg);
380             }
381             else {
382 0           warn "$msg\n";
383             }
384             }
385              
386             sub log_event {
387 0     0 0   my ($self, $event, $details) = @_;
388              
389 0 0         return unless $self->{debug};
390              
391 0 0         my $msg = "[REDIS EVENT] $event" . ($details ? ": $details" : '');
392              
393 0 0         if (ref $self->{debug} eq 'CODE') {
394 0           $self->{debug}->('event', $msg);
395             }
396             else {
397 0           warn "$msg\n";
398             }
399             }
400              
401             # Summarize result without exposing values
402             sub _summarize_result {
403 0     0     my ($result) = @_;
404              
405 0 0         return 'nil' unless defined $result;
406              
407 0 0 0       if (ref $result eq 'ARRAY') {
    0          
    0          
    0          
408 0           return 'array[' . scalar(@$result) . ']';
409             }
410             elsif (ref $result eq 'HASH') {
411 0           return 'hash{' . scalar(keys %$result) . '}';
412             }
413             elsif (ref $result && $result->can('message')) {
414 0           return 'error: ' . $result->message;
415             }
416             elsif (length($result) > 100) {
417 0           return 'string[' . length($result) . ' bytes]';
418             }
419             else {
420             # Short strings are OK to show type
421 0 0         return 'OK' if $result eq 'OK';
422 0 0         return 'PONG' if $result eq 'PONG';
423 0 0         return 'QUEUED' if $result eq 'QUEUED';
424 0 0         return 'integer' if $result =~ /^-?\d+$/;
425 0           return 'string[' . length($result) . ']';
426             }
427             }
428              
429             1;
430              
431             __END__