File Coverage

blib/lib/Async/Redis.pm
Criterion Covered Total %
statement 129 768 16.8
branch 22 280 7.8
condition 27 141 19.1
subroutine 32 109 29.3
pod 18 49 36.7
total 228 1347 16.9


line stmt bran cond sub pod time code
1             package Async::Redis;
2              
3 73     73   24867473 use strict;
  73         234  
  73         3406  
4 73     73   539 use warnings;
  73         219  
  73         4698  
5 73     73   1496 use 5.018;
  73         273  
6              
7             our $VERSION = '0.001006';
8              
9 73     73   439 use Future;
  73         173  
  73         2763  
10 73     73   382 use Future::AsyncAwait;
  73         138  
  73         655  
11 73     73   4941 use Future::IO 0.19;
  73         1423  
  73         5046  
12 73     73   435 use Socket qw(pack_sockaddr_in pack_sockaddr_un inet_aton AF_INET AF_UNIX SOCK_STREAM);
  73         221  
  73         7015  
13 73     73   634 use IO::Handle ();
  73         234  
  73         2022  
14 73     73   46598 use IO::Socket::INET;
  73         1057357  
  73         522  
15 73     73   42617 use Time::HiRes ();
  73         157  
  73         2447  
16              
17             # Error classes
18 73     73   43931 use Async::Redis::Error::Connection;
  73         231  
  73         3881  
19 73     73   36640 use Async::Redis::Error::Timeout;
  73         244  
  73         3291  
20 73     73   38903 use Async::Redis::Error::Disconnected;
  73         275  
  73         3054  
21 73     73   38374 use Async::Redis::Error::Redis;
  73         211  
  73         3896  
22 73     73   36762 use Async::Redis::Error::Protocol;
  73         241  
  73         3290  
23              
24             # Import auto-generated command methods
25 73     73   48827 use Async::Redis::Commands;
  73         410  
  73         7426  
26             our @ISA = qw(Async::Redis::Commands);
27              
28             # Key extraction for prefixing
29 73     73   52632 use Async::Redis::KeyExtractor;
  73         307  
  73         6217  
30              
31             # Transaction support
32 73     73   43837 use Async::Redis::Transaction;
  73         365  
  73         4543  
33              
34             # Script support
35 73     73   42046 use Async::Redis::Script;
  73         280  
  73         5523  
36              
37             # Iterator support
38 73     73   38948 use Async::Redis::Iterator;
  73         294  
  73         5218  
39              
40             # Pipeline support
41 73     73   41846 use Async::Redis::Pipeline;
  73         276  
  73         5643  
42 73     73   38592 use Async::Redis::AutoPipeline;
  73         382  
  73         5776  
43              
44             # PubSub support
45 73     73   40855 use Async::Redis::Subscription;
  73         431  
  73         14668  
46              
47             # Telemetry support
48 73     73   41920 use Async::Redis::Telemetry;
  73         283  
  73         13541  
