File Coverage

blib/lib/Async/Redis.pm
Criterion Covered Total %
statement 102 747 13.6
branch 9 270 3.3
condition 17 133 12.7
subroutine 30 107 28.0
pod 18 49 36.7
total 176 1306 13.4


line stmt bran cond sub pod time code
1             package Async::Redis;
2              
3 68     68   28185543 use strict;
  68         160  
  68         3376  
4 68     68   498 use warnings;
  68         187  
  68         4527  
5 68     68   1302 use 5.018;
  68         306  
6              
7             our $VERSION = '0.001004';
8              
9 68     68   392 use Future;
  68         189  
  68         2064  
10 68     68   353 use Future::AsyncAwait;
  68         126  
  68         626  
11 68     68   4573 use Future::IO 0.19;
  68         1427  
  68         4514  
12 68     68   596 use Socket qw(pack_sockaddr_in inet_aton AF_INET SOCK_STREAM);
  68         271  
  68         5589  
13 68     68   45854 use IO::Socket::INET;
  68         1038855  
  68         528  
14 68     68   41203 use Time::HiRes ();
  68         157  
  68         1868  
15              
16             # Error classes
17 68     68   42741 use Async::Redis::Error::Connection;
  68         222  
  68         3708  
18 68     68   35065 use Async::Redis::Error::Timeout;
  68         229  
  68         3015  
19 68     68   36493 use Async::Redis::Error::Disconnected;
  68         268  
  68         2990  
20 68     68   35764 use Async::Redis::Error::Redis;
  68         246  
  68         26911  
21 68     68   34813 use Async::Redis::Error::Protocol;
  68         223  
  68         3234  
22              
23             # Import auto-generated command methods
24 68     68   54307 use Async::Redis::Commands;
  68         364  
  68         6733  
25             our @ISA = qw(Async::Redis::Commands);
26              
27             # Key extraction for prefixing
28 68     68   53014 use Async::Redis::KeyExtractor;
  68         281  
  68         5135  
29              
30             # Transaction support
31 68     68   41903 use Async::Redis::Transaction;
  68         307  
  68         4355  
32              
33             # Script support
34 68     68   38780 use Async::Redis::Script;
  68         260  
  68         5089  
35              
36             # Iterator support
37 68     68   35349 use Async::Redis::Iterator;
  68         291  
  68         4958  
38              
39             # Pipeline support
40 68     68   36706 use Async::Redis::Pipeline;
  68         358  
  68         6185  
41 68     68   48165 use Async::Redis::AutoPipeline;
  68         268  
  68         5178  
42              
43             # PubSub support
44 68     68   38618 use Async::Redis::Subscription;
  68         302  
  68         6761  
45              
46             # Telemetry support
47 68     68   55705 use Async::Redis::Telemetry;
  68         257  
  68         6608  