49              
50             # Try XS version first, fall back to pure Perl
51             BEGIN {
52 73 50   73   269 eval { require Protocol::Redis::XS; 1 }
  73         39205  
  73         1866150  
53             or require Protocol::Redis;
54             }
55              
56             sub _parser_class {
57 0 0   0   0 return $INC{'Protocol/Redis/XS.pm'} ? 'Protocol::Redis::XS' : 'Protocol::Redis';
58             }
59              
60             sub _calculate_backoff {
61 0     0   0 my ($self, $attempt) = @_;
62              
63             # Exponential: delay * 2^(attempt-1)
64 0         0 my $delay = $self->{reconnect_delay} * (2 ** ($attempt - 1));
65              
66             # Cap at max
67 0 0       0 $delay = $self->{reconnect_delay_max} if $delay > $self->{reconnect_delay_max};
68              
69             # Apply jitter: delay * (1 +/- jitter)
70 0 0       0 if ($self->{reconnect_jitter} > 0) {
71 0         0 my $jitter_range = $delay * $self->{reconnect_jitter};
72 0         0 my $jitter = (rand(2) - 1) * $jitter_range;
73 0         0 $delay += $jitter;
74             }
75              
76 0         0 return $delay;
77             }
78              
79             sub new {
80 81     81 1 5802086 my ($class, %args) = @_;
81              
82             # Parse URI if provided
83 81 100       806 if ($args{uri}) {
84 2         769 require Async::Redis::URI;
85 2         17 my $uri = Async::Redis::URI->parse($args{uri});
86 2 50       13 if ($uri) {
87 2         9 my %uri_args = $uri->to_hash;
88             # URI values are defaults, explicit args override
89 2         10 %args = (%uri_args, %args);
90 2         16 delete $args{uri}; # don't store the string
91             }
92             }
93              
94             my $self = bless {
95             path => $args{path},
96             host => $args{path} ? undef : ($args{host} // 'localhost'),
97             port => $args{path} ? undef : ($args{port} // 6379),
98             socket => undef,
99             parser => undef,
100             connected => 0,
101              
102             # Timeout settings
103             connect_timeout => $args{connect_timeout} // 10,
104             request_timeout => $args{request_timeout} // 5,
105             blocking_timeout_buffer => $args{blocking_timeout_buffer} // 2,
106              
107             # Inflight tracking with deadlines
108             # Entry: { future => $f, cmd => $cmd, args => \@args, deadline => $t, sent_at => $t }
109             inflight => [],
110              
111             # Response reader synchronization
112             _reading_responses => 0,
113              
114             # Reconnection settings
115             reconnect => $args{reconnect} // 0,
116             reconnect_delay => $args{reconnect_delay} // 0.1,
117             reconnect_delay_max => $args{reconnect_delay_max} // 60,
118             reconnect_jitter => $args{reconnect_jitter} // 0.25,
119             _reconnect_attempt => 0,
120              
121             # Callbacks
122             on_connect => $args{on_connect},
123             on_disconnect => $args{on_disconnect},
124             on_error => $args{on_error},
125              
126             # Authentication
127             password => $args{password},
128             username => $args{username},
129             database => $args{database} // 0,
130             client_name => $args{client_name},
131              
132             # TLS
133             tls => $args{tls},
134              
135             # Key prefixing
136             prefix => $args{prefix},
137              
138             # Pipeline settings
139             pipeline_depth => $args{pipeline_depth} // 10000,
140             auto_pipeline => $args{auto_pipeline} // 0,
141              
142             # Transaction state
143             in_multi => 0,
144             watching => 0,
145              
146             # PubSub state
147             in_pubsub => 0,
148             _subscription => undef,
149             _pump_running => 0,
150              
151             # Fork safety
152             _pid => $$,
153              
154             # Script registry
155             _scripts => {},
156              
157             # Current read future for clean disconnect cancellation
158             _current_read_future => undef,
159              
160             # Telemetry options
161             debug => $args{debug},
162             otel_tracer => $args{otel_tracer},
163             otel_meter => $args{otel_meter},
164             otel_include_args => $args{otel_include_args} // 1,
165 81 100 50     8561 otel_redact => $args{otel_redact} // 1,
    100 100        
      100        
      50        
      50        
      50        
      50        
      50        
      50        
      100        
      50        
      50        
      50        
      50        
166             }, $class;
167              
168             # Initialize telemetry if any observability enabled
169 81 50 33     1631 if ($self->{debug} || $self->{otel_tracer} || $self->{otel_meter}) {
      33        
170             $self->{_telemetry} = Async::Redis::Telemetry->new(
171             tracer => $self->{otel_tracer},
172             meter => $self->{otel_meter},
173             debug => $self->{debug},
174             include_args => $self->{otel_include_args},
175             redact => $self->{otel_redact},
176             host => $self->{host},
177             port => $self->{port},
178 0   0     0 database => $self->{database} // 0,
179             );
180             }
181              
182 81         457 return $self;
183             }
184              
185             # Connect to Redis server
186 75     75 1 1408 async sub connect {
187 75         221 my ($self) = @_;
188              
189 75 50       310 return $self if $self->{connected};
190              
191             # Create socket — AF_UNIX for path, AF_INET for host:port
192 75         202 my ($socket, $sockaddr);
193              
194 75 100       315 if ($self->{path}) {
195             socket($socket, AF_UNIX, SOCK_STREAM, 0)
196             or die Async::Redis::Error::Connection->new(
197             message => "Cannot create unix socket: $!",
198             host => $self->{path},
199 1 50       70 port => 0,
200             );
201 1         31 IO::Handle::blocking($socket, 0);
202 1         11 $sockaddr = pack_sockaddr_un($self->{path});
203             } else {
204             $socket = IO::Socket::INET->new(
205             Proto => 'tcp',
206             Blocking => 0,
207             ) or die Async::Redis::Error::Connection->new(
208             message => "Cannot create socket: $!",
209             host => $self->{host},
210             port => $self->{port},
211 74 50       1181 );
212              
213             # Build sockaddr
214             my $addr = inet_aton($self->{host})
215             or die Async::Redis::Error::Connection->new(
216             message => "Cannot resolve host: $self->{host}",
217             host => $self->{host},
218             port => $self->{port},
219 74 50       75494 );
220 74         771 $sockaddr = pack_sockaddr_in($self->{port}, $addr);
221             }
222              
223             # Connect with timeout using Future->wait_any
224 75         988 my $connect_f = Future::IO->connect($socket, $sockaddr);
225 75         35084 my $sleep_f = Future::IO->sleep($self->{connect_timeout});
226              
227             my $timeout_f = $sleep_f->then(sub {
228 0     0   0 return Future->fail('connect_timeout');
229 75         10275 });
230              
231 75         8099 my $wait_f = Future->wait_any($connect_f, $timeout_f);
232              
233             # Use followed_by to handle both success and failure without await propagating failure
234             my $result_f = $wait_f->followed_by(sub {
235 75     75   94609 my ($f) = @_;
236 75         754 return Future->done($f); # wrap the future itself
237 75         13857 });
238              
239 75         7223 my $completed_f = await $result_f;
240              
241             # Now check the result
242 75 50       14962 if ($completed_f->is_failed) {
243 75         1067 my ($error) = $completed_f->failure;
244             # Don't call close() - let $socket go out of scope when we die.
245             # Perl's DESTROY will close it after the exception unwinds.
246              
247 75 50       1493 if ($error eq 'connect_timeout') {
248             die Async::Redis::Error::Timeout->new(
249             message => "Connect timed out after $self->{connect_timeout}s",
250             timeout => $self->{connect_timeout},
251 0         0 );
252             }
253             die Async::Redis::Error::Connection->new(
254             message => "$error",
255             host => $self->{path} // $self->{host},
256 75   66     2195 port => $self->{port} // 0,
      100        
257             );
258             }
259              
260             # TLS upgrade if enabled
261 0 0       0 if ($self->{tls}) {
262 0         0 eval {
263 0         0 $socket = await $self->_tls_upgrade($socket);
264             };
265 0 0       0 if ($@) {
266             # Don't call close() - let $socket go out of scope when we die.
267             # Perl's DESTROY will close it after the exception unwinds.
268 0         0 die $@;
269             }
270             }
271              
272 0         0 $self->{socket} = $socket;
273 0         0 $self->{parser} = _parser_class()->new(api => 1);
274 0         0 $self->{connected} = 1;
275 0         0 $self->{inflight} = [];
276 0         0 $self->{_reading_responses} = 0;
277 0         0 $self->{_pid} = $$; # Track PID for fork safety
278 0         0 $self->{_current_read_future} = undef;
279              
280             # Run Redis protocol handshake (AUTH, SELECT, CLIENT SETNAME)
281 0         0 await $self->_redis_handshake;
282              
283             # Initialize auto-pipeline if enabled
284 0 0       0 if ($self->{auto_pipeline}) {
285             $self->{_auto_pipeline} = Async::Redis::AutoPipeline->new(
286             redis => $self,
287             max_depth => $self->{pipeline_depth},
288 0         0 );
289             }
290              
291             # Fire on_connect callback and reset reconnect counter
292 0 0       0 if ($self->{on_connect}) {
293 0         0 $self->{on_connect}->($self);
294             }
295 0         0 $self->{_reconnect_attempt} = 0;
296              
297             # Telemetry: record connection
298 0 0       0 if ($self->{_telemetry}) {
299 0         0 $self->{_telemetry}->record_connection(1);
300             $self->{_telemetry}->log_event('connected',
301 0   0     0 $self->{path} // "$self->{host}:$self->{port}");
302             }
303              
304 0         0 return $self;
305             }
306              
307             # Redis protocol handshake after TCP connect
308 0     0   0 async sub _redis_handshake {
309 0         0 my ($self) = @_;
310              
311             # Use connect_timeout for the entire handshake (AUTH, SELECT, CLIENT SETNAME)
312             # This ensures the handshake can't block forever if Redis hangs
313 0         0 my $deadline = Time::HiRes::time() + $self->{connect_timeout};
314              
315             # AUTH (password or username+password for ACL)
316 0 0       0 if ($self->{password}) {
317 0         0 my @auth_args = ('AUTH');
318 0 0       0 push @auth_args, $self->{username} if $self->{username};
319 0         0 push @auth_args, $self->{password};
320              
321 0         0 my $cmd = $self->_build_command(@auth_args);
322 0         0 await $self->_send($cmd);
323              
324 0         0 my $response = await $self->_read_response_with_deadline($deadline, ['AUTH']);
325 0         0 my $result = $self->_decode_response($response);
326              
327             # AUTH returns OK on success, throws on failure
328 0 0 0     0 unless ($result && $result eq 'OK') {
329 0         0 die Async::Redis::Error::Redis->new(
330             message => "Authentication failed: $result",
331             type => 'NOAUTH',
332             );
333             }
334             }
335              
336             # SELECT database
337 0 0 0     0 if ($self->{database} && $self->{database} != 0) {
338 0         0 my $cmd = $self->_build_command('SELECT', $self->{database});
339 0         0 await $self->_send($cmd);
340              
341 0         0 my $response = await $self->_read_response_with_deadline($deadline, ['SELECT', $self->{database}]);
342 0         0 my $result = $self->_decode_response($response);
343              
344 0 0 0     0 unless ($result && $result eq 'OK') {
345 0         0 die Async::Redis::Error::Redis->new(
346             message => "SELECT failed: $result",
347             type => 'ERR',
348             );
349             }
350             }
351              
352             # CLIENT SETNAME
353 0 0       0 if ($self->{client_name}) {
354 0         0 my $cmd = $self->_build_command('CLIENT', 'SETNAME', $self->{client_name});
355 0         0 await $self->_send($cmd);
356              
357 0         0 my $response = await $self->_read_response_with_deadline($deadline, ['CLIENT', 'SETNAME']);
358             # Ignore result - SETNAME failing shouldn't prevent connection
359             }
360             }
361              
362             # Check if connected to Redis
363             sub is_connected {
364 0     0 0 0 my ($self) = @_;
365 0 0       0 return $self->{connected} ? 1 : 0;
366             }
367              
368             # Disconnect from Redis
369             sub disconnect {
370 0     0 1 0 my ($self, $reason) = @_;
371 0   0     0 $reason //= 'client_disconnect';
372              
373 0         0 my $was_connected = $self->{connected};
374              
375             # Cancel any active read future BEFORE closing socket
376             # This ensures Future::IO unregisters its watcher while fileno is still valid
377 0 0 0     0 if ($self->{_current_read_future} && !$self->{_current_read_future}->is_ready) {
378 0         0 $self->{_current_read_future}->cancel;
379 0         0 $self->{_current_read_future} = undef;
380             }
381              
382             # Cancel any pending inflight operations before closing socket
383 0 0       0 if (my $inflight = $self->{inflight}) {
384 0         0 for my $entry (@$inflight) {
385 0 0 0     0 if ($entry->{future} && !$entry->{future}->is_ready) {
386 0         0 $entry->{future}->cancel;
387             }
388             }
389 0         0 $self->{inflight} = [];
390             }
391              
392 0 0       0 if ($self->{socket}) {
393 0         0 $self->_close_socket;
394             }
395 0         0 $self->{connected} = 0;
396 0         0 $self->{parser} = undef;
397 0         0 $self->{_reading_responses} = 0;
398              
399 0 0 0     0 if ($was_connected && $self->{on_disconnect}) {
400 0         0 $self->{on_disconnect}->($self, $reason);
401             }
402              
403             # Telemetry: record disconnection
404 0 0 0     0 if ($was_connected && $self->{_telemetry}) {
405 0         0 $self->{_telemetry}->record_connection(-1);
406 0         0 $self->{_telemetry}->log_event('disconnected', $reason);
407             }
408              
409 0         0 return $self;
410             }
411              
412             # Destructor - clean up socket when object is garbage collected
413             sub DESTROY {
414 81     81   37742 my ($self) = @_;
415             # Only clean up if we have a socket and it's still open
416 81 50 33     1329 if ($self->{socket} && fileno($self->{socket})) {
417 0         0 $self->_close_socket;
418             }
419             }
420              
421             # Properly close socket, canceling any pending futures first
422             sub _close_socket {
423 0     0   0 my ($self) = @_;
424              
425             # Take ownership - removes from $self immediately
426 0 0       0 my $socket = delete $self->{socket} or return;
427 0         0 my $fileno = fileno($socket);
428              
429             # Cancel any pending inflight futures - this propagates to
430             # Future::IO internals and cleans up any watchers on this socket.
431             # Important: must happen while fileno is still valid!
432 0 0       0 if (my $inflight = delete $self->{inflight}) {
433 0         0 for my $entry (@$inflight) {
434 0 0 0     0 if ($entry->{future} && !$entry->{future}->is_ready) {
435 0         0 $entry->{future}->cancel;
436             }
437             }
438             }
439 0         0 $self->{inflight} = [];
440              
441             # Initiate clean TCP shutdown (FIN) while fileno still valid
442 0 0       0 shutdown($socket, 2) if defined $fileno;
443              
444             # DON'T call close()!
445             # $socket falls out of scope here, Perl's DESTROY calls close().
446             # By this point, Future::IO has already unregistered its watchers
447             # via the cancel() calls above.
448             }
449              
450             # Check if fork occurred and invalidate connection
451             sub _check_fork {
452 0     0   0 my ($self) = @_;
453              
454 0 0 0     0 if ($self->{_pid} && $self->{_pid} != $$) {
455             # Fork detected - invalidate connection (parent owns the socket)
456             # Don't cancel futures - they belong to the parent's event loop
457             # Just clear references so we don't try to use them
458 0         0 $self->{connected} = 0;
459 0         0 $self->{socket} = undef;
460 0         0 $self->{parser} = undef;
461 0         0 $self->{inflight} = [];
462 0         0 $self->{_reading_responses} = 0;
463 0         0 $self->{_current_read_future} = undef; # Clear stale reference
464              
465 0         0 my $old_pid = $self->{_pid};
466 0         0 $self->{_pid} = $$;
467              
468 0 0       0 if ($self->{_telemetry}) {
469 0         0 $self->{_telemetry}->log_event('fork_detected', "old PID: $old_pid, new PID: $$");
470             }
471              
472 0         0 return 1; # Fork occurred
473             }
474              
475 0         0 return 0;
476             }
477              
478             # Build Redis command in RESP format
479             sub _build_command {
480 0     0   0 my ($self, @args) = @_;
481              
482 0         0 my $cmd = "*" . scalar(@args) . "\r\n";
483 0         0 for my $arg (@args) {
484 0   0     0 $arg //= '';
485 0         0 my $bytes = "$arg"; # stringify
486 0 0       0 utf8::encode($bytes) if utf8::is_utf8($bytes);
487 0         0 $cmd .= "\$" . length($bytes) . "\r\n" . $bytes . "\r\n";
488             }
489 0         0 return $cmd;
490             }
491              
492             # Send raw data
493 0     0   0 async sub _send {
494 0         0 my ($self, $data) = @_;
495 0         0 await Future::IO->write_exactly($self->{socket}, $data);
496 0         0 return length($data);
497             }
498              
499             # Add command to inflight queue - returns queue depth
500             sub _add_inflight {
501 0     0   0 my ($self, $future, $cmd, $args, $deadline) = @_;
502 0         0 push @{$self->{inflight}}, {
  0         0  
503             future => $future,
504             cmd => $cmd,
505             args => $args,
506             deadline => $deadline,
507             sent_at => Time::HiRes::time(),
508             };
509 0         0 return scalar @{$self->{inflight}};
  0         0  
510             }
511              
512             # Shift first entry from inflight queue
513             sub _shift_inflight {
514 0     0   0 my ($self) = @_;
515 0         0 return shift @{$self->{inflight}};
  0         0  
516             }
517              
518             # Fail all pending inflight futures with given error
519             sub _fail_all_inflight {
520 0     0   0 my ($self, $error) = @_;
521 0         0 while (my $entry = $self->_shift_inflight) {
522 0 0 0     0 if ($entry->{future} && !$entry->{future}->is_ready) {
523 0         0 $entry->{future}->fail($error);
524             }
525             }
526             }
527              
528             # Ensure response reader is running - the core response queue mechanism
529             # Only one reader should be active at a time, processing responses in FIFO order
530 0     0   0 async sub _ensure_response_reader {
531 0         0 my ($self) = @_;
532              
533             # Already reading - don't start another reader
534 0 0       0 return if $self->{_reading_responses};
535              
536 0         0 $self->{_reading_responses} = 1;
537              
538 0   0     0 while (@{$self->{inflight}} && $self->{connected}) {
  0         0  
539 0         0 my $entry = $self->{inflight}[0];
540              
541             # Read response with deadline from the entry
542 0         0 my $response;
543 0         0 my $read_ok = eval {
544             $response = await $self->_read_response_with_deadline(
545             $entry->{deadline},
546             $entry->{args}
547 0         0 );
548 0         0 1;
549             };
550              
551 0 0       0 if (!$read_ok) {
552 0         0 my $read_error = $@;
553             # Connection/timeout error - fail all inflight and abort
554 0         0 $self->_fail_all_inflight($read_error);
555 0         0 $self->{_reading_responses} = 0;
556 0         0 return;
557             }
558              
559             # Remove this entry from the queue now that we have its response
560 0         0 $self->_shift_inflight;
561              
562             # Decode response (sync operation, eval works fine here)
563 0         0 my $result;
564 0         0 my $decode_ok = eval {
565 0         0 $result = $self->_decode_response($response);
566 0         0 1;
567             };
568              
569             # Complete the future
570 0 0       0 if (!$decode_ok) {
571 0         0 my $decode_error = $@;
572             # Redis error (like WRONGTYPE) - fail just this future
573 0 0       0 $entry->{future}->fail($decode_error) unless $entry->{future}->is_ready;
574             } else {
575             # Success - complete the future with result
576 0 0       0 $entry->{future}->done($result) unless $entry->{future}->is_ready;
577             }
578             }
579              
580 0         0 $self->{_reading_responses} = 0;
581             }
582              
583             # Read and parse one response
584 0     0   0 async sub _read_response {
585 0         0 my ($self) = @_;
586              
587             # First check if parser already has a complete message
588             # (from previous read that contained multiple responses)
589 0 0       0 if (my $msg = $self->{parser}->get_message) {
590 0         0 return $msg;
591             }
592              
593             # Read until we get a complete message
594 0         0 while (1) {
595 0         0 my $buf = await Future::IO->read($self->{socket}, 65536);
596              
597             # EOF
598 0 0 0     0 if (!defined $buf || length($buf) == 0) {
599 0         0 die "Connection closed by server";
600             }
601              
602 0         0 $self->{parser}->parse($buf);
603              
604 0 0       0 if (my $msg = $self->{parser}->get_message) {
605 0         0 return $msg;
606             }
607             }
608             }
609              
610             # Calculate deadline based on command type
611             sub _calculate_deadline {
612 0     0   0 my ($self, $cmd, @args) = @_;
613              
614 0   0     0 $cmd = uc($cmd // '');
615              
616             # Blocking commands get extended deadline
617 0 0       0 if ($cmd =~ /^(BLPOP|BRPOP|BLMOVE|BRPOPLPUSH|BLMPOP|BZPOPMIN|BZPOPMAX|BZMPOP)$/) {
618             # Last arg is the timeout for these commands
619 0   0     0 my $server_timeout = $args[-1] // 0;
620 0         0 return Time::HiRes::time() + $server_timeout + $self->{blocking_timeout_buffer};
621             }
622              
623 0 0       0 if ($cmd =~ /^(XREAD|XREADGROUP)$/) {
624             # XREAD/XREADGROUP have BLOCK option
625 0         0 for my $i (0 .. $#args - 1) {
626 0 0       0 if (uc($args[$i]) eq 'BLOCK') {
627 0   0     0 my $block_ms = $args[$i + 1] // 0;
628 0         0 return Time::HiRes::time() + ($block_ms / 1000) + $self->{blocking_timeout_buffer};
629             }
630             }
631             }
632              
633             # Normal commands use request_timeout
634 0         0 return Time::HiRes::time() + $self->{request_timeout};
635             }
636              
637             # Non-blocking TLS upgrade
638 0     0   0 async sub _tls_upgrade {
639 0         0 my ($self, $socket) = @_;
640              
641 0         0 require IO::Socket::SSL;
642              
643             # Build SSL options
644 0         0 my %ssl_opts = (
645             SSL_startHandshake => 0, # Don't block during start_SSL!
646             );
647              
648 0 0       0 if (ref $self->{tls} eq 'HASH') {
649 0 0       0 $ssl_opts{SSL_ca_file} = $self->{tls}{ca_file} if $self->{tls}{ca_file};
650 0 0       0 $ssl_opts{SSL_cert_file} = $self->{tls}{cert_file} if $self->{tls}{cert_file};
651 0 0       0 $ssl_opts{SSL_key_file} = $self->{tls}{key_file} if $self->{tls}{key_file};
652              
653 0 0       0 if (exists $self->{tls}{verify}) {
654             $ssl_opts{SSL_verify_mode} = $self->{tls}{verify}
655 0 0       0 ? IO::Socket::SSL::SSL_VERIFY_PEER()
656             : IO::Socket::SSL::SSL_VERIFY_NONE();
657             } else {
658 0         0 $ssl_opts{SSL_verify_mode} = IO::Socket::SSL::SSL_VERIFY_PEER();
659             }
660             } else {
661 0         0 $ssl_opts{SSL_verify_mode} = IO::Socket::SSL::SSL_VERIFY_PEER();
662             }
663              
664             # Start SSL (does not block because SSL_startHandshake => 0)
665             IO::Socket::SSL->start_SSL($socket, %ssl_opts)
666             or die Async::Redis::Error::Connection->new(
667             message => "SSL setup failed: " . IO::Socket::SSL::errstr(),
668             host => $self->{host},
669             port => $self->{port},
670 0 0       0 );
671              
672             # Drive handshake with non-blocking loop
673 0         0 my $deadline = Time::HiRes::time() + $self->{connect_timeout};
674              
675 0         0 while (1) {
676             # Check timeout
677 0 0       0 if (Time::HiRes::time() >= $deadline) {
678             die Async::Redis::Error::Timeout->new(
679             message => "TLS handshake timed out",
680             timeout => $self->{connect_timeout},
681 0         0 );
682             }
683              
684             # Attempt handshake step
685 0         0 my $rv = $socket->connect_SSL;
686              
687 0 0       0 if ($rv) {
688             # Handshake complete!
689 0         0 return $socket;
690             }
691              
692             # Check what the handshake needs
693 0         0 my $remaining = $deadline - Time::HiRes::time();
694 0 0       0 $remaining = 0.1 if $remaining <= 0;
695              
696 0 0       0 if ($IO::Socket::SSL::SSL_ERROR == IO::Socket::SSL::SSL_ERROR_WANT_READ()) {
    0          
697             # Wait for socket to become readable with timeout
698 0         0 my $read_f = Future::IO->waitfor_readable($socket);
699             my $timeout_f = Future::IO->sleep($remaining)->then(sub {
700 0     0   0 return Future->fail('tls_timeout');
701 0         0 });
702              
703 0         0 my $wait_f = Future->wait_any($read_f, $timeout_f);
704 0         0 await $wait_f;
705              
706 0 0       0 if ($wait_f->is_failed) {
707             die Async::Redis::Error::Timeout->new(
708             message => "TLS handshake timed out",
709             timeout => $self->{connect_timeout},
710 0         0 );
711             }
712             }
713             elsif ($IO::Socket::SSL::SSL_ERROR == IO::Socket::SSL::SSL_ERROR_WANT_WRITE()) {
714             # Wait for socket to become writable with timeout
715 0         0 my $write_f = Future::IO->waitfor_writable($socket);
716             my $timeout_f = Future::IO->sleep($remaining)->then(sub {
717 0     0   0 return Future->fail('tls_timeout');
718 0         0 });
719              
720 0         0 my $wait_f = Future->wait_any($write_f, $timeout_f);
721 0         0 await $wait_f;
722              
723 0 0       0 if ($wait_f->is_failed) {
724             die Async::Redis::Error::Timeout->new(
725             message => "TLS handshake timed out",
726             timeout => $self->{connect_timeout},
727 0         0 );
728             }
729             }
730             else {
731             # Actual error
732             die Async::Redis::Error::Connection->new(
733             message => "TLS handshake failed: " . IO::Socket::SSL::errstr(),
734             host => $self->{host},
735             port => $self->{port},
736 0         0 );
737             }
738             }
739             }
740              
741             # Reconnect with exponential backoff
742 0     0   0 async sub _reconnect {
743 0         0 my ($self) = @_;
744              
745 0         0 while (!$self->{connected}) {
746 0         0 $self->{_reconnect_attempt}++;
747 0         0 my $delay = $self->_calculate_backoff($self->{_reconnect_attempt});
748              
749 0         0 eval {
750 0         0 await $self->connect;
751             };
752              
753 0 0       0 if ($@) {
754 0         0 my $error = $@;
755              
756             # Fire on_error callback
757 0 0       0 if ($self->{on_error}) {
758 0         0 $self->{on_error}->($self, $error);
759             }
760              
761             # Wait before next attempt
762 0         0 await Future::IO->sleep($delay);
763             }
764             }
765             }
766              
767             # Reconnect and replay pubsub subscriptions
768 0     0   0 async sub _reconnect_pubsub {
769 0         0 my ($self) = @_;
770              
771             my $sub = $self->{_subscription}
772 0 0       0 or die Async::Redis::Error::Disconnected->new(
773             message => "No subscription to replay",
774             );
775              
776 0         0 my @replay = $sub->get_replay_commands;
777              
778             # Ensure connection state is fully cleaned up before reconnecting.
779             # _reset_connection may have already been called by _read_response,
780             # but if the socket was closed externally, we need to clean up
781             # stale IO watchers and state here. It is safe to call twice —
782             # the on_disconnect callback is guarded by $was_connected.
783 0         0 $self->_reset_connection('pubsub_reconnect');
784              
785 0         0 await $self->_reconnect;
786              
787             # Replay all subscription commands
788 0         0 for my $cmd (@replay) {
789 0         0 my ($command, @args) = @$cmd;
790              
791 0         0 await $self->_send_command($command, @args);
792              
793             # Read and discard subscription confirmations
794 0         0 for my $arg (@args) {
795 0         0 await $self->_read_pubsub_frame();
796             }
797             }
798              
799             # Re-enter pubsub mode
800 0         0 $self->{in_pubsub} = 1;
801             }
802              
803             # Execute a Redis command
804 0     0 1 0 async sub command {
805 0         0 my ($self, $cmd, @args) = @_;
806              
807             # Check for fork - invalidate connection if PID changed
808 0         0 $self->_check_fork;
809              
810             # Block regular commands on pubsub connection
811 0 0       0 if ($self->{in_pubsub}) {
812 0   0     0 my $ucmd = uc($cmd // '');
813 0 0       0 unless ($ucmd =~ /^(SUBSCRIBE|UNSUBSCRIBE|PSUBSCRIBE|PUNSUBSCRIBE|SSUBSCRIBE|SUNSUBSCRIBE|PING|QUIT)$/) {
814 0         0 die Async::Redis::Error::Protocol->new(
815             message => "Cannot execute '$cmd' on connection in PubSub mode",
816             );
817             }
818             }
819              
820             # Apply key prefixing if configured
821 0 0 0     0 if (defined $self->{prefix} && $self->{prefix} ne '') {
822             @args = Async::Redis::KeyExtractor::apply_prefix(
823 0         0 $self->{prefix}, $cmd, @args
824             );
825             }
826              
827             # Route through auto-pipeline if enabled
828 0 0       0 if ($self->{_auto_pipeline}) {
829 0         0 return await $self->{_auto_pipeline}->command($cmd, @args);
830             }
831              
832             # If disconnected and reconnect enabled, try to reconnect
833 0 0 0     0 if (!$self->{connected} && $self->{reconnect}) {
834 0         0 await $self->_reconnect;
835             }
836              
837             die Async::Redis::Error::Disconnected->new(
838             message => "Not connected",
839 0 0       0 ) unless $self->{connected};
840              
841             # Telemetry: start span and log send
842 0         0 my $span_context;
843 0         0 my $start_time = Time::HiRes::time();
844 0 0       0 if ($self->{_telemetry}) {
845 0         0 $span_context = $self->{_telemetry}->start_command_span($cmd, @args);
846 0         0 $self->{_telemetry}->log_send($cmd, @args);
847             }
848              
849 0         0 my $raw_cmd = $self->_build_command($cmd, @args);
850              
851             # Calculate deadline based on command type
852 0         0 my $deadline = $self->_calculate_deadline($cmd, @args);
853              
854             # Create response future and register in inflight queue BEFORE sending
855             # This ensures responses are matched in order
856 0         0 my $response_future = Future->new;
857 0         0 $self->_add_inflight($response_future, $cmd, \@args, $deadline);
858              
859 0         0 my $result;
860             my $error;
861              
862 0         0 my $send_ok = eval {
863             # Send command
864 0         0 await $self->_send($raw_cmd);
865 0         0 1;
866             };
867              
868 0 0       0 if (!$send_ok) {
869 0         0 $error = $@;
870             # Send failed - remove from inflight and fail
871 0         0 $self->_shift_inflight; # Remove the entry we just added
872 0 0       0 $response_future->fail($error) unless $response_future->is_ready;
873             } else {
874             # Trigger the response reader (fire and forget - it runs in background)
875 0         0 $self->_ensure_response_reader->retain;
876              
877             # Wait for our response future to be completed by the reader
878 0         0 my $await_ok = eval {
879 0         0 $result = await $response_future;
880 0         0 1;
881             };
882              
883 0 0       0 if (!$await_ok) {
884 0         0 $error = $@;
885             }
886             }
887              
888             # Telemetry: log result and end span
889 0 0       0 if ($self->{_telemetry}) {
890 0         0 my $elapsed_ms = (Time::HiRes::time() - $start_time) * 1000;
891 0 0       0 if ($error) {
892 0         0 $self->{_telemetry}->log_error($error);
893             }
894             else {
895 0         0 $self->{_telemetry}->log_recv($result, $elapsed_ms);
896             }
897 0         0 $self->{_telemetry}->end_command_span($span_context, $error);
898             }
899              
900 0 0       0 die $error if $error;
901 0         0 return $result;
902             }
903              
904             # Read response with deadline enforcement
905 0     0   0 async sub _read_response_with_deadline {
906 0         0 my ($self, $deadline, $cmd_ref) = @_;
907              
908             # First check if parser already has a complete message
909 0 0       0 if (my $msg = $self->{parser}->get_message) {
910 0         0 return $msg;
911             }
912              
913             # Read until we get a complete message
914 0         0 while (1) {
915 0         0 my $remaining = $deadline - Time::HiRes::time();
916              
917 0 0       0 if ($remaining <= 0) {
918 0         0 $self->_reset_connection;
919             die Async::Redis::Error::Timeout->new(
920             message => "Request timed out after $self->{request_timeout}s",
921             command => $cmd_ref,
922             timeout => $self->{request_timeout},
923 0         0 maybe_executed => 1, # already sent the command
924             );
925             }
926              
927             # Use wait_any for timeout
928 0         0 my $read_f = Future::IO->read($self->{socket}, 65536);
929              
930             # Store reference so disconnect() can cancel it
931 0         0 $self->{_current_read_future} = $read_f;
932              
933             my $timeout_f = Future::IO->sleep($remaining)->then(sub {
934 0     0   0 return Future->fail('read_timeout');
935 0         0 });
936              
937 0         0 my $wait_f = Future->wait_any($read_f, $timeout_f);
938 0         0 await $wait_f;
939              
940             # Clear stored reference after await completes
941 0         0 $self->{_current_read_future} = undef;
942              
943             # Check if read was cancelled (by disconnect)
944 0 0       0 if ($read_f->is_cancelled) {
945 0         0 die Async::Redis::Error::Disconnected->new(
946             message => "Disconnected during read",
947             );
948             }
949              
950 0 0       0 if ($wait_f->is_failed) {
951 0         0 my ($error) = $wait_f->failure;
952 0 0       0 if ($error eq 'read_timeout') {
953 0         0 $self->_reset_connection;
954             die Async::Redis::Error::Timeout->new(
955             message => "Request timed out after $self->{request_timeout}s",
956             command => $cmd_ref,
957             timeout => $self->{request_timeout},
958 0         0 maybe_executed => 1,
959             );
960             }
961 0         0 $self->_reset_connection;
962 0         0 die Async::Redis::Error::Connection->new(
963             message => "$error",
964             );
965             }
966              
967             # Get the read result
968 0         0 my $buf = $wait_f->get;
969              
970             # EOF
971 0 0 0     0 if (!defined $buf || length($buf) == 0) {
972 0         0 $self->_reset_connection;
973 0         0 die Async::Redis::Error::Connection->new(
974             message => "Connection closed by server",
975             );
976             }
977              
978 0         0 $self->{parser}->parse($buf);
979              
980 0 0       0 if (my $msg = $self->{parser}->get_message) {
981 0         0 return $msg;
982             }
983             }
984             }
985              
986             # Reset connection after timeout (stream is desynced)
987             sub _reset_connection {
988 1     1   17 my ($self, $reason) = @_;
989 1   50     5 $reason //= 'timeout';
990              
991 1         3 my $was_connected = $self->{connected};
992              
993             # Cancel any active read future BEFORE closing socket
994             # This ensures Future::IO unregisters its watcher while fileno is still valid
995 1 50 33     6 if ($self->{_current_read_future} && !$self->{_current_read_future}->is_ready) {
996 0         0 $self->{_current_read_future}->cancel;
997 0         0 $self->{_current_read_future} = undef;
998             }
999              
1000             # Cancel any pending inflight operations before closing socket
1001 1 50       6 if (my $inflight = $self->{inflight}) {
1002 1         4 for my $entry (@$inflight) {
1003 0 0 0     0 if ($entry->{future} && !$entry->{future}->is_ready) {
1004 0         0 $entry->{future}->cancel;
1005             }
1006             }
1007 1         4 $self->{inflight} = [];
1008             }
1009              
1010 1 50       4 if ($self->{socket}) {
1011 0         0 $self->_close_socket;
1012             }
1013              
1014 1         3 $self->{connected} = 0;
1015 1         2 $self->{parser} = undef;
1016 1         3 $self->{_reading_responses} = 0;
1017 1         2 $self->{in_pubsub} = 0;
1018              
1019 1 50 33     11 if ($was_connected && $self->{on_disconnect}) {
1020 0           $self->{on_disconnect}->($self, $reason);
1021             }
1022             }
1023              
1024             # Decode Protocol::Redis response to Perl value
1025             sub _decode_response {
1026 0     0     my ($self, $msg) = @_;
1027              
1028 0 0         return undef unless $msg;
1029              
1030 0           my $type = $msg->{type};
1031 0           my $data = $msg->{data};
1032              
1033             # Simple string (+)
1034 0 0         if ($type eq '+') {
    0          
    0          
    0          
    0          
1035 0           return $data;
1036             }
1037             # Error (-)
1038             elsif ($type eq '-') {
1039 0           die Async::Redis::Error::Redis->from_message($data);
1040             }
1041             # Integer (:)
1042             elsif ($type eq ':') {
1043 0           return 0 + $data;
1044             }
1045             # Bulk string ($)
1046             elsif ($type eq '$') {
1047 0           return $data; # undef for null bulk
1048             }
1049             # Array (*)
1050             elsif ($type eq '*') {
1051 0 0         return undef unless defined $data; # null array
1052 0           return [ map { $self->_decode_response($_) } @$data ];
  0            
1053             }
1054              
1055 0           return $data;
1056             }
1057              
1058             # ============================================================================
1059             # Convenience Commands
1060             # ============================================================================
1061              
1062 0     0 0   async sub ping {
1063 0           my ($self) = @_;
1064 0           return await $self->command('PING');
1065             }
1066              
1067 0     0 0   async sub set {
1068 0           my ($self, $key, $value, %opts) = @_;
1069 0           my @cmd = ('SET', $key, $value);
1070 0 0         push @cmd, 'EX', $opts{ex} if exists $opts{ex};
1071 0 0         push @cmd, 'PX', $opts{px} if exists $opts{px};
1072 0 0         push @cmd, 'NX' if $opts{nx};
1073 0 0         push @cmd, 'XX' if $opts{xx};
1074 0           return await $self->command(@cmd);
1075             }
1076              
1077 0     0 0   async sub get {
1078 0           my ($self, $key) = @_;
1079 0           return await $self->command('GET', $key);
1080             }
1081              
1082 0     0 0   async sub del {
1083 0           my ($self, @keys) = @_;
1084 0           return await $self->command('DEL', @keys);
1085             }
1086              
1087 0     0 0   async sub incr {
1088 0           my ($self, $key) = @_;
1089 0           return await $self->command('INCR', $key);
1090             }
1091              
1092 0     0 0   async sub lpush {
1093 0           my ($self, $key, @values) = @_;
1094 0           return await $self->command('LPUSH', $key, @values);
1095             }
1096              
1097 0     0 0   async sub rpush {
1098 0           my ($self, $key, @values) = @_;
1099 0           return await $self->command('RPUSH', $key, @values);
1100             }
1101              
1102 0     0 0   async sub lpop {
1103 0           my ($self, $key) = @_;
1104 0           return await $self->command('LPOP', $key);
1105             }
1106              
1107 0     0 0   async sub lrange {
1108 0           my ($self, $key, $start, $stop) = @_;
1109 0           return await $self->command('LRANGE', $key, $start, $stop);
1110             }
1111              
1112 0     0 1   async sub keys {
1113 0           my ($self, $pattern) = @_;
1114 0           return await $self->command('KEYS', $pattern // '*');
1115             }
1116              
1117 0     0 0   async sub flushdb {
1118 0           my ($self) = @_;
1119 0           return await $self->command('FLUSHDB');
1120             }
1121              
1122             # ============================================================================
1123             # Lua Scripting
1124             # ============================================================================
1125              
1126 0     0 0   async sub script_load {
1127 0           my ($self, $script) = @_;
1128 0           return await $self->command('SCRIPT', 'LOAD', $script);
1129             }
1130              
1131 0     0 0   async sub script_exists {
1132 0           my ($self, @shas) = @_;
1133 0           return await $self->command('SCRIPT', 'EXISTS', @shas);
1134             }
1135              
1136 0     0 0   async sub script_flush {
1137 0           my ($self, $mode) = @_;
1138 0           my @args = ('SCRIPT', 'FLUSH');
1139 0 0         push @args, $mode if $mode; # ASYNC or SYNC
1140 0           return await $self->command(@args);
1141             }
1142              
1143 0     0 0   async sub script_kill {
1144 0           my ($self) = @_;
1145 0           return await $self->command('SCRIPT', 'KILL');
1146             }
1147              
1148 0     0 0   async sub evalsha_or_eval {
1149 0           my ($self, $sha, $script, $numkeys, @keys_and_args) = @_;
1150              
1151             # Try EVALSHA first
1152 0           my $result;
1153 0           eval {
1154 0           $result = await $self->evalsha($sha, $numkeys, @keys_and_args);
1155             };
1156              
1157 0 0         if ($@) {
1158 0           my $error = $@;
1159              
1160             # Check if it's a NOSCRIPT error
1161 0 0         if ("$error" =~ /NOSCRIPT/i) {
1162             # Fall back to EVAL (which also loads the script)
1163 0           $result = await $self->eval($script, $numkeys, @keys_and_args);
1164             }
1165             else {
1166             # Re-throw other errors
1167 0           die $error;
1168             }
1169             }
1170              
1171 0           return $result;
1172             }
1173              
1174             sub script {
1175 0     0 1   my ($self, $code) = @_;
1176 0           return Async::Redis::Script->new(
1177             redis => $self,
1178             script => $code,
1179             );
1180             }
1181              
1182             # Define a named script command
1183             # Usage: $redis->define_command(name => { keys => N, lua => '...' })
1184             sub define_command {
1185 0     0 1   my ($self, $name, $def) = @_;
1186              
1187 0 0 0       die "Command name required" unless defined $name && length $name;
1188 0 0         die "Command definition required" unless ref $def eq 'HASH';
1189 0 0         die "Lua script required (lua => '...')" unless defined $def->{lua};
1190              
1191             # Validate name (alphanumeric and underscore only)
1192 0 0         die "Invalid command name '$name' - use only alphanumeric and underscore"
1193             unless $name =~ /^[a-zA-Z_][a-zA-Z0-9_]*$/;
1194              
1195             my $script = Async::Redis::Script->new(
1196             redis => $self,
1197             script => $def->{lua},
1198             name => $name,
1199             num_keys => $def->{keys} // 'dynamic',
1200             description => $def->{description},
1201 0   0       );
1202              
1203 0           $self->{_scripts}{$name} = $script;
1204              
1205             # Optional: install as method on this instance
1206 0 0         if ($def->{install}) {
1207 0           $self->_install_script_method($name);
1208             }
1209              
1210 0           return $script;
1211             }
1212              
1213             # Run a registered script by name
1214             # Usage: $redis->run_script('name', @keys_then_args)
1215             # If num_keys is 'dynamic', first arg is the key count
1216 0     0 1   async sub run_script {
1217 0           my ($self, $name, @args) = @_;
1218              
1219 0 0         my $script = $self->{_scripts}{$name}
1220             or die "Unknown script: '$name' - use define_command() first";
1221              
1222 0           my $num_keys = $script->num_keys;
1223              
1224             # Handle dynamic key count
1225 0 0         if ($num_keys eq 'dynamic') {
1226 0           $num_keys = shift @args;
1227 0 0         die "Key count required as first argument for dynamic script '$name'"
1228             unless defined $num_keys;
1229             }
1230              
1231             # Split args into keys and argv
1232 0           my @keys = splice(@args, 0, $num_keys);
1233 0           return await $script->run(\@keys, \@args);
1234             }
1235              
1236             # Get a registered script by name
1237             sub get_script {
1238 0     0 1   my ($self, $name) = @_;
1239 0           return $self->{_scripts}{$name};
1240             }
1241              
1242             # List all registered script names
1243             sub list_scripts {
1244 0     0 1   my ($self) = @_;
1245 0           return CORE::keys %{$self->{_scripts}};
  0            
1246             }
1247              
1248             # Preload all registered scripts to Redis
1249             # Useful before pipeline execution
1250 0     0 1   async sub preload_scripts {
1251 0           my ($self) = @_;
1252              
1253 0           my @names = $self->list_scripts;
1254 0 0         return 0 unless @names;
1255              
1256 0           for my $name (@names) {
1257 0           my $script = $self->{_scripts}{$name};
1258 0           await $self->script_load($script->script);
1259             }
1260              
1261 0           return scalar @names;
1262             }
1263              
1264             # Install a script as a method (internal)
1265             sub _install_script_method {
1266 0     0     my ($self, $name) = @_;
1267              
1268             # Create closure that captures $name
1269             my $method = sub {
1270 0     0     my ($self, @args) = @_;
1271 0           return $self->run_script($name, @args);
1272 0           };
1273              
1274             # Install on the class (affects all instances)
1275 73     73   1054 no strict 'refs';
  73         204  
  73         4566  
1276 73     73   579 no warnings 'redefine';
  73         175  
  73         409137  
1277 0           *{"Async::Redis::$name"} = $method;
  0            
1278             }
1279              
1280             # ============================================================================
1281             # SCAN Iterators
1282             # ============================================================================
1283              
1284             sub scan_iter {
1285 0     0 1   my ($self, %opts) = @_;
1286             return Async::Redis::Iterator->new(
1287             redis => $self,
1288             command => 'SCAN',
1289             match => $opts{match},
1290             count => $opts{count},
1291             type => $opts{type},
1292 0           );
1293             }
1294              
1295             sub hscan_iter {
1296 0     0 0   my ($self, $key, %opts) = @_;
1297             return Async::Redis::Iterator->new(
1298             redis => $self,
1299             command => 'HSCAN',
1300             key => $key,
1301             match => $opts{match},
1302             count => $opts{count},
1303 0           );
1304             }
1305              
1306             sub sscan_iter {
1307 0     0 0   my ($self, $key, %opts) = @_;
1308             return Async::Redis::Iterator->new(
1309             redis => $self,
1310             command => 'SSCAN',
1311             key => $key,
1312             match => $opts{match},
1313             count => $opts{count},
1314 0           );
1315             }
1316              
1317             sub zscan_iter {
1318 0     0 0   my ($self, $key, %opts) = @_;
1319             return Async::Redis::Iterator->new(
1320             redis => $self,
1321             command => 'ZSCAN',
1322             key => $key,
1323             match => $opts{match},
1324             count => $opts{count},
1325 0           );
1326             }
1327              
1328             # ============================================================================
1329             # Transactions
1330             # ============================================================================
1331              
1332 0     0 1   async sub multi {
1333 0           my ($self, $callback) = @_;
1334              
1335             # Prevent nested multi() calls
1336             die "Cannot nest multi() calls - already in a transaction"
1337 0 0         if $self->{in_multi};
1338              
1339             # Mark that we're collecting transaction commands
1340 0           $self->{in_multi} = 1;
1341              
1342 0           my @commands;
1343 0           eval {
1344             # Create transaction collector
1345 0           my $tx = Async::Redis::Transaction->new(redis => $self);
1346              
1347             # Run callback to collect commands
1348 0           await $callback->($tx);
1349              
1350 0           @commands = $tx->commands;
1351             };
1352 0           my $collect_error = $@;
1353              
1354 0 0         if ($collect_error) {
1355 0           $self->{in_multi} = 0;
1356 0           die $collect_error;
1357             }
1358              
1359             # If no commands queued, return empty result
1360 0 0         unless (@commands) {
1361 0           $self->{in_multi} = 0;
1362 0           return [];
1363             }
1364              
1365             # Execute transaction (in_multi already set)
1366 0           return await $self->_execute_transaction(\@commands);
1367             }
1368              
1369 0     0     async sub _execute_transaction {
1370 0           my ($self, $commands) = @_;
1371              
1372             # in_multi should already be set by caller
1373              
1374 0           my $results;
1375 0           eval {
1376             # Send MULTI
1377 0           await $self->command('MULTI');
1378              
1379             # Queue all commands (they return +QUEUED)
1380 0           for my $cmd (@$commands) {
1381 0           await $self->command(@$cmd);
1382             }
1383              
1384             # Execute and get results
1385 0           $results = await $self->command('EXEC');
1386             };
1387 0           my $error = $@;
1388              
1389             # Always clear transaction state
1390 0           $self->{in_multi} = 0;
1391              
1392 0 0         if ($error) {
1393             # Try to clean up
1394 0           eval { await $self->command('DISCARD') };
  0            
1395 0           die $error;
1396             }
1397              
1398 0           return $results;
1399             }
1400              
1401             # Accessor for pool cleanliness tracking
1402 0     0 0   sub in_multi { shift->{in_multi} }
1403 0     0 0   sub watching { shift->{watching} }
1404 0     0 0   sub in_pubsub { shift->{in_pubsub} }
1405 0   0 0 0   sub inflight_count { scalar @{shift->{inflight} // []} }
  0            
1406              
1407             # Is connection dirty (unsafe to reuse)?
1408             sub is_dirty {
1409 0     0 0   my ($self) = @_;
1410              
1411 0 0         return 1 if $self->{in_multi};
1412 0 0         return 1 if $self->{watching};
1413 0 0         return 1 if $self->{in_pubsub};
1414 0 0 0       return 1 if @{$self->{inflight} // []} > 0;
  0            
1415              
1416 0           return 0;
1417             }
1418              
1419 0     0 1   async sub watch {
1420 0           my ($self, @keys) = @_;
1421 0           $self->{watching} = 1;
1422 0           return await $self->command('WATCH', @keys);
1423             }
1424              
1425 0     0 0   async sub unwatch {
1426 0           my ($self) = @_;
1427 0           my $result = await $self->command('UNWATCH');
1428 0           $self->{watching} = 0;
1429 0           return $result;
1430             }
1431              
1432 0     0 0   async sub multi_start {
1433 0           my ($self) = @_;
1434 0           $self->{in_multi} = 1;
1435 0           return await $self->command('MULTI');
1436             }
1437              
1438 0     0 0   async sub exec {
1439 0           my ($self) = @_;
1440 0           my $result = await $self->command('EXEC');
1441 0           $self->{in_multi} = 0;
1442 0           $self->{watching} = 0; # EXEC clears watches
1443 0           return $result;
1444             }
1445              
1446 0     0 0   async sub discard {
1447 0           my ($self) = @_;
1448 0           my $result = await $self->command('DISCARD');
1449 0           $self->{in_multi} = 0;
1450             # Note: DISCARD does NOT clear watches
1451 0           return $result;
1452             }
1453              
1454 0     0 1   async sub watch_multi {
1455 0           my ($self, $keys, $callback) = @_;
1456              
1457             # WATCH the keys
1458 0           await $self->watch(@$keys);
1459              
1460             # Get current values of watched keys
1461 0           my %watched;
1462 0           for my $key (@$keys) {
1463 0           $watched{$key} = await $self->get($key);
1464             }
1465              
1466             # Create transaction collector
1467 0           my $tx = Async::Redis::Transaction->new(redis => $self);
1468              
1469             # Run callback with watched values
1470 0           await $callback->($tx, \%watched);
1471              
1472 0           my @commands = $tx->commands;
1473              
1474             # If no commands queued, just unwatch and return empty
1475 0 0         unless (@commands) {
1476 0           await $self->unwatch;
1477 0           return [];
1478             }
1479              
1480             # Execute transaction
1481 0           $self->{in_multi} = 1;
1482              
1483 0           my $results;
1484 0           eval {
1485 0           await $self->command('MULTI');
1486              
1487 0           for my $cmd (@commands) {
1488 0           await $self->command(@$cmd);
1489             }
1490              
1491 0           $results = await $self->command('EXEC');
1492             };
1493 0           my $error = $@;
1494              
1495 0           $self->{in_multi} = 0;
1496 0           $self->{watching} = 0;
1497              
1498 0 0         if ($error) {
1499 0           eval { await $self->command('DISCARD') };
  0            
1500 0           die $error;
1501             }
1502              
1503             # EXEC returns undef/nil if WATCH failed
1504 0           return $results;
1505             }
1506              
1507             # ============================================================================
1508             # PUB/SUB
1509             # ============================================================================
1510              
1511             # Wait for inflight commands to complete before mode change
1512 0     0     async sub _wait_for_inflight_drain {
1513 0           my ($self, $timeout) = @_;
1514 0   0       $timeout //= 30;
1515              
1516 0 0         return unless @{$self->{inflight}};
  0            
1517              
1518 0           my $deadline = Time::HiRes::time() + $timeout;
1519              
1520 0   0       while (@{$self->{inflight}} && Time::HiRes::time() < $deadline) {
  0            
1521 0           await Future::IO->sleep(0.001);
1522             }
1523              
1524 0 0         if (@{$self->{inflight}}) {
  0            
1525 0           $self->_fail_all_inflight("Timeout waiting for inflight commands");
1526             }
1527             }
1528              
1529 0     0 0   async sub publish {
1530 0           my ($self, $channel, $message) = @_;
1531 0           return await $self->command('PUBLISH', $channel, $message);
1532             }
1533              
1534 0     0 0   async sub spublish {
1535 0           my ($self, $channel, $message) = @_;
1536 0           return await $self->command('SPUBLISH', $channel, $message);
1537             }
1538              
1539             # Subscribe to channels - returns a Subscription object
1540 0     0 1   async sub subscribe {
1541 0           my ($self, @channels) = @_;
1542              
1543             die Async::Redis::Error::Disconnected->new(
1544             message => "Not connected",
1545 0 0         ) unless $self->{connected};
1546              
1547             # Wait for pending commands before entering PubSub mode
1548 0           await $self->_wait_for_inflight_drain;
1549              
1550             # Create or reuse subscription
1551 0   0       my $sub = $self->{_subscription} //= Async::Redis::Subscription->new(redis => $self);
1552              
1553             # Send SUBSCRIBE command
1554 0           await $self->_send_command('SUBSCRIBE', @channels);
1555              
1556             # Read subscription confirmations
1557 0           for my $ch (@channels) {
1558 0           my $msg = await $self->_read_pubsub_frame();
1559             # Response: ['subscribe', $channel, $count]
1560 0           $sub->_add_channel($ch);
1561             }
1562              
1563 0           $self->{in_pubsub} = 1;
1564              
1565 0           return $sub;
1566             }
1567              
1568             # Pattern subscribe
1569 0     0 1   async sub psubscribe {
1570 0           my ($self, @patterns) = @_;
1571              
1572             die Async::Redis::Error::Disconnected->new(
1573             message => "Not connected",
1574 0 0         ) unless $self->{connected};
1575              
1576             # Wait for pending commands before entering PubSub mode
1577 0           await $self->_wait_for_inflight_drain;
1578              
1579 0   0       my $sub = $self->{_subscription} //= Async::Redis::Subscription->new(redis => $self);
1580              
1581 0           await $self->_send_command('PSUBSCRIBE', @patterns);
1582              
1583 0           for my $p (@patterns) {
1584 0           my $msg = await $self->_read_pubsub_frame();
1585 0           $sub->_add_pattern($p);
1586             }
1587              
1588 0           $self->{in_pubsub} = 1;
1589              
1590 0           return $sub;
1591             }
1592              
1593             # Sharded subscribe (Redis 7+)
1594 0     0 0   async sub ssubscribe {
1595 0           my ($self, @channels) = @_;
1596              
1597             die Async::Redis::Error::Disconnected->new(
1598             message => "Not connected",
1599 0 0         ) unless $self->{connected};
1600              
1601             # Wait for pending commands before entering PubSub mode
1602 0           await $self->_wait_for_inflight_drain;
1603              
1604 0   0       my $sub = $self->{_subscription} //= Async::Redis::Subscription->new(redis => $self);
1605              
1606 0           await $self->_send_command('SSUBSCRIBE', @channels);
1607              
1608 0           for my $ch (@channels) {
1609 0           my $msg = await $self->_read_pubsub_frame();
1610 0           $sub->_add_sharded_channel($ch);
1611             }
1612              
1613 0           $self->{in_pubsub} = 1;
1614              
1615 0           return $sub;
1616             }
1617              
1618             # Read pubsub frame (subscription confirmation or message)
1619 0     0     async sub _read_pubsub_frame {
1620 0           my ($self) = @_;
1621              
1622             die Async::Redis::Error::Disconnected->new(
1623             message => "Not connected",
1624 0 0         ) unless $self->{connected};
1625              
1626 0           my $msg = await $self->_read_response();
1627 0           return $self->_decode_response($msg);
1628             }
1629              
1630             # Send command without reading response (for pubsub)
1631 0     0     async sub _send_command {
1632 0           my ($self, @args) = @_;
1633              
1634 0           my $cmd = $self->_build_command(@args);
1635 0           await $self->_send($cmd);
1636             }
1637              
1638             # Read next pubsub message (blocking) - for compatibility
1639 0     0     async sub _read_pubsub_message {
1640 0           my ($self) = @_;
1641              
1642 0           my $msg = await $self->_read_response();
1643              
1644             # Message format: ['message', $channel, $payload]
1645             # or: ['pmessage', $pattern, $channel, $payload]
1646 0           return $msg;
1647             }
1648              
1649             # ============================================================================
1650             # Pipelining
1651             # ============================================================================
1652              
1653             sub pipeline {
1654 0     0 1   my ($self, %opts) = @_;
1655             return Async::Redis::Pipeline->new(
1656             redis => $self,
1657             max_depth => $opts{max_depth} // $self->{pipeline_depth},
1658 0   0       );
1659             }
1660              
1661             # Execute multiple commands, return all responses
1662 0     0     async sub _execute_pipeline {
1663 0           my ($self, $commands) = @_;
1664              
1665 0 0         die "Not connected" unless $self->{connected};
1666              
1667 0 0         return [] unless @$commands;
1668              
1669             # Wait for any inflight regular commands to complete before pipeline
1670             # This prevents interleaving pipeline responses with regular command responses
1671 0           await $self->_wait_for_inflight_drain;
1672              
1673             # Take over reading - prevent response reader from running
1674 0           $self->{_reading_responses} = 1;
1675              
1676 0           my $start_time = Time::HiRes::time();
1677 0           my @responses;
1678 0           my $count = scalar @$commands;
1679              
1680 0           my $ok = eval {
1681             # Send all commands
1682 0           my $data = '';
1683 0           for my $cmd (@$commands) {
1684 0           $data .= $self->_build_command(@$cmd);
1685             }
1686 0           await $self->_send($data);
1687              
1688             # Read all responses, capturing per-slot Redis errors
1689 0           for my $i (1 .. $count) {
1690 0           my $msg = await $self->_read_response();
1691              
1692             # Capture Redis errors inline rather than dying
1693 0           my $result;
1694 0           eval {
1695 0           $result = $self->_decode_response($msg);
1696             };
1697 0 0         if ($@) {
1698             # Capture the error object inline in the results
1699 0           $result = $@;
1700             }
1701 0           push @responses, $result;
1702             }
1703 0           1;
1704             };
1705              
1706 0           my $error = $@;
1707              
1708             # Release reading lock
1709 0           $self->{_reading_responses} = 0;
1710              
1711 0 0         if (!$ok) {
1712 0           die $error;
1713             }
1714              
1715             # Telemetry: record pipeline metrics
1716 0 0         if ($self->{_telemetry}) {
1717 0           my $elapsed_ms = (Time::HiRes::time() - $start_time) * 1000;
1718 0           $self->{_telemetry}->record_pipeline($count, $elapsed_ms);
1719             }
1720              
1721 0           return \@responses;
1722             }
1723              
1724             1;
1725              
1726             __END__