48              
49             # Try XS version first, fall back to pure Perl
50             BEGIN {
51 68 50   68   230 eval { require Protocol::Redis::XS; 1 }
  68         42621  
  68         1645703  
52             or require Protocol::Redis;
53             }
54              
55             sub _parser_class {
56 0 0   0   0 return $INC{'Protocol/Redis/XS.pm'} ? 'Protocol::Redis::XS' : 'Protocol::Redis';
57             }
58              
59             sub _calculate_backoff {
60 0     0   0 my ($self, $attempt) = @_;
61              
62             # Exponential: delay * 2^(attempt-1)
63 0         0 my $delay = $self->{reconnect_delay} * (2 ** ($attempt - 1));
64              
65             # Cap at max
66 0 0       0 $delay = $self->{reconnect_delay_max} if $delay > $self->{reconnect_delay_max};
67              
68             # Apply jitter: delay * (1 +/- jitter)
69 0 0       0 if ($self->{reconnect_jitter} > 0) {
70 0         0 my $jitter_range = $delay * $self->{reconnect_jitter};
71 0         0 my $jitter = (rand(2) - 1) * $jitter_range;
72 0         0 $delay += $jitter;
73             }
74              
75 0         0 return $delay;
76             }
77              
78             sub new {
79 70     70 1 6211483 my ($class, %args) = @_;
80              
81             # Parse URI if provided
82 70 50       411 if ($args{uri}) {
83 0         0 require Async::Redis::URI;
84 0         0 my $uri = Async::Redis::URI->parse($args{uri});
85 0 0       0 if ($uri) {
86 0         0 my %uri_args = $uri->to_hash;
87             # URI values are defaults, explicit args override
88 0         0 %args = (%uri_args, %args);
89 0         0 delete $args{uri}; # don't store the string
90             }
91             }
92              
93             my $self = bless {
94             host => $args{host} // 'localhost',
95             port => $args{port} // 6379,
96             socket => undef,
97             parser => undef,
98             connected => 0,
99              
100             # Timeout settings
101             connect_timeout => $args{connect_timeout} // 10,
102             request_timeout => $args{request_timeout} // 5,
103             blocking_timeout_buffer => $args{blocking_timeout_buffer} // 2,
104              
105             # Inflight tracking with deadlines
106             # Entry: { future => $f, cmd => $cmd, args => \@args, deadline => $t, sent_at => $t }
107             inflight => [],
108              
109             # Response reader synchronization
110             _reading_responses => 0,
111              
112             # Reconnection settings
113             reconnect => $args{reconnect} // 0,
114             reconnect_delay => $args{reconnect_delay} // 0.1,
115             reconnect_delay_max => $args{reconnect_delay_max} // 60,
116             reconnect_jitter => $args{reconnect_jitter} // 0.25,
117             _reconnect_attempt => 0,
118              
119             # Callbacks
120             on_connect => $args{on_connect},
121             on_disconnect => $args{on_disconnect},
122             on_error => $args{on_error},
123              
124             # Authentication
125             password => $args{password},
126             username => $args{username},
127             database => $args{database} // 0,
128             client_name => $args{client_name},
129              
130             # TLS
131             tls => $args{tls},
132              
133             # Key prefixing
134             prefix => $args{prefix},
135              
136             # Pipeline settings
137             pipeline_depth => $args{pipeline_depth} // 10000,
138             auto_pipeline => $args{auto_pipeline} // 0,
139              
140             # Transaction state
141             in_multi => 0,
142             watching => 0,
143              
144             # PubSub state
145             in_pubsub => 0,
146             _subscription => undef,
147             _pump_running => 0,
148              
149             # Fork safety
150             _pid => $$,
151              
152             # Script registry
153             _scripts => {},
154              
155             # Current read future for clean disconnect cancellation
156             _current_read_future => undef,
157              
158             # Telemetry options
159             debug => $args{debug},
160             otel_tracer => $args{otel_tracer},
161             otel_meter => $args{otel_meter},
162             otel_include_args => $args{otel_include_args} // 1,
163 70   50     6883 otel_redact => $args{otel_redact} // 1,
      50        
      50        
      50        
      50        
      50        
      50        
      50        
      50        
      50        
      50        
      50        
      50        
      50        
164             }, $class;
165              
166             # Initialize telemetry if any observability enabled
167 70 50 33     1391 if ($self->{debug} || $self->{otel_tracer} || $self->{otel_meter}) {
      33        
168             $self->{_telemetry} = Async::Redis::Telemetry->new(
169             tracer => $self->{otel_tracer},
170             meter => $self->{otel_meter},
171             debug => $self->{debug},
172             include_args => $self->{otel_include_args},
173             redact => $self->{otel_redact},
174             host => $self->{host},
175             port => $self->{port},
176 0   0     0 database => $self->{database} // 0,
177             );
178             }
179              
180 70         381 return $self;
181             }
182              
183             # Connect to Redis server
184 70     70 1 1223 async sub connect {
185 70         229 my ($self) = @_;
186              
187 70 50       336 return $self if $self->{connected};
188              
189             # Create socket
190             my $socket = IO::Socket::INET->new(
191             Proto => 'tcp',
192             Blocking => 0,
193             ) or die Async::Redis::Error::Connection->new(
194             message => "Cannot create socket: $!",
195             host => $self->{host},
196             port => $self->{port},
197 70 50       891 );
198              
199             # Build sockaddr
200             my $addr = inet_aton($self->{host})
201             or die Async::Redis::Error::Connection->new(
202             message => "Cannot resolve host: $self->{host}",
203             host => $self->{host},
204             port => $self->{port},
205 70 50       68932 );
206 70         668 my $sockaddr = pack_sockaddr_in($self->{port}, $addr);
207              
208             # Connect with timeout using Future->wait_any
209 70         965 my $connect_f = Future::IO->connect($socket, $sockaddr);
210 70         153206 my $sleep_f = Future::IO->sleep($self->{connect_timeout});
211              
212             my $timeout_f = $sleep_f->then(sub {
213 0     0   0 return Future->fail('connect_timeout');
214 70         231384 });
215              
216 70         10370 my $wait_f = Future->wait_any($connect_f, $timeout_f);
217              
218             # Use followed_by to handle both success and failure without await propagating failure
219             my $result_f = $wait_f->followed_by(sub {
220 70     70   72515 my ($f) = @_;
221 70         635 return Future->done($f); # wrap the future itself
222 70         14810 });
223              
224 70         8864 my $completed_f = await $result_f;
225              
226             # Now check the result
227 70 50       13111 if ($completed_f->is_failed) {
228 70         1207 my ($error) = $completed_f->failure;
229             # Don't call close() - let $socket go out of scope when we die.
230             # Perl's DESTROY will close it after the exception unwinds.
231              
232 70 50       1481 if ($error eq 'connect_timeout') {
233             die Async::Redis::Error::Timeout->new(
234             message => "Connect timed out after $self->{connect_timeout}s",
235             timeout => $self->{connect_timeout},
236 0         0 );
237             }
238             die Async::Redis::Error::Connection->new(
239             message => "$error",
240             host => $self->{host},
241             port => $self->{port},
242 70         1461 );
243             }
244              
245             # TLS upgrade if enabled
246 0 0       0 if ($self->{tls}) {
247 0         0 eval {
248 0         0 $socket = await $self->_tls_upgrade($socket);
249             };
250 0 0       0 if ($@) {
251             # Don't call close() - let $socket go out of scope when we die.
252             # Perl's DESTROY will close it after the exception unwinds.
253 0         0 die $@;
254             }
255             }
256              
257 0         0 $self->{socket} = $socket;
258 0         0 $self->{parser} = _parser_class()->new(api => 1);
259 0         0 $self->{connected} = 1;
260 0         0 $self->{inflight} = [];
261 0         0 $self->{_reading_responses} = 0;
262 0         0 $self->{_pid} = $$; # Track PID for fork safety
263 0         0 $self->{_current_read_future} = undef;
264              
265             # Run Redis protocol handshake (AUTH, SELECT, CLIENT SETNAME)
266 0         0 await $self->_redis_handshake;
267              
268             # Initialize auto-pipeline if enabled
269 0 0       0 if ($self->{auto_pipeline}) {
270             $self->{_auto_pipeline} = Async::Redis::AutoPipeline->new(
271             redis => $self,
272             max_depth => $self->{pipeline_depth},
273 0         0 );
274             }
275              
276             # Fire on_connect callback and reset reconnect counter
277 0 0       0 if ($self->{on_connect}) {
278 0         0 $self->{on_connect}->($self);
279             }
280 0         0 $self->{_reconnect_attempt} = 0;
281              
282             # Telemetry: record connection
283 0 0       0 if ($self->{_telemetry}) {
284 0         0 $self->{_telemetry}->record_connection(1);
285 0         0 $self->{_telemetry}->log_event('connected', "$self->{host}:$self->{port}");
286             }
287              
288 0         0 return $self;
289             }
290              
291             # Redis protocol handshake after TCP connect
292 0     0   0 async sub _redis_handshake {
293 0         0 my ($self) = @_;
294              
295             # Use connect_timeout for the entire handshake (AUTH, SELECT, CLIENT SETNAME)
296             # This ensures the handshake can't block forever if Redis hangs
297 0         0 my $deadline = Time::HiRes::time() + $self->{connect_timeout};
298              
299             # AUTH (password or username+password for ACL)
300 0 0       0 if ($self->{password}) {
301 0         0 my @auth_args = ('AUTH');
302 0 0       0 push @auth_args, $self->{username} if $self->{username};
303 0         0 push @auth_args, $self->{password};
304              
305 0         0 my $cmd = $self->_build_command(@auth_args);
306 0         0 await $self->_send($cmd);
307              
308 0         0 my $response = await $self->_read_response_with_deadline($deadline, ['AUTH']);
309 0         0 my $result = $self->_decode_response($response);
310              
311             # AUTH returns OK on success, throws on failure
312 0 0 0     0 unless ($result && $result eq 'OK') {
313 0         0 die Async::Redis::Error::Redis->new(
314             message => "Authentication failed: $result",
315             type => 'NOAUTH',
316             );
317             }
318             }
319              
320             # SELECT database
321 0 0 0     0 if ($self->{database} && $self->{database} != 0) {
322 0         0 my $cmd = $self->_build_command('SELECT', $self->{database});
323 0         0 await $self->_send($cmd);
324              
325 0         0 my $response = await $self->_read_response_with_deadline($deadline, ['SELECT', $self->{database}]);
326 0         0 my $result = $self->_decode_response($response);
327              
328 0 0 0     0 unless ($result && $result eq 'OK') {
329 0         0 die Async::Redis::Error::Redis->new(
330             message => "SELECT failed: $result",
331             type => 'ERR',
332             );
333             }
334             }
335              
336             # CLIENT SETNAME
337 0 0       0 if ($self->{client_name}) {
338 0         0 my $cmd = $self->_build_command('CLIENT', 'SETNAME', $self->{client_name});
339 0         0 await $self->_send($cmd);
340              
341 0         0 my $response = await $self->_read_response_with_deadline($deadline, ['CLIENT', 'SETNAME']);
342             # Ignore result - SETNAME failing shouldn't prevent connection
343             }
344             }
345              
346             # Check if connected to Redis
347             sub is_connected {
348 0     0 0 0 my ($self) = @_;
349 0 0       0 return $self->{connected} ? 1 : 0;
350             }
351              
352             # Disconnect from Redis
353             sub disconnect {
354 0     0 1 0 my ($self, $reason) = @_;
355 0   0     0 $reason //= 'client_disconnect';
356              
357 0         0 my $was_connected = $self->{connected};
358              
359             # Cancel any active read future BEFORE closing socket
360             # This ensures Future::IO unregisters its watcher while fileno is still valid
361 0 0 0     0 if ($self->{_current_read_future} && !$self->{_current_read_future}->is_ready) {
362 0         0 $self->{_current_read_future}->cancel;
363 0         0 $self->{_current_read_future} = undef;
364             }
365              
366             # Cancel any pending inflight operations before closing socket
367 0 0       0 if (my $inflight = $self->{inflight}) {
368 0         0 for my $entry (@$inflight) {
369 0 0 0     0 if ($entry->{future} && !$entry->{future}->is_ready) {
370 0         0 $entry->{future}->cancel;
371             }
372             }
373 0         0 $self->{inflight} = [];
374             }
375              
376 0 0       0 if ($self->{socket}) {
377 0         0 $self->_close_socket;
378             }
379 0         0 $self->{connected} = 0;
380 0         0 $self->{parser} = undef;
381 0         0 $self->{_reading_responses} = 0;
382              
383 0 0 0     0 if ($was_connected && $self->{on_disconnect}) {
384 0         0 $self->{on_disconnect}->($self, $reason);
385             }
386              
387             # Telemetry: record disconnection
388 0 0 0     0 if ($was_connected && $self->{_telemetry}) {
389 0         0 $self->{_telemetry}->record_connection(-1);
390 0         0 $self->{_telemetry}->log_event('disconnected', $reason);
391             }
392              
393 0         0 return $self;
394             }
395              
396             # Destructor - clean up socket when object is garbage collected
397             sub DESTROY {
398 70     70   28697 my ($self) = @_;
399             # Only clean up if we have a socket and it's still open
400 70 50 33     1425 if ($self->{socket} && fileno($self->{socket})) {
401 0           $self->_close_socket;
402             }
403             }
404              
405             # Properly close socket, canceling any pending futures first
406             sub _close_socket {
407 0     0     my ($self) = @_;
408              
409             # Take ownership - removes from $self immediately
410 0 0         my $socket = delete $self->{socket} or return;
411 0           my $fileno = fileno($socket);
412              
413             # Cancel any pending inflight futures - this propagates to
414             # Future::IO internals and cleans up any watchers on this socket.
415             # Important: must happen while fileno is still valid!
416 0 0         if (my $inflight = delete $self->{inflight}) {
417 0           for my $entry (@$inflight) {
418 0 0 0       if ($entry->{future} && !$entry->{future}->is_ready) {
419 0           $entry->{future}->cancel;
420             }
421             }
422             }
423 0           $self->{inflight} = [];
424              
425             # Initiate clean TCP shutdown (FIN) while fileno still valid
426 0 0         shutdown($socket, 2) if defined $fileno;
427              
428             # DON'T call close()!
429             # $socket falls out of scope here, Perl's DESTROY calls close().
430             # By this point, Future::IO has already unregistered its watchers
431             # via the cancel() calls above.
432             }
433              
434             # Check if fork occurred and invalidate connection
435             sub _check_fork {
436 0     0     my ($self) = @_;
437              
438 0 0 0       if ($self->{_pid} && $self->{_pid} != $$) {
439             # Fork detected - invalidate connection (parent owns the socket)
440             # Don't cancel futures - they belong to the parent's event loop
441             # Just clear references so we don't try to use them
442 0           $self->{connected} = 0;
443 0           $self->{socket} = undef;
444 0           $self->{parser} = undef;
445 0           $self->{inflight} = [];
446 0           $self->{_reading_responses} = 0;
447 0           $self->{_current_read_future} = undef; # Clear stale reference
448              
449 0           my $old_pid = $self->{_pid};
450 0           $self->{_pid} = $$;
451              
452 0 0         if ($self->{_telemetry}) {
453 0           $self->{_telemetry}->log_event('fork_detected', "old PID: $old_pid, new PID: $$");
454             }
455              
456 0           return 1; # Fork occurred
457             }
458              
459 0           return 0;
460             }
461              
462             # Build Redis command in RESP format
463             sub _build_command {
464 0     0     my ($self, @args) = @_;
465              
466 0           my $cmd = "*" . scalar(@args) . "\r\n";
467 0           for my $arg (@args) {
468 0   0       $arg //= '';
469 0           my $bytes = "$arg"; # stringify
470 0 0         utf8::encode($bytes) if utf8::is_utf8($bytes);
471 0           $cmd .= "\$" . length($bytes) . "\r\n" . $bytes . "\r\n";
472             }
473 0           return $cmd;
474             }
475              
476             # Send raw data
477 0     0     async sub _send {
478 0           my ($self, $data) = @_;
479 0           await Future::IO->write_exactly($self->{socket}, $data);
480 0           return length($data);
481             }
482              
483             # Add command to inflight queue - returns queue depth
484             sub _add_inflight {
485 0     0     my ($self, $future, $cmd, $args, $deadline) = @_;
486 0           push @{$self->{inflight}}, {
  0            
487             future => $future,
488             cmd => $cmd,
489             args => $args,
490             deadline => $deadline,
491             sent_at => Time::HiRes::time(),
492             };
493 0           return scalar @{$self->{inflight}};
  0            
494             }
495              
496             # Shift first entry from inflight queue
497             sub _shift_inflight {
498 0     0     my ($self) = @_;
499 0           return shift @{$self->{inflight}};
  0            
500             }
501              
502             # Fail all pending inflight futures with given error
503             sub _fail_all_inflight {
504 0     0     my ($self, $error) = @_;
505 0           while (my $entry = $self->_shift_inflight) {
506 0 0 0       if ($entry->{future} && !$entry->{future}->is_ready) {
507 0           $entry->{future}->fail($error);
508             }
509             }
510             }
511              
512             # Ensure response reader is running - the core response queue mechanism
513             # Only one reader should be active at a time, processing responses in FIFO order
514 0     0     async sub _ensure_response_reader {
515 0           my ($self) = @_;
516              
517             # Already reading - don't start another reader
518 0 0         return if $self->{_reading_responses};
519              
520 0           $self->{_reading_responses} = 1;
521              
522 0   0       while (@{$self->{inflight}} && $self->{connected}) {
  0            
523 0           my $entry = $self->{inflight}[0];
524              
525             # Read response with deadline from the entry
526 0           my $response;
527 0           my $read_ok = eval {
528             $response = await $self->_read_response_with_deadline(
529             $entry->{deadline},
530             $entry->{args}
531 0           );
532 0           1;
533             };
534              
535 0 0         if (!$read_ok) {
536 0           my $read_error = $@;
537             # Connection/timeout error - fail all inflight and abort
538 0           $self->_fail_all_inflight($read_error);
539 0           $self->{_reading_responses} = 0;
540 0           return;
541             }
542              
543             # Remove this entry from the queue now that we have its response
544 0           $self->_shift_inflight;
545              
546             # Decode response (sync operation, eval works fine here)
547 0           my $result;
548 0           my $decode_ok = eval {
549 0           $result = $self->_decode_response($response);
550 0           1;
551             };
552              
553             # Complete the future
554 0 0         if (!$decode_ok) {
555 0           my $decode_error = $@;
556             # Redis error (like WRONGTYPE) - fail just this future
557 0 0         $entry->{future}->fail($decode_error) unless $entry->{future}->is_ready;
558             } else {
559             # Success - complete the future with result
560 0 0         $entry->{future}->done($result) unless $entry->{future}->is_ready;
561             }
562             }
563              
564 0           $self->{_reading_responses} = 0;
565             }
566              
567             # Read and parse one response
568 0     0     async sub _read_response {
569 0           my ($self) = @_;
570              
571             # First check if parser already has a complete message
572             # (from previous read that contained multiple responses)
573 0 0         if (my $msg = $self->{parser}->get_message) {
574 0           return $msg;
575             }
576              
577             # Read until we get a complete message
578 0           while (1) {
579 0           my $buf = await Future::IO->read($self->{socket}, 65536);
580              
581             # EOF
582 0 0 0       if (!defined $buf || length($buf) == 0) {
583 0           die "Connection closed by server";
584             }
585              
586 0           $self->{parser}->parse($buf);
587              
588 0 0         if (my $msg = $self->{parser}->get_message) {
589 0           return $msg;
590             }
591             }
592             }
593              
594             # Calculate deadline based on command type
595             sub _calculate_deadline {
596 0     0     my ($self, $cmd, @args) = @_;
597              
598 0   0       $cmd = uc($cmd // '');
599              
600             # Blocking commands get extended deadline
601 0 0         if ($cmd =~ /^(BLPOP|BRPOP|BLMOVE|BRPOPLPUSH|BLMPOP|BZPOPMIN|BZPOPMAX|BZMPOP)$/) {
602             # Last arg is the timeout for these commands
603 0   0       my $server_timeout = $args[-1] // 0;
604 0           return Time::HiRes::time() + $server_timeout + $self->{blocking_timeout_buffer};
605             }
606              
607 0 0         if ($cmd =~ /^(XREAD|XREADGROUP)$/) {
608             # XREAD/XREADGROUP have BLOCK option
609 0           for my $i (0 .. $#args - 1) {
610 0 0         if (uc($args[$i]) eq 'BLOCK') {
611 0   0       my $block_ms = $args[$i + 1] // 0;
612 0           return Time::HiRes::time() + ($block_ms / 1000) + $self->{blocking_timeout_buffer};
613             }
614             }
615             }
616              
617             # Normal commands use request_timeout
618 0           return Time::HiRes::time() + $self->{request_timeout};
619             }
620              
621             # Non-blocking TLS upgrade
622 0     0     async sub _tls_upgrade {
623 0           my ($self, $socket) = @_;
624              
625 0           require IO::Socket::SSL;
626              
627             # Build SSL options
628 0           my %ssl_opts = (
629             SSL_startHandshake => 0, # Don't block during start_SSL!
630             );
631              
632 0 0         if (ref $self->{tls} eq 'HASH') {
633 0 0         $ssl_opts{SSL_ca_file} = $self->{tls}{ca_file} if $self->{tls}{ca_file};
634 0 0         $ssl_opts{SSL_cert_file} = $self->{tls}{cert_file} if $self->{tls}{cert_file};
635 0 0         $ssl_opts{SSL_key_file} = $self->{tls}{key_file} if $self->{tls}{key_file};
636              
637 0 0         if (exists $self->{tls}{verify}) {
638             $ssl_opts{SSL_verify_mode} = $self->{tls}{verify}
639 0 0         ? IO::Socket::SSL::SSL_VERIFY_PEER()
640             : IO::Socket::SSL::SSL_VERIFY_NONE();
641             } else {
642 0           $ssl_opts{SSL_verify_mode} = IO::Socket::SSL::SSL_VERIFY_PEER();
643             }
644             } else {
645 0           $ssl_opts{SSL_verify_mode} = IO::Socket::SSL::SSL_VERIFY_PEER();
646             }
647              
648             # Start SSL (does not block because SSL_startHandshake => 0)
649             IO::Socket::SSL->start_SSL($socket, %ssl_opts)
650             or die Async::Redis::Error::Connection->new(
651             message => "SSL setup failed: " . IO::Socket::SSL::errstr(),
652             host => $self->{host},
653             port => $self->{port},
654 0 0         );
655              
656             # Drive handshake with non-blocking loop
657 0           my $deadline = Time::HiRes::time() + $self->{connect_timeout};
658              
659 0           while (1) {
660             # Check timeout
661 0 0         if (Time::HiRes::time() >= $deadline) {
662             die Async::Redis::Error::Timeout->new(
663             message => "TLS handshake timed out",
664             timeout => $self->{connect_timeout},
665 0           );
666             }
667              
668             # Attempt handshake step
669 0           my $rv = $socket->connect_SSL;
670              
671 0 0         if ($rv) {
672             # Handshake complete!
673 0           return $socket;
674             }
675              
676             # Check what the handshake needs
677 0           my $remaining = $deadline - Time::HiRes::time();
678 0 0         $remaining = 0.1 if $remaining <= 0;
679              
680 0 0         if ($IO::Socket::SSL::SSL_ERROR == IO::Socket::SSL::SSL_ERROR_WANT_READ()) {
    0          
681             # Wait for socket to become readable with timeout
682 0           my $read_f = Future::IO->waitfor_readable($socket);
683             my $timeout_f = Future::IO->sleep($remaining)->then(sub {
684 0     0     return Future->fail('tls_timeout');
685 0           });
686              
687 0           my $wait_f = Future->wait_any($read_f, $timeout_f);
688 0           await $wait_f;
689              
690 0 0         if ($wait_f->is_failed) {
691             die Async::Redis::Error::Timeout->new(
692             message => "TLS handshake timed out",
693             timeout => $self->{connect_timeout},
694 0           );
695             }
696             }
697             elsif ($IO::Socket::SSL::SSL_ERROR == IO::Socket::SSL::SSL_ERROR_WANT_WRITE()) {
698             # Wait for socket to become writable with timeout
699 0           my $write_f = Future::IO->waitfor_writable($socket);
700             my $timeout_f = Future::IO->sleep($remaining)->then(sub {
701 0     0     return Future->fail('tls_timeout');
702 0           });
703              
704 0           my $wait_f = Future->wait_any($write_f, $timeout_f);
705 0           await $wait_f;
706              
707 0 0         if ($wait_f->is_failed) {
708             die Async::Redis::Error::Timeout->new(
709             message => "TLS handshake timed out",
710             timeout => $self->{connect_timeout},
711 0           );
712             }
713             }
714             else {
715             # Actual error
716             die Async::Redis::Error::Connection->new(
717             message => "TLS handshake failed: " . IO::Socket::SSL::errstr(),
718             host => $self->{host},
719             port => $self->{port},
720 0           );
721             }
722             }
723             }
724              
725             # Reconnect with exponential backoff
726 0     0     async sub _reconnect {
727 0           my ($self) = @_;
728              
729 0           while (!$self->{connected}) {
730 0           $self->{_reconnect_attempt}++;
731 0           my $delay = $self->_calculate_backoff($self->{_reconnect_attempt});
732              
733 0           eval {
734 0           await $self->connect;
735             };
736              
737 0 0         if ($@) {
738 0           my $error = $@;
739              
740             # Fire on_error callback
741 0 0         if ($self->{on_error}) {
742 0           $self->{on_error}->($self, $error);
743             }
744              
745             # Wait before next attempt
746 0           await Future::IO->sleep($delay);
747             }
748             }
749             }
750              
751             # Execute a Redis command
752 0     0 1   async sub command {
753 0           my ($self, $cmd, @args) = @_;
754              
755             # Check for fork - invalidate connection if PID changed
756 0           $self->_check_fork;
757              
758             # Block regular commands on pubsub connection
759 0 0         if ($self->{in_pubsub}) {
760 0   0       my $ucmd = uc($cmd // '');
761 0 0         unless ($ucmd =~ /^(SUBSCRIBE|UNSUBSCRIBE|PSUBSCRIBE|PUNSUBSCRIBE|SSUBSCRIBE|SUNSUBSCRIBE|PING|QUIT)$/) {
762 0           die Async::Redis::Error::Protocol->new(
763             message => "Cannot execute '$cmd' on connection in PubSub mode",
764             );
765             }
766             }
767              
768             # Apply key prefixing if configured
769 0 0 0       if (defined $self->{prefix} && $self->{prefix} ne '') {
770             @args = Async::Redis::KeyExtractor::apply_prefix(
771 0           $self->{prefix}, $cmd, @args
772             );
773             }
774              
775             # Route through auto-pipeline if enabled
776 0 0         if ($self->{_auto_pipeline}) {
777 0           return await $self->{_auto_pipeline}->command($cmd, @args);
778             }
779              
780             # If disconnected and reconnect enabled, try to reconnect
781 0 0 0       if (!$self->{connected} && $self->{reconnect}) {
782 0           await $self->_reconnect;
783             }
784              
785             die Async::Redis::Error::Disconnected->new(
786             message => "Not connected",
787 0 0         ) unless $self->{connected};
788              
789             # Telemetry: start span and log send
790 0           my $span_context;
791 0           my $start_time = Time::HiRes::time();
792 0 0         if ($self->{_telemetry}) {
793 0           $span_context = $self->{_telemetry}->start_command_span($cmd, @args);
794 0           $self->{_telemetry}->log_send($cmd, @args);
795             }
796              
797 0           my $raw_cmd = $self->_build_command($cmd, @args);
798              
799             # Calculate deadline based on command type
800 0           my $deadline = $self->_calculate_deadline($cmd, @args);
801              
802             # Create response future and register in inflight queue BEFORE sending
803             # This ensures responses are matched in order
804 0           my $response_future = Future->new;
805 0           $self->_add_inflight($response_future, $cmd, \@args, $deadline);
806              
807 0           my $result;
808             my $error;
809              
810 0           my $send_ok = eval {
811             # Send command
812 0           await $self->_send($raw_cmd);
813 0           1;
814             };
815              
816 0 0         if (!$send_ok) {
817 0           $error = $@;
818             # Send failed - remove from inflight and fail
819 0           $self->_shift_inflight; # Remove the entry we just added
820 0 0         $response_future->fail($error) unless $response_future->is_ready;
821             } else {
822             # Trigger the response reader (fire and forget - it runs in background)
823 0           $self->_ensure_response_reader->retain;
824              
825             # Wait for our response future to be completed by the reader
826 0           my $await_ok = eval {
827 0           $result = await $response_future;
828 0           1;
829             };
830              
831 0 0         if (!$await_ok) {
832 0           $error = $@;
833             }
834             }
835              
836             # Telemetry: log result and end span
837 0 0         if ($self->{_telemetry}) {
838 0           my $elapsed_ms = (Time::HiRes::time() - $start_time) * 1000;
839 0 0         if ($error) {
840 0           $self->{_telemetry}->log_error($error);
841             }
842             else {
843 0           $self->{_telemetry}->log_recv($result, $elapsed_ms);
844             }
845 0           $self->{_telemetry}->end_command_span($span_context, $error);
846             }
847              
848 0 0         die $error if $error;
849 0           return $result;
850             }
851              
852             # Read response with deadline enforcement
853 0     0     async sub _read_response_with_deadline {
854 0           my ($self, $deadline, $cmd_ref) = @_;
855              
856             # First check if parser already has a complete message
857 0 0         if (my $msg = $self->{parser}->get_message) {
858 0           return $msg;
859             }
860              
861             # Read until we get a complete message
862 0           while (1) {
863 0           my $remaining = $deadline - Time::HiRes::time();
864              
865 0 0         if ($remaining <= 0) {
866 0           $self->_reset_connection;
867             die Async::Redis::Error::Timeout->new(
868             message => "Request timed out after $self->{request_timeout}s",
869             command => $cmd_ref,
870             timeout => $self->{request_timeout},
871 0           maybe_executed => 1, # already sent the command
872             );
873             }
874              
875             # Use wait_any for timeout
876 0           my $read_f = Future::IO->read($self->{socket}, 65536);
877              
878             # Store reference so disconnect() can cancel it
879 0           $self->{_current_read_future} = $read_f;
880              
881             my $timeout_f = Future::IO->sleep($remaining)->then(sub {
882 0     0     return Future->fail('read_timeout');
883 0           });
884              
885 0           my $wait_f = Future->wait_any($read_f, $timeout_f);
886 0           await $wait_f;
887              
888             # Clear stored reference after await completes
889 0           $self->{_current_read_future} = undef;
890              
891             # Check if read was cancelled (by disconnect)
892 0 0         if ($read_f->is_cancelled) {
893 0           die Async::Redis::Error::Disconnected->new(
894             message => "Disconnected during read",
895             );
896             }
897              
898 0 0         if ($wait_f->is_failed) {
899 0           my ($error) = $wait_f->failure;
900 0 0         if ($error eq 'read_timeout') {
901 0           $self->_reset_connection;
902             die Async::Redis::Error::Timeout->new(
903             message => "Request timed out after $self->{request_timeout}s",
904             command => $cmd_ref,
905             timeout => $self->{request_timeout},
906 0           maybe_executed => 1,
907             );
908             }
909 0           $self->_reset_connection;
910 0           die Async::Redis::Error::Connection->new(
911             message => "$error",
912             );
913             }
914              
915             # Get the read result
916 0           my $buf = $wait_f->get;
917              
918             # EOF
919 0 0 0       if (!defined $buf || length($buf) == 0) {
920 0           $self->_reset_connection;
921 0           die Async::Redis::Error::Connection->new(
922             message => "Connection closed by server",
923             );
924             }
925              
926 0           $self->{parser}->parse($buf);
927              
928 0 0         if (my $msg = $self->{parser}->get_message) {
929 0           return $msg;
930             }
931             }
932             }
933              
934             # Reset connection after timeout (stream is desynced)
935             sub _reset_connection {
936 0     0     my ($self, $reason) = @_;
937 0   0       $reason //= 'timeout';
938              
939 0           my $was_connected = $self->{connected};
940              
941             # Cancel any active read future BEFORE closing socket
942             # This ensures Future::IO unregisters its watcher while fileno is still valid
943 0 0 0       if ($self->{_current_read_future} && !$self->{_current_read_future}->is_ready) {
944 0           $self->{_current_read_future}->cancel;
945 0           $self->{_current_read_future} = undef;
946             }
947              
948             # Cancel any pending inflight operations before closing socket
949 0 0         if (my $inflight = $self->{inflight}) {
950 0           for my $entry (@$inflight) {
951 0 0 0       if ($entry->{future} && !$entry->{future}->is_ready) {
952 0           $entry->{future}->cancel;
953             }
954             }
955 0           $self->{inflight} = [];
956             }
957              
958 0 0         if ($self->{socket}) {
959 0           $self->_close_socket;
960             }
961              
962 0           $self->{connected} = 0;
963 0           $self->{parser} = undef;
964 0           $self->{_reading_responses} = 0;
965              
966 0 0 0       if ($was_connected && $self->{on_disconnect}) {
967 0           $self->{on_disconnect}->($self, $reason);
968             }
969             }
970              
971             # Decode Protocol::Redis response to Perl value
972             sub _decode_response {
973 0     0     my ($self, $msg) = @_;
974              
975 0 0         return undef unless $msg;
976              
977 0           my $type = $msg->{type};
978 0           my $data = $msg->{data};
979              
980             # Simple string (+)
981 0 0         if ($type eq '+') {
    0          
    0          
    0          
    0          
982 0           return $data;
983             }
984             # Error (-)
985             elsif ($type eq '-') {
986 0           die "Redis error: $data";
987             }
988             # Integer (:)
989             elsif ($type eq ':') {
990 0           return 0 + $data;
991             }
992             # Bulk string ($)
993             elsif ($type eq '$') {
994 0           return $data; # undef for null bulk
995             }
996             # Array (*)
997             elsif ($type eq '*') {
998 0 0         return undef unless defined $data; # null array
999 0           return [ map { $self->_decode_response($_) } @$data ];
  0            
1000             }
1001              
1002 0           return $data;
1003             }
1004              
1005             # ============================================================================
1006             # Convenience Commands
1007             # ============================================================================
1008              
1009 0     0 0   async sub ping {
1010 0           my ($self) = @_;
1011 0           return await $self->command('PING');
1012             }
1013              
1014 0     0 0   async sub set {
1015 0           my ($self, $key, $value, %opts) = @_;
1016 0           my @cmd = ('SET', $key, $value);
1017 0 0         push @cmd, 'EX', $opts{ex} if exists $opts{ex};
1018 0 0         push @cmd, 'PX', $opts{px} if exists $opts{px};
1019 0 0         push @cmd, 'NX' if $opts{nx};
1020 0 0         push @cmd, 'XX' if $opts{xx};
1021 0           return await $self->command(@cmd);
1022             }
1023              
1024 0     0 0   async sub get {
1025 0           my ($self, $key) = @_;
1026 0           return await $self->command('GET', $key);
1027             }
1028              
1029 0     0 0   async sub del {
1030 0           my ($self, @keys) = @_;
1031 0           return await $self->command('DEL', @keys);
1032             }
1033              
1034 0     0 0   async sub incr {
1035 0           my ($self, $key) = @_;
1036 0           return await $self->command('INCR', $key);
1037             }
1038              
1039 0     0 0   async sub lpush {
1040 0           my ($self, $key, @values) = @_;
1041 0           return await $self->command('LPUSH', $key, @values);
1042             }
1043              
1044 0     0 0   async sub rpush {
1045 0           my ($self, $key, @values) = @_;
1046 0           return await $self->command('RPUSH', $key, @values);
1047             }
1048              
1049 0     0 0   async sub lpop {
1050 0           my ($self, $key) = @_;
1051 0           return await $self->command('LPOP', $key);
1052             }
1053              
1054 0     0 0   async sub lrange {
1055 0           my ($self, $key, $start, $stop) = @_;
1056 0           return await $self->command('LRANGE', $key, $start, $stop);
1057             }
1058              
1059 0     0 1   async sub keys {
1060 0           my ($self, $pattern) = @_;
1061 0           return await $self->command('KEYS', $pattern // '*');
1062             }
1063              
1064 0     0 0   async sub flushdb {
1065 0           my ($self) = @_;
1066 0           return await $self->command('FLUSHDB');
1067             }
1068              
1069             # ============================================================================
1070             # Lua Scripting
1071             # ============================================================================
1072              
1073 0     0 0   async sub script_load {
1074 0           my ($self, $script) = @_;
1075 0           return await $self->command('SCRIPT', 'LOAD', $script);
1076             }
1077              
1078 0     0 0   async sub script_exists {
1079 0           my ($self, @shas) = @_;
1080 0           return await $self->command('SCRIPT', 'EXISTS', @shas);
1081             }
1082              
1083 0     0 0   async sub script_flush {
1084 0           my ($self, $mode) = @_;
1085 0           my @args = ('SCRIPT', 'FLUSH');
1086 0 0         push @args, $mode if $mode; # ASYNC or SYNC
1087 0           return await $self->command(@args);
1088             }
1089              
1090 0     0 0   async sub script_kill {
1091 0           my ($self) = @_;
1092 0           return await $self->command('SCRIPT', 'KILL');
1093             }
1094              
1095 0     0 0   async sub evalsha_or_eval {
1096 0           my ($self, $sha, $script, $numkeys, @keys_and_args) = @_;
1097              
1098             # Try EVALSHA first
1099 0           my $result;
1100 0           eval {
1101 0           $result = await $self->evalsha($sha, $numkeys, @keys_and_args);
1102             };
1103              
1104 0 0         if ($@) {
1105 0           my $error = $@;
1106              
1107             # Check if it's a NOSCRIPT error
1108 0 0         if ("$error" =~ /NOSCRIPT/i) {
1109             # Fall back to EVAL (which also loads the script)
1110 0           $result = await $self->eval($script, $numkeys, @keys_and_args);
1111             }
1112             else {
1113             # Re-throw other errors
1114 0           die $error;
1115             }
1116             }
1117              
1118 0           return $result;
1119             }
1120              
1121             sub script {
1122 0     0 1   my ($self, $code) = @_;
1123 0           return Async::Redis::Script->new(
1124             redis => $self,
1125             script => $code,
1126             );
1127             }
1128              
1129             # Define a named script command
1130             # Usage: $redis->define_command(name => { keys => N, lua => '...' })
1131             sub define_command {
1132 0     0 1   my ($self, $name, $def) = @_;
1133              
1134 0 0 0       die "Command name required" unless defined $name && length $name;
1135 0 0         die "Command definition required" unless ref $def eq 'HASH';
1136 0 0         die "Lua script required (lua => '...')" unless defined $def->{lua};
1137              
1138             # Validate name (alphanumeric and underscore only)
1139 0 0         die "Invalid command name '$name' - use only alphanumeric and underscore"
1140             unless $name =~ /^[a-zA-Z_][a-zA-Z0-9_]*$/;
1141              
1142             my $script = Async::Redis::Script->new(
1143             redis => $self,
1144             script => $def->{lua},
1145             name => $name,
1146             num_keys => $def->{keys} // 'dynamic',
1147             description => $def->{description},
1148 0   0       );
1149              
1150 0           $self->{_scripts}{$name} = $script;
1151              
1152             # Optional: install as method on this instance
1153 0 0         if ($def->{install}) {
1154 0           $self->_install_script_method($name);
1155             }
1156              
1157 0           return $script;
1158             }
1159              
1160             # Run a registered script by name
1161             # Usage: $redis->run_script('name', @keys_then_args)
1162             # If num_keys is 'dynamic', first arg is the key count
1163 0     0 1   async sub run_script {
1164 0           my ($self, $name, @args) = @_;
1165              
1166 0 0         my $script = $self->{_scripts}{$name}
1167             or die "Unknown script: '$name' - use define_command() first";
1168              
1169 0           my $num_keys = $script->num_keys;
1170              
1171             # Handle dynamic key count
1172 0 0         if ($num_keys eq 'dynamic') {
1173 0           $num_keys = shift @args;
1174 0 0         die "Key count required as first argument for dynamic script '$name'"
1175             unless defined $num_keys;
1176             }
1177              
1178             # Split args into keys and argv
1179 0           my @keys = splice(@args, 0, $num_keys);
1180 0           return await $script->run(\@keys, \@args);
1181             }
1182              
1183             # Get a registered script by name
1184             sub get_script {
1185 0     0 1   my ($self, $name) = @_;
1186 0           return $self->{_scripts}{$name};
1187             }
1188              
1189             # List all registered script names
1190             sub list_scripts {
1191 0     0 1   my ($self) = @_;
1192 0           return CORE::keys %{$self->{_scripts}};
  0            
1193             }
1194              
1195             # Preload all registered scripts to Redis
1196             # Useful before pipeline execution
1197 0     0 1   async sub preload_scripts {
1198 0           my ($self) = @_;
1199              
1200 0           my @names = $self->list_scripts;
1201 0 0         return 0 unless @names;
1202              
1203 0           for my $name (@names) {
1204 0           my $script = $self->{_scripts}{$name};
1205 0           await $self->script_load($script->script);
1206             }
1207              
1208 0           return scalar @names;
1209             }
1210              
1211             # Install a script as a method (internal)
1212             sub _install_script_method {
1213 0     0     my ($self, $name) = @_;
1214              
1215             # Create closure that captures $name
1216             my $method = sub {
1217 0     0     my ($self, @args) = @_;
1218 0           return $self->run_script($name, @args);
1219 0           };
1220              
1221             # Install on the class (affects all instances)
1222 68     68   894 no strict 'refs';
  68         242  
  68         4406  
1223 68     68   425 no warnings 'redefine';
  68         188  
  68         361620  
1224 0           *{"Async::Redis::$name"} = $method;
  0            
1225             }
1226              
1227             # ============================================================================
1228             # SCAN Iterators
1229             # ============================================================================
1230              
1231             sub scan_iter {
1232 0     0 1   my ($self, %opts) = @_;
1233             return Async::Redis::Iterator->new(
1234             redis => $self,
1235             command => 'SCAN',
1236             match => $opts{match},
1237             count => $opts{count},
1238             type => $opts{type},
1239 0           );
1240             }
1241              
1242             sub hscan_iter {
1243 0     0 0   my ($self, $key, %opts) = @_;
1244             return Async::Redis::Iterator->new(
1245             redis => $self,
1246             command => 'HSCAN',
1247             key => $key,
1248             match => $opts{match},
1249             count => $opts{count},
1250 0           );
1251             }
1252              
1253             sub sscan_iter {
1254 0     0 0   my ($self, $key, %opts) = @_;
1255             return Async::Redis::Iterator->new(
1256             redis => $self,
1257             command => 'SSCAN',
1258             key => $key,
1259             match => $opts{match},
1260             count => $opts{count},
1261 0           );
1262             }
1263              
1264             sub zscan_iter {
1265 0     0 0   my ($self, $key, %opts) = @_;
1266             return Async::Redis::Iterator->new(
1267             redis => $self,
1268             command => 'ZSCAN',
1269             key => $key,
1270             match => $opts{match},
1271             count => $opts{count},
1272 0           );
1273             }
1274              
1275             # ============================================================================
1276             # Transactions
1277             # ============================================================================
1278              
1279 0     0 1   async sub multi {
1280 0           my ($self, $callback) = @_;
1281              
1282             # Prevent nested multi() calls
1283             die "Cannot nest multi() calls - already in a transaction"
1284 0 0         if $self->{in_multi};
1285              
1286             # Mark that we're collecting transaction commands
1287 0           $self->{in_multi} = 1;
1288              
1289 0           my @commands;
1290 0           eval {
1291             # Create transaction collector
1292 0           my $tx = Async::Redis::Transaction->new(redis => $self);
1293              
1294             # Run callback to collect commands
1295 0           await $callback->($tx);
1296              
1297 0           @commands = $tx->commands;
1298             };
1299 0           my $collect_error = $@;
1300              
1301 0 0         if ($collect_error) {
1302 0           $self->{in_multi} = 0;
1303 0           die $collect_error;
1304             }
1305              
1306             # If no commands queued, return empty result
1307 0 0         unless (@commands) {
1308 0           $self->{in_multi} = 0;
1309 0           return [];
1310             }
1311              
1312             # Execute transaction (in_multi already set)
1313 0           return await $self->_execute_transaction(\@commands);
1314             }
1315              
1316 0     0     async sub _execute_transaction {
1317 0           my ($self, $commands) = @_;
1318              
1319             # in_multi should already be set by caller
1320              
1321 0           my $results;
1322 0           eval {
1323             # Send MULTI
1324 0           await $self->command('MULTI');
1325              
1326             # Queue all commands (they return +QUEUED)
1327 0           for my $cmd (@$commands) {
1328 0           await $self->command(@$cmd);
1329             }
1330              
1331             # Execute and get results
1332 0           $results = await $self->command('EXEC');
1333             };
1334 0           my $error = $@;
1335              
1336             # Always clear transaction state
1337 0           $self->{in_multi} = 0;
1338              
1339 0 0         if ($error) {
1340             # Try to clean up
1341 0           eval { await $self->command('DISCARD') };
  0            
1342 0           die $error;
1343             }
1344              
1345 0           return $results;
1346             }
1347              
1348             # Accessor for pool cleanliness tracking
1349 0     0 0   sub in_multi { shift->{in_multi} }
1350 0     0 0   sub watching { shift->{watching} }
1351 0     0 0   sub in_pubsub { shift->{in_pubsub} }
1352 0   0 0 0   sub inflight_count { scalar @{shift->{inflight} // []} }
  0            
1353              
1354             # Is connection dirty (unsafe to reuse)?
1355             sub is_dirty {
1356 0     0 0   my ($self) = @_;
1357              
1358 0 0         return 1 if $self->{in_multi};
1359 0 0         return 1 if $self->{watching};
1360 0 0         return 1 if $self->{in_pubsub};
1361 0 0 0       return 1 if @{$self->{inflight} // []} > 0;
  0            
1362              
1363 0           return 0;
1364             }
1365              
1366 0     0 1   async sub watch {
1367 0           my ($self, @keys) = @_;
1368 0           $self->{watching} = 1;
1369 0           return await $self->command('WATCH', @keys);
1370             }
1371              
1372 0     0 0   async sub unwatch {
1373 0           my ($self) = @_;
1374 0           my $result = await $self->command('UNWATCH');
1375 0           $self->{watching} = 0;
1376 0           return $result;
1377             }
1378              
1379 0     0 0   async sub multi_start {
1380 0           my ($self) = @_;
1381 0           $self->{in_multi} = 1;
1382 0           return await $self->command('MULTI');
1383             }
1384              
1385 0     0 0   async sub exec {
1386 0           my ($self) = @_;
1387 0           my $result = await $self->command('EXEC');
1388 0           $self->{in_multi} = 0;
1389 0           $self->{watching} = 0; # EXEC clears watches
1390 0           return $result;
1391             }
1392              
1393 0     0 0   async sub discard {
1394 0           my ($self) = @_;
1395 0           my $result = await $self->command('DISCARD');
1396 0           $self->{in_multi} = 0;
1397             # Note: DISCARD does NOT clear watches
1398 0           return $result;
1399             }
1400              
1401 0     0 1   async sub watch_multi {
1402 0           my ($self, $keys, $callback) = @_;
1403              
1404             # WATCH the keys
1405 0           await $self->watch(@$keys);
1406              
1407             # Get current values of watched keys
1408 0           my %watched;
1409 0           for my $key (@$keys) {
1410 0           $watched{$key} = await $self->get($key);
1411             }
1412              
1413             # Create transaction collector
1414 0           my $tx = Async::Redis::Transaction->new(redis => $self);
1415              
1416             # Run callback with watched values
1417 0           await $callback->($tx, \%watched);
1418              
1419 0           my @commands = $tx->commands;
1420              
1421             # If no commands queued, just unwatch and return empty
1422 0 0         unless (@commands) {
1423 0           await $self->unwatch;
1424 0           return [];
1425             }
1426              
1427             # Execute transaction
1428 0           $self->{in_multi} = 1;
1429              
1430 0           my $results;
1431 0           eval {
1432 0           await $self->command('MULTI');
1433              
1434 0           for my $cmd (@commands) {
1435 0           await $self->command(@$cmd);
1436             }
1437              
1438 0           $results = await $self->command('EXEC');
1439             };
1440 0           my $error = $@;
1441              
1442 0           $self->{in_multi} = 0;
1443 0           $self->{watching} = 0;
1444              
1445 0 0         if ($error) {
1446 0           eval { await $self->command('DISCARD') };
  0            
1447 0           die $error;
1448             }
1449              
1450             # EXEC returns undef/nil if WATCH failed
1451 0           return $results;
1452             }
1453              
1454             # ============================================================================
1455             # PUB/SUB
1456             # ============================================================================
1457              
1458             # Wait for inflight commands to complete before mode change
1459 0     0     async sub _wait_for_inflight_drain {
1460 0           my ($self, $timeout) = @_;
1461 0   0       $timeout //= 30;
1462              
1463 0 0         return unless @{$self->{inflight}};
  0            
1464              
1465 0           my $deadline = Time::HiRes::time() + $timeout;
1466              
1467 0   0       while (@{$self->{inflight}} && Time::HiRes::time() < $deadline) {
  0            
1468 0           await Future::IO->sleep(0.001);
1469             }
1470              
1471 0 0         if (@{$self->{inflight}}) {
  0            
1472 0           $self->_fail_all_inflight("Timeout waiting for inflight commands");
1473             }
1474             }
1475              
1476 0     0 0   async sub publish {
1477 0           my ($self, $channel, $message) = @_;
1478 0           return await $self->command('PUBLISH', $channel, $message);
1479             }
1480              
1481 0     0 0   async sub spublish {
1482 0           my ($self, $channel, $message) = @_;
1483 0           return await $self->command('SPUBLISH', $channel, $message);
1484             }
1485              
1486             # Subscribe to channels - returns a Subscription object
1487 0     0 1   async sub subscribe {
1488 0           my ($self, @channels) = @_;
1489              
1490             die Async::Redis::Error::Disconnected->new(
1491             message => "Not connected",
1492 0 0         ) unless $self->{connected};
1493              
1494             # Wait for pending commands before entering PubSub mode
1495 0           await $self->_wait_for_inflight_drain;
1496              
1497             # Create or reuse subscription
1498 0   0       my $sub = $self->{_subscription} //= Async::Redis::Subscription->new(redis => $self);
1499              
1500             # Send SUBSCRIBE command
1501 0           await $self->_send_command('SUBSCRIBE', @channels);
1502              
1503             # Read subscription confirmations
1504 0           for my $ch (@channels) {
1505 0           my $msg = await $self->_read_pubsub_frame();
1506             # Response: ['subscribe', $channel, $count]
1507 0           $sub->_add_channel($ch);
1508             }
1509              
1510 0           $self->{in_pubsub} = 1;
1511              
1512 0           return $sub;
1513             }
1514              
1515             # Pattern subscribe
1516 0     0 1   async sub psubscribe {
1517 0           my ($self, @patterns) = @_;
1518              
1519             die Async::Redis::Error::Disconnected->new(
1520             message => "Not connected",
1521 0 0         ) unless $self->{connected};
1522              
1523             # Wait for pending commands before entering PubSub mode
1524 0           await $self->_wait_for_inflight_drain;
1525              
1526 0   0       my $sub = $self->{_subscription} //= Async::Redis::Subscription->new(redis => $self);
1527              
1528 0           await $self->_send_command('PSUBSCRIBE', @patterns);
1529              
1530 0           for my $p (@patterns) {
1531 0           my $msg = await $self->_read_pubsub_frame();
1532 0           $sub->_add_pattern($p);
1533             }
1534              
1535 0           $self->{in_pubsub} = 1;
1536              
1537 0           return $sub;
1538             }
1539              
1540             # Sharded subscribe (Redis 7+)
1541 0     0 0   async sub ssubscribe {
1542 0           my ($self, @channels) = @_;
1543              
1544             die Async::Redis::Error::Disconnected->new(
1545             message => "Not connected",
1546 0 0         ) unless $self->{connected};
1547              
1548             # Wait for pending commands before entering PubSub mode
1549 0           await $self->_wait_for_inflight_drain;
1550              
1551 0   0       my $sub = $self->{_subscription} //= Async::Redis::Subscription->new(redis => $self);
1552              
1553 0           await $self->_send_command('SSUBSCRIBE', @channels);
1554              
1555 0           for my $ch (@channels) {
1556 0           my $msg = await $self->_read_pubsub_frame();
1557 0           $sub->_add_sharded_channel($ch);
1558             }
1559              
1560 0           $self->{in_pubsub} = 1;
1561              
1562 0           return $sub;
1563             }
1564              
1565             # Read pubsub frame (subscription confirmation or message)
1566 0     0     async sub _read_pubsub_frame {
1567 0           my ($self) = @_;
1568              
1569 0           my $msg = await $self->_read_response();
1570 0           return $self->_decode_response($msg);
1571             }
1572              
1573             # Send command without reading response (for pubsub)
1574 0     0     async sub _send_command {
1575 0           my ($self, @args) = @_;
1576              
1577 0           my $cmd = $self->_build_command(@args);
1578 0           await $self->_send($cmd);
1579             }
1580              
1581             # Read next pubsub message (blocking) - for compatibility
1582 0     0     async sub _read_pubsub_message {
1583 0           my ($self) = @_;
1584              
1585 0           my $msg = await $self->_read_response();
1586              
1587             # Message format: ['message', $channel, $payload]
1588             # or: ['pmessage', $pattern, $channel, $payload]
1589 0           return $msg;
1590             }
1591              
1592             # ============================================================================
1593             # Pipelining
1594             # ============================================================================
1595              
1596             sub pipeline {
1597 0     0 1   my ($self, %opts) = @_;
1598             return Async::Redis::Pipeline->new(
1599             redis => $self,
1600             max_depth => $opts{max_depth} // $self->{pipeline_depth},
1601 0   0       );
1602             }
1603              
1604             # Execute multiple commands, return all responses
1605 0     0     async sub _execute_pipeline {
1606 0           my ($self, $commands) = @_;
1607              
1608 0 0         die "Not connected" unless $self->{connected};
1609              
1610 0 0         return [] unless @$commands;
1611              
1612             # Wait for any inflight regular commands to complete before pipeline
1613             # This prevents interleaving pipeline responses with regular command responses
1614 0           await $self->_wait_for_inflight_drain;
1615              
1616             # Take over reading - prevent response reader from running
1617 0           $self->{_reading_responses} = 1;
1618              
1619 0           my $start_time = Time::HiRes::time();
1620 0           my @responses;
1621 0           my $count = scalar @$commands;
1622              
1623 0           my $ok = eval {
1624             # Send all commands
1625 0           my $data = '';
1626 0           for my $cmd (@$commands) {
1627 0           $data .= $self->_build_command(@$cmd);
1628             }
1629 0           await $self->_send($data);
1630              
1631             # Read all responses, capturing per-slot Redis errors
1632 0           for my $i (1 .. $count) {
1633 0           my $msg = await $self->_read_response();
1634              
1635             # Capture Redis errors inline rather than dying
1636 0           my $result;
1637 0           eval {
1638 0           $result = $self->_decode_response($msg);
1639             };
1640 0 0         if ($@) {
1641             # Capture the error as a string in the results
1642 0           $result = $@;
1643 0 0         chomp $result if defined $result;
1644             }
1645 0           push @responses, $result;
1646             }
1647 0           1;
1648             };
1649              
1650 0           my $error = $@;
1651              
1652             # Release reading lock
1653 0           $self->{_reading_responses} = 0;
1654              
1655 0 0         if (!$ok) {
1656 0           die $error;
1657             }
1658              
1659             # Telemetry: record pipeline metrics
1660 0 0         if ($self->{_telemetry}) {
1661 0           my $elapsed_ms = (Time::HiRes::time() - $start_time) * 1000;
1662 0           $self->{_telemetry}->record_pipeline($count, $elapsed_ms);
1663             }
1664              
1665 0           return \@responses;
1666             }
1667              
1668             1;
1669              
1670             __END__