File Coverage

blib/lib/Async/Redis.pm
Criterion Covered Total %
statement 132 770 17.1
branch 23 282 8.1
condition 27 144 18.7
subroutine 33 109 30.2
pod 18 49 36.7
total 233 1354 17.2


line stmt bran cond sub pod time code
1             package Async::Redis;
2              
3 75     75   18202257 use strict;
  75         128  
  75         2731  
4 75     75   369 use warnings;
  75         132  
  75         3217  
5 75     75   1008 use 5.018;
  75         214  
6              
7             our $VERSION = '0.001007';
8              
9 75     75   306 use Future;
  75         157  
  75         1770  
10 75     75   269 use Future::AsyncAwait;
  75         104  
  75         430  
11 75     75   3863 use Future::IO 0.19;
  75         1061  
  75         3265  
12 75     75   272 use Socket qw(pack_sockaddr_in pack_sockaddr_un inet_aton AF_INET AF_UNIX SOCK_STREAM);
  75         205  
  75         5005  
13 75     75   509 use IO::Handle ();
  75         169  
  75         1298  
14 75     75   31873 use IO::Socket::INET;
  75         711691  
  75         392  
15 75     75   39066 use Time::HiRes ();
  75         106  
  75         1766  
16              
17             # Error classes
18 75     75   35556 use Async::Redis::Error::Connection;
  75         170  
  75         2609  
19 75     75   27063 use Async::Redis::Error::Timeout;
  75         174  
  75         2286  
20 75     75   26870 use Async::Redis::Error::Disconnected;
  75         207  
  75         2168  
21 75     75   28202 use Async::Redis::Error::Redis;
  75         163  
  75         2956  
22 75     75   26946 use Async::Redis::Error::Protocol;
  75         195  
  75         2190  
23              
24             # Import auto-generated command methods
25 75     75   37393 use Async::Redis::Commands;
  75         273  
  75         5179  
26             our @ISA = qw(Async::Redis::Commands);
27              
28             # Key extraction for prefixing
29 75     75   36454 use Async::Redis::KeyExtractor;
  75         228  
  75         4119  
30              
31             # Transaction support
32 75     75   30360 use Async::Redis::Transaction;
  75         222  
  75         3091  
33              
34             # Script support
35 75     75   30503 use Async::Redis::Script;
  75         200  
  75         4123  
36              
37             # Iterator support
38 75     75   27874 use Async::Redis::Iterator;
  75         217  
  75         3800  
39              
40             # Pipeline support
41 75     75   28890 use Async::Redis::Pipeline;
  75         211  
  75         4074  
42 75     75   27200 use Async::Redis::AutoPipeline;
  75         204  
  75         3948  
43              
44             # PubSub support
45 75     75   33604 use Async::Redis::Subscription;
  75         239  
  75         6710  
46              
47             # Telemetry support
48 75     75   32421 use Async::Redis::Telemetry;
  75         3601  
  75         6829  
49              
50             # Try XS version first, fall back to pure Perl
51             BEGIN {
52 75 50   75   777 eval { require Protocol::Redis::XS; 1 }
  75         27461  
  75         1214858  
53             or require Protocol::Redis;
54             }
55              
56             sub _parser_class {
57 0 0   0   0 return $INC{'Protocol/Redis/XS.pm'} ? 'Protocol::Redis::XS' : 'Protocol::Redis';
58             }
59              
60             sub _calculate_backoff {
61 0     0   0 my ($self, $attempt) = @_;
62              
63             # Exponential: delay * 2^(attempt-1)
64 0         0 my $delay = $self->{reconnect_delay} * (2 ** ($attempt - 1));
65              
66             # Cap at max
67 0 0       0 $delay = $self->{reconnect_delay_max} if $delay > $self->{reconnect_delay_max};
68              
69             # Apply jitter: delay * (1 +/- jitter)
70 0 0       0 if ($self->{reconnect_jitter} > 0) {
71 0         0 my $jitter_range = $delay * $self->{reconnect_jitter};
72 0         0 my $jitter = (rand(2) - 1) * $jitter_range;
73 0         0 $delay += $jitter;
74             }
75              
76 0         0 return $delay;
77             }
78              
79             sub new {
80 84     84 1 3648685 my ($class, %args) = @_;
81              
82             # Parse URI if provided
83 84 100       529 if ($args{uri}) {
84 2         507 require Async::Redis::URI;
85 2         13 my $uri = Async::Redis::URI->parse($args{uri});
86 2 50       7 if ($uri) {
87 2         6 my %uri_args = $uri->to_hash;
88             # URI values are defaults, explicit args override
89 2         5 %args = (%uri_args, %args);
90 2         8 delete $args{uri}; # don't store the string
91             }
92             }
93              
94             my $self = bless {
95             path => $args{path},
96             host => $args{path} ? undef : ($args{host} // 'localhost'),
97             port => $args{path} ? undef : ($args{port} // 6379),
98             socket => undef,
99             parser => undef,
100             connected => 0,
101              
102             # Timeout settings
103             connect_timeout => $args{connect_timeout} // 10,
104             request_timeout => $args{request_timeout} // 5,
105             blocking_timeout_buffer => $args{blocking_timeout_buffer} // 2,
106              
107             # Inflight tracking with deadlines
108             # Entry: { future => $f, cmd => $cmd, args => \@args, deadline => $t, sent_at => $t }
109             inflight => [],
110              
111             # Response reader synchronization
112             _reading_responses => 0,
113              
114             # Reconnection settings
115             reconnect => $args{reconnect} // 0,
116             reconnect_delay => $args{reconnect_delay} // 0.1,
117             reconnect_delay_max => $args{reconnect_delay_max} // 60,
118             reconnect_jitter => $args{reconnect_jitter} // 0.25,
119             _reconnect_attempt => 0,
120              
121             # Callbacks
122             on_connect => $args{on_connect},
123             on_disconnect => $args{on_disconnect},
124             on_error => $args{on_error},
125              
126             # Authentication
127             password => $args{password},
128             username => $args{username},
129             database => $args{database} // 0,
130             client_name => $args{client_name},
131              
132             # TLS
133             tls => $args{tls},
134              
135             # Key prefixing
136             prefix => $args{prefix},
137              
138             # Pipeline settings
139             pipeline_depth => $args{pipeline_depth} // 10000,
140             auto_pipeline => $args{auto_pipeline} // 0,
141              
142             # Transaction state
143             in_multi => 0,
144             watching => 0,
145              
146             # PubSub state
147             in_pubsub => 0,
148             _subscription => undef,
149             _pump_running => 0,
150              
151             # Fork safety
152             _pid => $$,
153              
154             # Script registry
155             _scripts => {},
156              
157             # Current read future for clean disconnect cancellation
158             _current_read_future => undef,
159              
160             # Telemetry options
161             debug => $args{debug},
162             otel_tracer => $args{otel_tracer},
163             otel_meter => $args{otel_meter},
164             otel_include_args => $args{otel_include_args} // 1,
165 84 100 50     6415 otel_redact => $args{otel_redact} // 1,
    100 100        
      100        
      50        
      50        
      50        
      50        
      50        
      50        
      100        
      50        
      50        
      50        
      50        
166             }, $class;
167              
168             # Initialize telemetry if any observability enabled
169 84 50 33     1032 if ($self->{debug} || $self->{otel_tracer} || $self->{otel_meter}) {
      33        
170             $self->{_telemetry} = Async::Redis::Telemetry->new(
171             tracer => $self->{otel_tracer},
172             meter => $self->{otel_meter},
173             debug => $self->{debug},
174             include_args => $self->{otel_include_args},
175             redact => $self->{otel_redact},
176             host => $self->{host},
177             port => $self->{port},
178 0   0     0 database => $self->{database} // 0,
179             );
180             }
181              
182 84         309 return $self;
183             }
184              
185             # Connect to Redis server
186 77     77 1 930 async sub connect {
187 77         165 my ($self) = @_;
188              
189 77 50       233 return $self if $self->{connected};
190              
191             # Create socket — AF_UNIX for path, AF_INET for host:port
192 77         163 my ($socket, $sockaddr);
193              
194 77 100       251 if ($self->{path}) {
195             socket($socket, AF_UNIX, SOCK_STREAM, 0)
196             or die Async::Redis::Error::Connection->new(
197             message => "Cannot create unix socket: $!",
198             host => $self->{path},
199 1 50       63 port => 0,
200             );
201 1         9 IO::Handle::blocking($socket, 0);
202 1         6 $sockaddr = pack_sockaddr_un($self->{path});
203             } else {
204             $socket = IO::Socket::INET->new(
205             Proto => 'tcp',
206             Blocking => 0,
207             ) or die Async::Redis::Error::Connection->new(
208             message => "Cannot create socket: $!",
209             host => $self->{host},
210             port => $self->{port},
211 76 50       832 );
212              
213             # Build sockaddr
214             my $addr = inet_aton($self->{host})
215             or die Async::Redis::Error::Connection->new(
216             message => "Cannot resolve host: $self->{host}",
217             host => $self->{host},
218             port => $self->{port},
219 76 50       55539 );
220 76         611 $sockaddr = pack_sockaddr_in($self->{port}, $addr);
221             }
222              
223             # Connect with timeout using Future->wait_any
224 77         799 my $connect_f = Future::IO->connect($socket, $sockaddr);
225 77         26091 my $sleep_f = Future::IO->sleep($self->{connect_timeout});
226              
227             my $timeout_f = $sleep_f->then(sub {
228 0     0   0 return Future->fail('connect_timeout');
229 77         7255 });
230              
231 77         5351 my $wait_f = Future->wait_any($connect_f, $timeout_f);
232              
233             # Use followed_by to handle both success and failure without await propagating failure
234             my $result_f = $wait_f->followed_by(sub {
235 77     77   63444 my ($f) = @_;
236 77         624 return Future->done($f); # wrap the future itself
237 77         9845 });
238              
239 77         4891 my $completed_f = await $result_f;
240              
241             # Now check the result
242 77 50       11024 if ($completed_f->is_failed) {
243 77         851 my ($error) = $completed_f->failure;
244             # Don't call close() - let $socket go out of scope when we die.
245             # Perl's DESTROY will close it after the exception unwinds.
246              
247 77 50       1062 if ($error eq 'connect_timeout') {
248             die Async::Redis::Error::Timeout->new(
249             message => "Connect timed out after $self->{connect_timeout}s",
250             timeout => $self->{connect_timeout},
251 0         0 );
252             }
253             die Async::Redis::Error::Connection->new(
254             message => "$error",
255             host => $self->{path} // $self->{host},
256 77   66     1838 port => $self->{port} // 0,
      100        
257             );
258             }
259              
260             # TLS upgrade if enabled
261 0 0       0 if ($self->{tls}) {
262 0         0 eval {
263 0         0 $socket = await $self->_tls_upgrade($socket);
264             };
265 0 0       0 if ($@) {
266             # Don't call close() - let $socket go out of scope when we die.
267             # Perl's DESTROY will close it after the exception unwinds.
268 0         0 die $@;
269             }
270             }
271              
272 0         0 $self->{socket} = $socket;
273 0         0 $self->{parser} = _parser_class()->new(api => 1);
274 0         0 $self->{connected} = 1;
275 0         0 $self->{inflight} = [];
276 0         0 $self->{_reading_responses} = 0;
277 0         0 $self->{_pid} = $$; # Track PID for fork safety
278 0         0 $self->{_current_read_future} = undef;
279              
280             # Run Redis protocol handshake (AUTH, SELECT, CLIENT SETNAME)
281 0         0 await $self->_redis_handshake;
282              
283             # Initialize auto-pipeline if enabled
284 0 0       0 if ($self->{auto_pipeline}) {
285             $self->{_auto_pipeline} = Async::Redis::AutoPipeline->new(
286             redis => $self,
287             max_depth => $self->{pipeline_depth},
288 0         0 );
289             }
290              
291             # Fire on_connect callback and reset reconnect counter
292 0 0       0 if ($self->{on_connect}) {
293 0         0 $self->{on_connect}->($self);
294             }
295 0         0 $self->{_reconnect_attempt} = 0;
296              
297             # Telemetry: record connection
298 0 0       0 if ($self->{_telemetry}) {
299 0         0 $self->{_telemetry}->record_connection(1);
300             $self->{_telemetry}->log_event('connected',
301 0   0     0 $self->{path} // "$self->{host}:$self->{port}");
302             }
303              
304 0         0 return $self;
305             }
306              
307             # Redis protocol handshake after TCP connect
308 0     0   0 async sub _redis_handshake {
309 0         0 my ($self) = @_;
310              
311             # Use connect_timeout for the entire handshake (AUTH, SELECT, CLIENT SETNAME)
312             # This ensures the handshake can't block forever if Redis hangs
313 0         0 my $deadline = Time::HiRes::time() + $self->{connect_timeout};
314              
315             # AUTH (password or username+password for ACL)
316 0 0       0 if ($self->{password}) {
317 0         0 my @auth_args = ('AUTH');
318 0 0       0 push @auth_args, $self->{username} if $self->{username};
319 0         0 push @auth_args, $self->{password};
320              
321 0         0 my $cmd = $self->_build_command(@auth_args);
322 0         0 await $self->_send($cmd);
323              
324 0         0 my $response = await $self->_read_response_with_deadline($deadline, ['AUTH']);
325 0         0 my $result = $self->_decode_response($response);
326              
327             # AUTH returns OK on success, throws on failure
328 0 0 0     0 unless ($result && $result eq 'OK') {
329 0         0 die Async::Redis::Error::Redis->new(
330             message => "Authentication failed: $result",
331             type => 'NOAUTH',
332             );
333             }
334             }
335              
336             # SELECT database
337 0 0 0     0 if ($self->{database} && $self->{database} != 0) {
338 0         0 my $cmd = $self->_build_command('SELECT', $self->{database});
339 0         0 await $self->_send($cmd);
340              
341 0         0 my $response = await $self->_read_response_with_deadline($deadline, ['SELECT', $self->{database}]);
342 0         0 my $result = $self->_decode_response($response);
343              
344 0 0 0     0 unless ($result && $result eq 'OK') {
345 0         0 die Async::Redis::Error::Redis->new(
346             message => "SELECT failed: $result",
347             type => 'ERR',
348             );
349             }
350             }
351              
352             # CLIENT SETNAME
353 0 0       0 if ($self->{client_name}) {
354 0         0 my $cmd = $self->_build_command('CLIENT', 'SETNAME', $self->{client_name});
355 0         0 await $self->_send($cmd);
356              
357 0         0 my $response = await $self->_read_response_with_deadline($deadline, ['CLIENT', 'SETNAME']);
358             # Ignore result - SETNAME failing shouldn't prevent connection
359             }
360             }
361              
362             # Check if connected to Redis
363             sub is_connected {
364 0     0 0 0 my ($self) = @_;
365 0 0       0 return $self->{connected} ? 1 : 0;
366             }
367              
368             # Disconnect from Redis
369             sub disconnect {
370 0     0 1 0 my ($self, $reason) = @_;
371 0   0     0 $reason //= 'client_disconnect';
372              
373 0         0 my $was_connected = $self->{connected};
374              
375             # Notify any active pub/sub subscription that we're disconnecting
376             # cleanly, so its driver's on_fail on the in-flight read treats the
377             # EOF as expected rather than firing on_error / dying.
378 0 0 0     0 if ($self->{_subscription} && !$self->{_subscription}->is_closed) {
379 0         0 $self->{_subscription}->_close;
380             }
381              
382             # Cancel any active read future BEFORE closing socket
383             # This ensures Future::IO unregisters its watcher while fileno is still valid
384 0 0 0     0 if ($self->{_current_read_future} && !$self->{_current_read_future}->is_ready) {
385 0         0 $self->{_current_read_future}->cancel;
386 0         0 $self->{_current_read_future} = undef;
387             }
388              
389             # Cancel any pending inflight operations before closing socket
390 0 0       0 if (my $inflight = $self->{inflight}) {
391 0         0 for my $entry (@$inflight) {
392 0 0 0     0 if ($entry->{future} && !$entry->{future}->is_ready) {
393 0         0 $entry->{future}->cancel;
394             }
395             }
396 0         0 $self->{inflight} = [];
397             }
398              
399 0 0       0 if ($self->{socket}) {
400 0         0 $self->_close_socket;
401             }
402 0         0 $self->{connected} = 0;
403 0         0 $self->{parser} = undef;
404 0         0 $self->{_reading_responses} = 0;
405              
406 0 0 0     0 if ($was_connected && $self->{on_disconnect}) {
407 0         0 $self->{on_disconnect}->($self, $reason);
408             }
409              
410             # Telemetry: record disconnection
411 0 0 0     0 if ($was_connected && $self->{_telemetry}) {
412 0         0 $self->{_telemetry}->record_connection(-1);
413 0         0 $self->{_telemetry}->log_event('disconnected', $reason);
414             }
415              
416 0         0 return $self;
417             }
418              
419             # Destructor - clean up socket when object is garbage collected
420             sub DESTROY {
421 84     84   26251 my ($self) = @_;
422             # Only clean up if we have a socket and it's still open
423 84 50 33     889 if ($self->{socket} && fileno($self->{socket})) {
424 0         0 $self->_close_socket;
425             }
426             }
427              
428             # Properly close socket, canceling any pending futures first
429             sub _close_socket {
430 0     0   0 my ($self) = @_;
431              
432             # Take ownership - removes from $self immediately
433 0 0       0 my $socket = delete $self->{socket} or return;
434 0         0 my $fileno = fileno($socket);
435              
436             # Cancel any pending inflight futures - this propagates to
437             # Future::IO internals and cleans up any watchers on this socket.
438             # Important: must happen while fileno is still valid!
439 0 0       0 if (my $inflight = delete $self->{inflight}) {
440 0         0 for my $entry (@$inflight) {
441 0 0 0     0 if ($entry->{future} && !$entry->{future}->is_ready) {
442 0         0 $entry->{future}->cancel;
443             }
444             }
445             }
446 0         0 $self->{inflight} = [];
447              
448             # Initiate clean TCP shutdown (FIN) while fileno still valid
449 0 0       0 shutdown($socket, 2) if defined $fileno;
450              
451             # DON'T call close()!
452             # $socket falls out of scope here, Perl's DESTROY calls close().
453             # By this point, Future::IO has already unregistered its watchers
454             # via the cancel() calls above.
455             }
456              
457             # Check if fork occurred and invalidate connection
458             sub _check_fork {
459 0     0   0 my ($self) = @_;
460              
461 0 0 0     0 if ($self->{_pid} && $self->{_pid} != $$) {
462             # Fork detected - invalidate connection (parent owns the socket)
463             # Don't cancel futures - they belong to the parent's event loop
464             # Just clear references so we don't try to use them
465 0         0 $self->{connected} = 0;
466 0         0 $self->{socket} = undef;
467 0         0 $self->{parser} = undef;
468 0         0 $self->{inflight} = [];
469 0         0 $self->{_reading_responses} = 0;
470 0         0 $self->{_current_read_future} = undef; # Clear stale reference
471              
472 0         0 my $old_pid = $self->{_pid};
473 0         0 $self->{_pid} = $$;
474              
475 0 0       0 if ($self->{_telemetry}) {
476 0         0 $self->{_telemetry}->log_event('fork_detected', "old PID: $old_pid, new PID: $$");
477             }
478              
479 0         0 return 1; # Fork occurred
480             }
481              
482 0         0 return 0;
483             }
484              
485             # Build Redis command in RESP format
486             sub _build_command {
487 0     0   0 my ($self, @args) = @_;
488              
489 0         0 my $cmd = "*" . scalar(@args) . "\r\n";
490 0         0 for my $arg (@args) {
491 0   0     0 $arg //= '';
492 0         0 my $bytes = "$arg"; # stringify
493 0 0       0 utf8::encode($bytes) if utf8::is_utf8($bytes);
494 0         0 $cmd .= "\$" . length($bytes) . "\r\n" . $bytes . "\r\n";
495             }
496 0         0 return $cmd;
497             }
498              
499             # Send raw data
500 0     0   0 async sub _send {
501 0         0 my ($self, $data) = @_;
502 0         0 await Future::IO->write_exactly($self->{socket}, $data);
503 0         0 return length($data);
504             }
505              
506             # Add command to inflight queue - returns queue depth
507             sub _add_inflight {
508 0     0   0 my ($self, $future, $cmd, $args, $deadline) = @_;
509 0         0 push @{$self->{inflight}}, {
  0         0  
510             future => $future,
511             cmd => $cmd,
512             args => $args,
513             deadline => $deadline,
514             sent_at => Time::HiRes::time(),
515             };
516 0         0 return scalar @{$self->{inflight}};
  0         0  
517             }
518              
519             # Shift first entry from inflight queue
520             sub _shift_inflight {
521 0     0   0 my ($self) = @_;
522 0         0 return shift @{$self->{inflight}};
  0         0  
523             }
524              
525             # Fail all pending inflight futures with given error
526             sub _fail_all_inflight {
527 0     0   0 my ($self, $error) = @_;
528 0         0 while (my $entry = $self->_shift_inflight) {
529 0 0 0     0 if ($entry->{future} && !$entry->{future}->is_ready) {
530 0         0 $entry->{future}->fail($error);
531             }
532             }
533             }
534              
535             # Ensure response reader is running - the core response queue mechanism
536             # Only one reader should be active at a time, processing responses in FIFO order
537 0     0   0 async sub _ensure_response_reader {
538 0         0 my ($self) = @_;
539              
540             # Already reading - don't start another reader
541 0 0       0 return if $self->{_reading_responses};
542              
543 0         0 $self->{_reading_responses} = 1;
544              
545 0   0     0 while (@{$self->{inflight}} && $self->{connected}) {
  0         0  
546 0         0 my $entry = $self->{inflight}[0];
547              
548             # Read response with deadline from the entry
549 0         0 my $response;
550 0         0 my $read_ok = eval {
551             $response = await $self->_read_response_with_deadline(
552             $entry->{deadline},
553             $entry->{args}
554 0         0 );
555 0         0 1;
556             };
557              
558 0 0       0 if (!$read_ok) {
559 0         0 my $read_error = $@;
560             # Connection/timeout error - fail all inflight and abort
561 0         0 $self->_fail_all_inflight($read_error);
562 0         0 $self->{_reading_responses} = 0;
563 0         0 return;
564             }
565              
566             # Remove this entry from the queue now that we have its response
567 0         0 $self->_shift_inflight;
568              
569             # Decode response (sync operation, eval works fine here)
570 0         0 my $result;
571 0         0 my $decode_ok = eval {
572 0         0 $result = $self->_decode_response($response);
573 0         0 1;
574             };
575              
576             # Complete the future
577 0 0       0 if (!$decode_ok) {
578 0         0 my $decode_error = $@;
579             # Redis error (like WRONGTYPE) - fail just this future
580 0 0       0 $entry->{future}->fail($decode_error) unless $entry->{future}->is_ready;
581             } else {
582             # Success - complete the future with result
583 0 0       0 $entry->{future}->done($result) unless $entry->{future}->is_ready;
584             }
585             }
586              
587 0         0 $self->{_reading_responses} = 0;
588             }
589              
590             # Read and parse one response
591 0     0   0 async sub _read_response {
592 0         0 my ($self) = @_;
593              
594             # First check if parser already has a complete message
595             # (from previous read that contained multiple responses)
596 0 0       0 if (my $msg = $self->{parser}->get_message) {
597 0         0 return $msg;
598             }
599              
600             # Read until we get a complete message
601 0         0 while (1) {
602 0         0 my $buf = await Future::IO->read($self->{socket}, 65536);
603              
604             # EOF
605 0 0 0     0 if (!defined $buf || length($buf) == 0) {
606 0         0 die "Connection closed by server";
607             }
608              
609 0         0 $self->{parser}->parse($buf);
610              
611 0 0       0 if (my $msg = $self->{parser}->get_message) {
612 0         0 return $msg;
613             }
614             }
615             }
616              
617             # Calculate deadline based on command type
618             sub _calculate_deadline {
619 0     0   0 my ($self, $cmd, @args) = @_;
620              
621 0   0     0 $cmd = uc($cmd // '');
622              
623             # Blocking commands get extended deadline
624 0 0       0 if ($cmd =~ /^(BLPOP|BRPOP|BLMOVE|BRPOPLPUSH|BLMPOP|BZPOPMIN|BZPOPMAX|BZMPOP)$/) {
625             # Last arg is the timeout for these commands
626 0   0     0 my $server_timeout = $args[-1] // 0;
627 0         0 return Time::HiRes::time() + $server_timeout + $self->{blocking_timeout_buffer};
628             }
629              
630 0 0       0 if ($cmd =~ /^(XREAD|XREADGROUP)$/) {
631             # XREAD/XREADGROUP have BLOCK option
632 0         0 for my $i (0 .. $#args - 1) {
633 0 0       0 if (uc($args[$i]) eq 'BLOCK') {
634 0   0     0 my $block_ms = $args[$i + 1] // 0;
635 0         0 return Time::HiRes::time() + ($block_ms / 1000) + $self->{blocking_timeout_buffer};
636             }
637             }
638             }
639              
640             # Normal commands use request_timeout
641 0         0 return Time::HiRes::time() + $self->{request_timeout};
642             }
643              
644             # Non-blocking TLS upgrade
645 0     0   0 async sub _tls_upgrade {
646 0         0 my ($self, $socket) = @_;
647              
648 0         0 require IO::Socket::SSL;
649              
650             # Build SSL options
651 0         0 my %ssl_opts = (
652             SSL_startHandshake => 0, # Don't block during start_SSL!
653             );
654              
655 0 0       0 if (ref $self->{tls} eq 'HASH') {
656 0 0       0 $ssl_opts{SSL_ca_file} = $self->{tls}{ca_file} if $self->{tls}{ca_file};
657 0 0       0 $ssl_opts{SSL_cert_file} = $self->{tls}{cert_file} if $self->{tls}{cert_file};
658 0 0       0 $ssl_opts{SSL_key_file} = $self->{tls}{key_file} if $self->{tls}{key_file};
659              
660 0 0       0 if (exists $self->{tls}{verify}) {
661             $ssl_opts{SSL_verify_mode} = $self->{tls}{verify}
662 0 0       0 ? IO::Socket::SSL::SSL_VERIFY_PEER()
663             : IO::Socket::SSL::SSL_VERIFY_NONE();
664             } else {
665 0         0 $ssl_opts{SSL_verify_mode} = IO::Socket::SSL::SSL_VERIFY_PEER();
666             }
667             } else {
668 0         0 $ssl_opts{SSL_verify_mode} = IO::Socket::SSL::SSL_VERIFY_PEER();
669             }
670              
671             # Start SSL (does not block because SSL_startHandshake => 0)
672             IO::Socket::SSL->start_SSL($socket, %ssl_opts)
673             or die Async::Redis::Error::Connection->new(
674             message => "SSL setup failed: " . IO::Socket::SSL::errstr(),
675             host => $self->{host},
676             port => $self->{port},
677 0 0       0 );
678              
679             # Drive handshake with non-blocking loop
680 0         0 my $deadline = Time::HiRes::time() + $self->{connect_timeout};
681              
682 0         0 while (1) {
683             # Check timeout
684 0 0       0 if (Time::HiRes::time() >= $deadline) {
685             die Async::Redis::Error::Timeout->new(
686             message => "TLS handshake timed out",
687             timeout => $self->{connect_timeout},
688 0         0 );
689             }
690              
691             # Attempt handshake step
692 0         0 my $rv = $socket->connect_SSL;
693              
694 0 0       0 if ($rv) {
695             # Handshake complete!
696 0         0 return $socket;
697             }
698              
699             # Check what the handshake needs
700 0         0 my $remaining = $deadline - Time::HiRes::time();
701 0 0       0 $remaining = 0.1 if $remaining <= 0;
702              
703 0 0       0 if ($IO::Socket::SSL::SSL_ERROR == IO::Socket::SSL::SSL_ERROR_WANT_READ()) {
    0          
704             # Wait for socket to become readable with timeout
705 0         0 my $read_f = Future::IO->waitfor_readable($socket);
706             my $timeout_f = Future::IO->sleep($remaining)->then(sub {
707 0     0   0 return Future->fail('tls_timeout');
708 0         0 });
709              
710 0         0 my $wait_f = Future->wait_any($read_f, $timeout_f);
711 0         0 await $wait_f;
712              
713 0 0       0 if ($wait_f->is_failed) {
714             die Async::Redis::Error::Timeout->new(
715             message => "TLS handshake timed out",
716             timeout => $self->{connect_timeout},
717 0         0 );
718             }
719             }
720             elsif ($IO::Socket::SSL::SSL_ERROR == IO::Socket::SSL::SSL_ERROR_WANT_WRITE()) {
721             # Wait for socket to become writable with timeout
722 0         0 my $write_f = Future::IO->waitfor_writable($socket);
723             my $timeout_f = Future::IO->sleep($remaining)->then(sub {
724 0     0   0 return Future->fail('tls_timeout');
725 0         0 });
726              
727 0         0 my $wait_f = Future->wait_any($write_f, $timeout_f);
728 0         0 await $wait_f;
729              
730 0 0       0 if ($wait_f->is_failed) {
731             die Async::Redis::Error::Timeout->new(
732             message => "TLS handshake timed out",
733             timeout => $self->{connect_timeout},
734 0         0 );
735             }
736             }
737             else {
738             # Actual error
739             die Async::Redis::Error::Connection->new(
740             message => "TLS handshake failed: " . IO::Socket::SSL::errstr(),
741             host => $self->{host},
742             port => $self->{port},
743 0         0 );
744             }
745             }
746             }
747              
748             # Reconnect with exponential backoff
749 0     0   0 async sub _reconnect {
750 0         0 my ($self) = @_;
751              
752 0         0 while (!$self->{connected}) {
753 0         0 $self->{_reconnect_attempt}++;
754 0         0 my $delay = $self->_calculate_backoff($self->{_reconnect_attempt});
755              
756 0         0 eval {
757 0         0 await $self->connect;
758             };
759              
760 0 0       0 if ($@) {
761 0         0 my $error = $@;
762              
763             # Fire on_error callback
764 0 0       0 if ($self->{on_error}) {
765 0         0 $self->{on_error}->($self, $error);
766             }
767              
768             # Wait before next attempt
769 0         0 await Future::IO->sleep($delay);
770             }
771             }
772             }
773              
774             # Reconnect and replay pubsub subscriptions
775 0     0   0 async sub _reconnect_pubsub {
776 0         0 my ($self) = @_;
777              
778             my $sub = $self->{_subscription}
779 0 0       0 or die Async::Redis::Error::Disconnected->new(
780             message => "No subscription to replay",
781             );
782              
783 0         0 my @replay = $sub->get_replay_commands;
784              
785             # Ensure connection state is fully cleaned up before reconnecting.
786             # _reset_connection may have already been called by _read_response,
787             # but if the socket was closed externally, we need to clean up
788             # stale IO watchers and state here. It is safe to call twice —
789             # the on_disconnect callback is guarded by $was_connected.
790 0         0 $self->_reset_connection('pubsub_reconnect');
791              
792 0         0 await $self->_reconnect;
793              
794             # Replay all subscription commands
795 0         0 for my $cmd (@replay) {
796 0         0 my ($command, @args) = @$cmd;
797              
798 0         0 await $self->_send_command($command, @args);
799              
800             # Read and discard subscription confirmations
801 0         0 for my $arg (@args) {
802 0         0 await $self->_read_pubsub_frame();
803             }
804             }
805              
806             # Re-enter pubsub mode
807 0         0 $self->{in_pubsub} = 1;
808             }
809              
810             # Execute a Redis command
811 0     0 1 0 async sub command {
812 0         0 my ($self, $cmd, @args) = @_;
813              
814             # Check for fork - invalidate connection if PID changed
815 0         0 $self->_check_fork;
816              
817             # Block regular commands on pubsub connection
818 0 0       0 if ($self->{in_pubsub}) {
819 0   0     0 my $ucmd = uc($cmd // '');
820 0 0       0 unless ($ucmd =~ /^(SUBSCRIBE|UNSUBSCRIBE|PSUBSCRIBE|PUNSUBSCRIBE|SSUBSCRIBE|SUNSUBSCRIBE|PING|QUIT)$/) {
821 0         0 die Async::Redis::Error::Protocol->new(
822             message => "Cannot execute '$cmd' on connection in PubSub mode",
823             );
824             }
825             }
826              
827             # Apply key prefixing if configured
828 0 0 0     0 if (defined $self->{prefix} && $self->{prefix} ne '') {
829             @args = Async::Redis::KeyExtractor::apply_prefix(
830 0         0 $self->{prefix}, $cmd, @args
831             );
832             }
833              
834             # Route through auto-pipeline if enabled
835 0 0       0 if ($self->{_auto_pipeline}) {
836 0         0 return await $self->{_auto_pipeline}->command($cmd, @args);
837             }
838              
839             # If disconnected and reconnect enabled, try to reconnect
840 0 0 0     0 if (!$self->{connected} && $self->{reconnect}) {
841 0         0 await $self->_reconnect;
842             }
843              
844             die Async::Redis::Error::Disconnected->new(
845             message => "Not connected",
846 0 0       0 ) unless $self->{connected};
847              
848             # Telemetry: start span and log send
849 0         0 my $span_context;
850 0         0 my $start_time = Time::HiRes::time();
851 0 0       0 if ($self->{_telemetry}) {
852 0         0 $span_context = $self->{_telemetry}->start_command_span($cmd, @args);
853 0         0 $self->{_telemetry}->log_send($cmd, @args);
854             }
855              
856 0         0 my $raw_cmd = $self->_build_command($cmd, @args);
857              
858             # Calculate deadline based on command type
859 0         0 my $deadline = $self->_calculate_deadline($cmd, @args);
860              
861             # Create response future and register in inflight queue BEFORE sending
862             # This ensures responses are matched in order
863 0         0 my $response_future = Future->new;
864 0         0 $self->_add_inflight($response_future, $cmd, \@args, $deadline);
865              
866 0         0 my $result;
867             my $error;
868              
869 0         0 my $send_ok = eval {
870             # Send command
871 0         0 await $self->_send($raw_cmd);
872 0         0 1;
873             };
874              
875 0 0       0 if (!$send_ok) {
876 0         0 $error = $@;
877             # Send failed - remove from inflight and fail
878 0         0 $self->_shift_inflight; # Remove the entry we just added
879 0 0       0 $response_future->fail($error) unless $response_future->is_ready;
880             } else {
881             # Trigger the response reader (fire and forget - it runs in background)
882 0         0 $self->_ensure_response_reader->retain;
883              
884             # Wait for our response future to be completed by the reader
885 0         0 my $await_ok = eval {
886 0         0 $result = await $response_future;
887 0         0 1;
888             };
889              
890 0 0       0 if (!$await_ok) {
891 0         0 $error = $@;
892             }
893             }
894              
895             # Telemetry: log result and end span
896 0 0       0 if ($self->{_telemetry}) {
897 0         0 my $elapsed_ms = (Time::HiRes::time() - $start_time) * 1000;
898 0 0       0 if ($error) {
899 0         0 $self->{_telemetry}->log_error($error);
900             }
901             else {
902 0         0 $self->{_telemetry}->log_recv($result, $elapsed_ms);
903             }
904 0         0 $self->{_telemetry}->end_command_span($span_context, $error);
905             }
906              
907 0 0       0 die $error if $error;
908 0         0 return $result;
909             }
910              
911             # Read response with deadline enforcement
912 0     0   0 async sub _read_response_with_deadline {
913 0         0 my ($self, $deadline, $cmd_ref) = @_;
914              
915             # First check if parser already has a complete message
916 0 0       0 if (my $msg = $self->{parser}->get_message) {
917 0         0 return $msg;
918             }
919              
920             # Read until we get a complete message
921 0         0 while (1) {
922 0         0 my $remaining = $deadline - Time::HiRes::time();
923              
924 0 0       0 if ($remaining <= 0) {
925 0         0 $self->_reset_connection;
926             die Async::Redis::Error::Timeout->new(
927             message => "Request timed out after $self->{request_timeout}s",
928             command => $cmd_ref,
929             timeout => $self->{request_timeout},
930 0         0 maybe_executed => 1, # already sent the command
931             );
932             }
933              
934             # Use wait_any for timeout
935 0         0 my $read_f = Future::IO->read($self->{socket}, 65536);
936              
937             # Store reference so disconnect() can cancel it
938 0         0 $self->{_current_read_future} = $read_f;
939              
940             my $timeout_f = Future::IO->sleep($remaining)->then(sub {
941 0     0   0 return Future->fail('read_timeout');
942 0         0 });
943              
944 0         0 my $wait_f = Future->wait_any($read_f, $timeout_f);
945 0         0 await $wait_f;
946              
947             # Clear stored reference after await completes
948 0         0 $self->{_current_read_future} = undef;
949              
950             # Check if read was cancelled (by disconnect)
951 0 0       0 if ($read_f->is_cancelled) {
952 0         0 die Async::Redis::Error::Disconnected->new(
953             message => "Disconnected during read",
954             );
955             }
956              
957 0 0       0 if ($wait_f->is_failed) {
958 0         0 my ($error) = $wait_f->failure;
959 0 0       0 if ($error eq 'read_timeout') {
960 0         0 $self->_reset_connection;
961             die Async::Redis::Error::Timeout->new(
962             message => "Request timed out after $self->{request_timeout}s",
963             command => $cmd_ref,
964             timeout => $self->{request_timeout},
965 0         0 maybe_executed => 1,
966             );
967             }
968 0         0 $self->_reset_connection;
969 0         0 die Async::Redis::Error::Connection->new(
970             message => "$error",
971             );
972             }
973              
974             # Get the read result
975 0         0 my $buf = $wait_f->get;
976              
977             # EOF
978 0 0 0     0 if (!defined $buf || length($buf) == 0) {
979 0         0 $self->_reset_connection;
980 0         0 die Async::Redis::Error::Connection->new(
981             message => "Connection closed by server",
982             );
983             }
984              
985 0         0 $self->{parser}->parse($buf);
986              
987 0 0       0 if (my $msg = $self->{parser}->get_message) {
988 0         0 return $msg;
989             }
990             }
991             }
992              
993             # Reset connection after timeout (stream is desynced)
994             sub _reset_connection {
995 1     1   11 my ($self, $reason) = @_;
996 1   50     4 $reason //= 'timeout';
997              
998 1         3 my $was_connected = $self->{connected};
999              
1000             # Cancel any active read future BEFORE closing socket
1001             # This ensures Future::IO unregisters its watcher while fileno is still valid
1002 1 50 33     4 if ($self->{_current_read_future} && !$self->{_current_read_future}->is_ready) {
1003 0         0 $self->{_current_read_future}->cancel;
1004 0         0 $self->{_current_read_future} = undef;
1005             }
1006              
1007             # Cancel any pending inflight operations before closing socket
1008 1 50       3 if (my $inflight = $self->{inflight}) {
1009 1         3 for my $entry (@$inflight) {
1010 0 0 0     0 if ($entry->{future} && !$entry->{future}->is_ready) {
1011 0         0 $entry->{future}->cancel;
1012             }
1013             }
1014 1         2 $self->{inflight} = [];
1015             }
1016              
1017 1 50       4 if ($self->{socket}) {
1018 0         0 $self->_close_socket;
1019             }
1020              
1021 1         2 $self->{connected} = 0;
1022 1         2 $self->{parser} = undef;
1023 1         2 $self->{_reading_responses} = 0;
1024 1         1 $self->{in_pubsub} = 0;
1025              
1026 1 50 33     6 if ($was_connected && $self->{on_disconnect}) {
1027 0         0 $self->{on_disconnect}->($self, $reason);
1028             }
1029             }
1030              
1031             # Decode Protocol::Redis response to Perl value
1032             sub _decode_response {
1033 0     0   0 my ($self, $msg) = @_;
1034              
1035 0 0       0 return undef unless $msg;
1036              
1037 0         0 my $type = $msg->{type};
1038 0         0 my $data = $msg->{data};
1039              
1040             # Simple string (+)
1041 0 0       0 if ($type eq '+') {
    0          
    0          
    0          
    0          
1042 0         0 return $data;
1043             }
1044             # Error (-)
1045             elsif ($type eq '-') {
1046 0         0 die Async::Redis::Error::Redis->from_message($data);
1047             }
1048             # Integer (:)
1049             elsif ($type eq ':') {
1050 0         0 return 0 + $data;
1051             }
1052             # Bulk string ($)
1053             elsif ($type eq '$') {
1054 0         0 return $data; # undef for null bulk
1055             }
1056             # Array (*)
1057             elsif ($type eq '*') {
1058 0 0       0 return undef unless defined $data; # null array
1059 0         0 return [ map { $self->_decode_response($_) } @$data ];
  0         0  
1060             }
1061              
1062 0         0 return $data;
1063             }
1064              
1065             # ============================================================================
1066             # Convenience Commands
1067             # ============================================================================
1068              
1069 0     0 0 0 async sub ping {
1070 0         0 my ($self) = @_;
1071 0         0 return await $self->command('PING');
1072             }
1073              
1074 0     0 0 0 async sub set {
1075 0         0 my ($self, $key, $value, %opts) = @_;
1076 0         0 my @cmd = ('SET', $key, $value);
1077 0 0       0 push @cmd, 'EX', $opts{ex} if exists $opts{ex};
1078 0 0       0 push @cmd, 'PX', $opts{px} if exists $opts{px};
1079 0 0       0 push @cmd, 'NX' if $opts{nx};
1080 0 0       0 push @cmd, 'XX' if $opts{xx};
1081 0         0 return await $self->command(@cmd);
1082             }
1083              
1084 0     0 0 0 async sub get {
1085 0         0 my ($self, $key) = @_;
1086 0         0 return await $self->command('GET', $key);
1087             }
1088              
1089 0     0 0 0 async sub del {
1090 0         0 my ($self, @keys) = @_;
1091 0         0 return await $self->command('DEL', @keys);
1092             }
1093              
1094 0     0 0 0 async sub incr {
1095 0         0 my ($self, $key) = @_;
1096 0         0 return await $self->command('INCR', $key);
1097             }
1098              
1099 0     0 0 0 async sub lpush {
1100 0         0 my ($self, $key, @values) = @_;
1101 0         0 return await $self->command('LPUSH', $key, @values);
1102             }
1103              
1104 0     0 0 0 async sub rpush {
1105 0         0 my ($self, $key, @values) = @_;
1106 0         0 return await $self->command('RPUSH', $key, @values);
1107             }
1108              
1109 0     0 0 0 async sub lpop {
1110 0         0 my ($self, $key) = @_;
1111 0         0 return await $self->command('LPOP', $key);
1112             }
1113              
1114 0     0 0 0 async sub lrange {
1115 0         0 my ($self, $key, $start, $stop) = @_;
1116 0         0 return await $self->command('LRANGE', $key, $start, $stop);
1117             }
1118              
1119 0     0 1 0 async sub keys {
1120 0         0 my ($self, $pattern) = @_;
1121 0         0 return await $self->command('KEYS', $pattern // '*');
1122             }
1123              
1124 0     0 0 0 async sub flushdb {
1125 0         0 my ($self) = @_;
1126 0         0 return await $self->command('FLUSHDB');
1127             }
1128              
1129             # ============================================================================
1130             # Lua Scripting
1131             # ============================================================================
1132              
1133 0     0 0 0 async sub script_load {
1134 0         0 my ($self, $script) = @_;
1135 0         0 return await $self->command('SCRIPT', 'LOAD', $script);
1136             }
1137              
1138 0     0 0 0 async sub script_exists {
1139 0         0 my ($self, @shas) = @_;
1140 0         0 return await $self->command('SCRIPT', 'EXISTS', @shas);
1141             }
1142              
1143 0     0 0 0 async sub script_flush {
1144 0         0 my ($self, $mode) = @_;
1145 0         0 my @args = ('SCRIPT', 'FLUSH');
1146 0 0       0 push @args, $mode if $mode; # ASYNC or SYNC
1147 0         0 return await $self->command(@args);
1148             }
1149              
1150 0     0 0 0 async sub script_kill {
1151 0         0 my ($self) = @_;
1152 0         0 return await $self->command('SCRIPT', 'KILL');
1153             }
1154              
1155 0     0 0 0 async sub evalsha_or_eval {
1156 0         0 my ($self, $sha, $script, $numkeys, @keys_and_args) = @_;
1157              
1158             # Try EVALSHA first
1159 0         0 my $result;
1160 0         0 eval {
1161 0         0 $result = await $self->evalsha($sha, $numkeys, @keys_and_args);
1162             };
1163              
1164 0 0       0 if ($@) {
1165 0         0 my $error = $@;
1166              
1167             # Check if it's a NOSCRIPT error
1168 0 0       0 if ("$error" =~ /NOSCRIPT/i) {
1169             # Fall back to EVAL (which also loads the script)
1170 0         0 $result = await $self->eval($script, $numkeys, @keys_and_args);
1171             }
1172             else {
1173             # Re-throw other errors
1174 0         0 die $error;
1175             }
1176             }
1177              
1178 0         0 return $result;
1179             }
1180              
1181             sub script {
1182 0     0 1 0 my ($self, $code) = @_;
1183 0         0 return Async::Redis::Script->new(
1184             redis => $self,
1185             script => $code,
1186             );
1187             }
1188              
1189             # Define a named script command
1190             # Usage: $redis->define_command(name => { keys => N, lua => '...' })
1191             sub define_command {
1192 0     0 1 0 my ($self, $name, $def) = @_;
1193              
1194 0 0 0     0 die "Command name required" unless defined $name && length $name;
1195 0 0       0 die "Command definition required" unless ref $def eq 'HASH';
1196 0 0       0 die "Lua script required (lua => '...')" unless defined $def->{lua};
1197              
1198             # Validate name (alphanumeric and underscore only)
1199 0 0       0 die "Invalid command name '$name' - use only alphanumeric and underscore"
1200             unless $name =~ /^[a-zA-Z_][a-zA-Z0-9_]*$/;
1201              
1202             my $script = Async::Redis::Script->new(
1203             redis => $self,
1204             script => $def->{lua},
1205             name => $name,
1206             num_keys => $def->{keys} // 'dynamic',
1207             description => $def->{description},
1208 0   0     0 );
1209              
1210 0         0 $self->{_scripts}{$name} = $script;
1211              
1212             # Optional: install as method on this instance
1213 0 0       0 if ($def->{install}) {
1214 0         0 $self->_install_script_method($name);
1215             }
1216              
1217 0         0 return $script;
1218             }
1219              
1220             # Run a registered script by name
1221             # Usage: $redis->run_script('name', @keys_then_args)
1222             # If num_keys is 'dynamic', first arg is the key count
1223 0     0 1 0 async sub run_script {
1224 0         0 my ($self, $name, @args) = @_;
1225              
1226 0 0       0 my $script = $self->{_scripts}{$name}
1227             or die "Unknown script: '$name' - use define_command() first";
1228              
1229 0         0 my $num_keys = $script->num_keys;
1230              
1231             # Handle dynamic key count
1232 0 0       0 if ($num_keys eq 'dynamic') {
1233 0         0 $num_keys = shift @args;
1234 0 0       0 die "Key count required as first argument for dynamic script '$name'"
1235             unless defined $num_keys;
1236             }
1237              
1238             # Split args into keys and argv
1239 0         0 my @keys = splice(@args, 0, $num_keys);
1240 0         0 return await $script->run(\@keys, \@args);
1241             }
1242              
1243             # Get a registered script by name
1244             sub get_script {
1245 0     0 1 0 my ($self, $name) = @_;
1246 0         0 return $self->{_scripts}{$name};
1247             }
1248              
1249             # List all registered script names
1250             sub list_scripts {
1251 0     0 1 0 my ($self) = @_;
1252 0         0 return CORE::keys %{$self->{_scripts}};
  0         0  
1253             }
1254              
1255             # Preload all registered scripts to Redis
1256             # Useful before pipeline execution
1257 0     0 1 0 async sub preload_scripts {
1258 0         0 my ($self) = @_;
1259              
1260 0         0 my @names = $self->list_scripts;
1261 0 0       0 return 0 unless @names;
1262              
1263 0         0 for my $name (@names) {
1264 0         0 my $script = $self->{_scripts}{$name};
1265 0         0 await $self->script_load($script->script);
1266             }
1267              
1268 0         0 return scalar @names;
1269             }
1270              
1271             # Install a script as a method (internal)
1272             sub _install_script_method {
1273 0     0   0 my ($self, $name) = @_;
1274              
1275             # Create closure that captures $name
1276             my $method = sub {
1277 0     0   0 my ($self, @args) = @_;
1278 0         0 return $self->run_script($name, @args);
1279 0         0 };
1280              
1281             # Install on the class (affects all instances)
1282 75     75   840 no strict 'refs';
  75         134  
  75         3509  
1283 75     75   384 no warnings 'redefine';
  75         135  
  75         261261  
1284 0         0 *{"Async::Redis::$name"} = $method;
  0         0  
1285             }
1286              
1287             # ============================================================================
1288             # SCAN Iterators
1289             # ============================================================================
1290              
1291             sub scan_iter {
1292 0     0 1 0 my ($self, %opts) = @_;
1293             return Async::Redis::Iterator->new(
1294             redis => $self,
1295             command => 'SCAN',
1296             match => $opts{match},
1297             count => $opts{count},
1298             type => $opts{type},
1299 0         0 );
1300             }
1301              
1302             sub hscan_iter {
1303 0     0 0 0 my ($self, $key, %opts) = @_;
1304             return Async::Redis::Iterator->new(
1305             redis => $self,
1306             command => 'HSCAN',
1307             key => $key,
1308             match => $opts{match},
1309             count => $opts{count},
1310 0         0 );
1311             }
1312              
1313             sub sscan_iter {
1314 0     0 0 0 my ($self, $key, %opts) = @_;
1315             return Async::Redis::Iterator->new(
1316             redis => $self,
1317             command => 'SSCAN',
1318             key => $key,
1319             match => $opts{match},
1320             count => $opts{count},
1321 0         0 );
1322             }
1323              
1324             sub zscan_iter {
1325 0     0 0 0 my ($self, $key, %opts) = @_;
1326             return Async::Redis::Iterator->new(
1327             redis => $self,
1328             command => 'ZSCAN',
1329             key => $key,
1330             match => $opts{match},
1331             count => $opts{count},
1332 0         0 );
1333             }
1334              
1335             # ============================================================================
1336             # Transactions
1337             # ============================================================================
1338              
1339 0     0 1 0 async sub multi {
1340 0         0 my ($self, $callback) = @_;
1341              
1342             # Prevent nested multi() calls
1343             die "Cannot nest multi() calls - already in a transaction"
1344 0 0       0 if $self->{in_multi};
1345              
1346             # Mark that we're collecting transaction commands
1347 0         0 $self->{in_multi} = 1;
1348              
1349 0         0 my @commands;
1350 0         0 eval {
1351             # Create transaction collector
1352 0         0 my $tx = Async::Redis::Transaction->new(redis => $self);
1353              
1354             # Run callback to collect commands
1355 0         0 await $callback->($tx);
1356              
1357 0         0 @commands = $tx->commands;
1358             };
1359 0         0 my $collect_error = $@;
1360              
1361 0 0       0 if ($collect_error) {
1362 0         0 $self->{in_multi} = 0;
1363 0         0 die $collect_error;
1364             }
1365              
1366             # If no commands queued, return empty result
1367 0 0       0 unless (@commands) {
1368 0         0 $self->{in_multi} = 0;
1369 0         0 return [];
1370             }
1371              
1372             # Execute transaction (in_multi already set)
1373 0         0 return await $self->_execute_transaction(\@commands);
1374             }
1375              
1376 0     0   0 async sub _execute_transaction {
1377 0         0 my ($self, $commands) = @_;
1378              
1379             # in_multi should already be set by caller
1380              
1381 0         0 my $results;
1382 0         0 eval {
1383             # Send MULTI
1384 0         0 await $self->command('MULTI');
1385              
1386             # Queue all commands (they return +QUEUED)
1387 0         0 for my $cmd (@$commands) {
1388 0         0 await $self->command(@$cmd);
1389             }
1390              
1391             # Execute and get results
1392 0         0 $results = await $self->command('EXEC');
1393             };
1394 0         0 my $error = $@;
1395              
1396             # Always clear transaction state
1397 0         0 $self->{in_multi} = 0;
1398              
1399 0 0       0 if ($error) {
1400             # Try to clean up
1401 0         0 eval { await $self->command('DISCARD') };
  0         0  
1402 0         0 die $error;
1403             }
1404              
1405 0         0 return $results;
1406             }
1407              
1408             # Accessor for pool cleanliness tracking
1409 0     0 0 0 sub in_multi { shift->{in_multi} }
1410 0     0 0 0 sub watching { shift->{watching} }
1411 0     0 0 0 sub in_pubsub { shift->{in_pubsub} }
1412 0   0 0 0 0 sub inflight_count { scalar @{shift->{inflight} // []} }
  0         0  
1413              
1414             # Is connection dirty (unsafe to reuse)?
1415             sub is_dirty {
1416 0     0 0 0 my ($self) = @_;
1417              
1418 0 0       0 return 1 if $self->{in_multi};
1419 0 0       0 return 1 if $self->{watching};
1420 0 0       0 return 1 if $self->{in_pubsub};
1421 0 0 0     0 return 1 if @{$self->{inflight} // []} > 0;
  0         0  
1422              
1423 0         0 return 0;
1424             }
1425              
1426 0     0 1 0 async sub watch {
1427 0         0 my ($self, @keys) = @_;
1428 0         0 $self->{watching} = 1;
1429 0         0 return await $self->command('WATCH', @keys);
1430             }
1431              
1432 0     0 0 0 async sub unwatch {
1433 0         0 my ($self) = @_;
1434 0         0 my $result = await $self->command('UNWATCH');
1435 0         0 $self->{watching} = 0;
1436 0         0 return $result;
1437             }
1438              
1439 0     0 0 0 async sub multi_start {
1440 0         0 my ($self) = @_;
1441 0         0 $self->{in_multi} = 1;
1442 0         0 return await $self->command('MULTI');
1443             }
1444              
1445 0     0 0 0 async sub exec {
1446 0         0 my ($self) = @_;
1447 0         0 my $result = await $self->command('EXEC');
1448 0         0 $self->{in_multi} = 0;
1449 0         0 $self->{watching} = 0; # EXEC clears watches
1450 0         0 return $result;
1451             }
1452              
1453 0     0 0 0 async sub discard {
1454 0         0 my ($self) = @_;
1455 0         0 my $result = await $self->command('DISCARD');
1456 0         0 $self->{in_multi} = 0;
1457             # Note: DISCARD does NOT clear watches
1458 0         0 return $result;
1459             }
1460              
1461 0     0 1 0 async sub watch_multi {
1462 0         0 my ($self, $keys, $callback) = @_;
1463              
1464             # WATCH the keys
1465 0         0 await $self->watch(@$keys);
1466              
1467             # Get current values of watched keys
1468 0         0 my %watched;
1469 0         0 for my $key (@$keys) {
1470 0         0 $watched{$key} = await $self->get($key);
1471             }
1472              
1473             # Create transaction collector
1474 0         0 my $tx = Async::Redis::Transaction->new(redis => $self);
1475              
1476             # Run callback with watched values
1477 0         0 await $callback->($tx, \%watched);
1478              
1479 0         0 my @commands = $tx->commands;
1480              
1481             # If no commands queued, just unwatch and return empty
1482 0 0       0 unless (@commands) {
1483 0         0 await $self->unwatch;
1484 0         0 return [];
1485             }
1486              
1487             # Execute transaction
1488 0         0 $self->{in_multi} = 1;
1489              
1490 0         0 my $results;
1491 0         0 eval {
1492 0         0 await $self->command('MULTI');
1493              
1494 0         0 for my $cmd (@commands) {
1495 0         0 await $self->command(@$cmd);
1496             }
1497              
1498 0         0 $results = await $self->command('EXEC');
1499             };
1500 0         0 my $error = $@;
1501              
1502 0         0 $self->{in_multi} = 0;
1503 0         0 $self->{watching} = 0;
1504              
1505 0 0       0 if ($error) {
1506 0         0 eval { await $self->command('DISCARD') };
  0         0  
1507 0         0 die $error;
1508             }
1509              
1510             # EXEC returns undef/nil if WATCH failed
1511 0         0 return $results;
1512             }
1513              
1514             # ============================================================================
1515             # PUB/SUB
1516             # ============================================================================
1517              
1518             # Wait for inflight commands to complete before mode change
1519 0     0   0 async sub _wait_for_inflight_drain {
1520 0         0 my ($self, $timeout) = @_;
1521 0   0     0 $timeout //= 30;
1522              
1523 0 0       0 return unless @{$self->{inflight}};
  0         0  
1524              
1525 0         0 my $deadline = Time::HiRes::time() + $timeout;
1526              
1527 0   0     0 while (@{$self->{inflight}} && Time::HiRes::time() < $deadline) {
  0         0  
1528 0         0 await Future::IO->sleep(0.001);
1529             }
1530              
1531 0 0       0 if (@{$self->{inflight}}) {
  0         0  
1532 0         0 $self->_fail_all_inflight("Timeout waiting for inflight commands");
1533             }
1534             }
1535              
1536 0     0 0 0 async sub publish {
1537 0         0 my ($self, $channel, $message) = @_;
1538 0         0 return await $self->command('PUBLISH', $channel, $message);
1539             }
1540              
1541 0     0 0 0 async sub spublish {
1542 0         0 my ($self, $channel, $message) = @_;
1543 0         0 return await $self->command('SPUBLISH', $channel, $message);
1544             }
1545              
1546             # Subscribe to channels - returns a Subscription object
                 #
                 # Arguments:
                 #   @channels - channel names passed verbatim to SUBSCRIBE.
                 #
                 # Returns the object held in $self->{_subscription}; one is created with
                 # Async::Redis::Subscription->new(redis => $self) if none exists yet.
                 #
                 # Dies with Async::Redis::Error::Disconnected ("Not connected") when
                 # $self->{connected} is false.
                 #
                 # Side effects: registers each channel on the subscription via
                 # _add_channel and sets $self->{in_pubsub} to 1 after all confirmation
                 # frames have been read.
1547 0     0 1 0 async sub subscribe {
1548 0         0 my ($self, @channels) = @_;
1549              
1550             die Async::Redis::Error::Disconnected->new(
1551             message => "Not connected",
1552 0 0       0 ) unless $self->{connected};
1553              
1554             # Wait for pending commands before entering PubSub mode
1555 0         0 await $self->_wait_for_inflight_drain;
1556              
1557             # Create or reuse subscription
1558 0   0     0 my $sub = $self->{_subscription} //= Async::Redis::Subscription->new(redis => $self);
1559              
1560             # Send SUBSCRIBE command
                 # NOTE(review): with an empty @channels the command is still sent but the
                 # loop below reads no frame, so any server reply would stay unread -
                 # confirm whether callers can pass an empty list.
1561 0         0 await $self->_send_command('SUBSCRIBE', @channels);
1562              
1563             # Read subscription confirmations
                 # One frame is read per requested channel. NOTE(review): $msg is never
                 # inspected, so the frame is assumed (not verified) to be the
                 # confirmation for $ch - TODO confirm no other frame type can arrive here.
1564 0         0 for my $ch (@channels) {
1565 0         0 my $msg = await $self->_read_pubsub_frame();
1566             # Response: ['subscribe', $channel, $count]
1567 0         0 $sub->_add_channel($ch);
1568             }
1569              
1570 0         0 $self->{in_pubsub} = 1;
1571              
1572 0         0 return $sub;
1573             }
1574              
1575             # Pattern subscribe
                 #
                 # Arguments:
                 #   @patterns - patterns passed verbatim to PSUBSCRIBE.
                 #
                 # Returns the object held in $self->{_subscription}; one is created with
                 # Async::Redis::Subscription->new(redis => $self) if none exists yet.
                 #
                 # Dies with Async::Redis::Error::Disconnected ("Not connected") when
                 # $self->{connected} is false.
                 #
                 # Side effects: registers each pattern on the subscription via
                 # _add_pattern and sets $self->{in_pubsub} to 1 after all confirmation
                 # frames have been read.
1576 0     0 1 0 async sub psubscribe {
1577 0         0 my ($self, @patterns) = @_;
1578              
1579             die Async::Redis::Error::Disconnected->new(
1580             message => "Not connected",
1581 0 0       0 ) unless $self->{connected};
1582              
1583             # Wait for pending commands before entering PubSub mode
1584 0         0 await $self->_wait_for_inflight_drain;
1585              
                 # Create the shared subscription object on first use, otherwise reuse it.
1586 0   0     0 my $sub = $self->{_subscription} //= Async::Redis::Subscription->new(redis => $self);
1587              
                 # NOTE(review): with an empty @patterns the command is still sent but no
                 # reply frame is read below - confirm callers never pass an empty list.
1588 0         0 await $self->_send_command('PSUBSCRIBE', @patterns);
1589              
                 # One frame is read per requested pattern; $msg is not inspected, so the
                 # frame is assumed to be that pattern's confirmation - TODO confirm.
1590 0         0 for my $p (@patterns) {
1591 0         0 my $msg = await $self->_read_pubsub_frame();
1592 0         0 $sub->_add_pattern($p);
1593             }
1594              
1595 0         0 $self->{in_pubsub} = 1;
1596              
1597 0         0 return $sub;
1598             }
1599              
1600             # Sharded subscribe (Redis 7+)
                 #
                 # Arguments:
                 #   @channels - shard channel names passed verbatim to SSUBSCRIBE.
                 #
                 # Returns the object held in $self->{_subscription}; one is created with
                 # Async::Redis::Subscription->new(redis => $self) if none exists yet.
                 #
                 # Dies with Async::Redis::Error::Disconnected ("Not connected") when
                 # $self->{connected} is false.
                 #
                 # Side effects: registers each channel on the subscription via
                 # _add_sharded_channel and sets $self->{in_pubsub} to 1 after all
                 # confirmation frames have been read.
1601 0     0 0 0 async sub ssubscribe {
1602 0         0 my ($self, @channels) = @_;
1603              
1604             die Async::Redis::Error::Disconnected->new(
1605             message => "Not connected",
1606 0 0       0 ) unless $self->{connected};
1607              
1608             # Wait for pending commands before entering PubSub mode
1609 0         0 await $self->_wait_for_inflight_drain;
1610              
                 # Create the shared subscription object on first use, otherwise reuse it.
1611 0   0     0 my $sub = $self->{_subscription} //= Async::Redis::Subscription->new(redis => $self);
1612              
                 # NOTE(review): with an empty @channels the command is still sent but no
                 # reply frame is read below - confirm callers never pass an empty list.
1613 0         0 await $self->_send_command('SSUBSCRIBE', @channels);
1614              
                 # One frame is read per requested channel; $msg is not inspected, so the
                 # frame is assumed to be that channel's confirmation - TODO confirm.
1615 0         0 for my $ch (@channels) {
1616 0         0 my $msg = await $self->_read_pubsub_frame();
1617 0         0 $sub->_add_sharded_channel($ch);
1618             }
1619              
1620 0         0 $self->{in_pubsub} = 1;
1621              
1622 0         0 return $sub;
1623             }
1624              
1625             # Read pubsub frame (subscription confirmation or message)
                 #
                 # Awaits one raw response from _read_response and returns it after
                 # passing it through _decode_response.
                 #
                 # Dies with Async::Redis::Error::Disconnected ("Not connected") when
                 # $self->{connected} is false. Anything thrown by _read_response or
                 # _decode_response propagates to the caller unchanged.
1626 1     1   2 async sub _read_pubsub_frame {
1627 1         2 my ($self) = @_;
1628              
1629             die Async::Redis::Error::Disconnected->new(
1630             message => "Not connected",
1631 1 50       20 ) unless $self->{connected};
1632              
1633 0           my $msg = await $self->_read_response();
1634 0           return $self->_decode_response($msg);
1635             }
1636              
1637             # Send command without reading response (for pubsub)
                 #
                 # Arguments:
                 #   @args - command name followed by its arguments, handed unchanged to
                 #           _build_command.
                 #
                 # The built command is written with _send; no reply is read here, so the
                 # caller is responsible for consuming whatever the server sends back.
                 # There is no connected check in this helper.
1638 0     0     async sub _send_command {
1639 0           my ($self, @args) = @_;
1640              
1641 0           my $cmd = $self->_build_command(@args);
1642 0           await $self->_send($cmd);
1643             }
1644              
1645             # Read next pubsub message (blocking) - for compatibility
                 #
                 # Awaits one response from _read_response and returns it as-is.
                 # Unlike _read_pubsub_frame, this does not check $self->{connected} and
                 # does not call _decode_response on the result.
                 # NOTE(review): the shapes listed below are the expected message layouts
                 # per the original comment; this sub does not validate them.
1646 0     0     async sub _read_pubsub_message {
1647 0           my ($self) = @_;
1648              
1649 0           my $msg = await $self->_read_response();
1650              
1651             # Message format: ['message', $channel, $payload]
1652             # or: ['pmessage', $pattern, $channel, $payload]
1653 0           return $msg;
1654             }
1655              
1656             # ============================================================================
1657             # Pipelining
1658             # ============================================================================
1659              
1660             sub pipeline {
                 # Build and return a new Async::Redis::Pipeline bound to this client.
                 #
                 # Options (%opts):
                 #   max_depth - passed to the Pipeline constructor; when undef, falls back
                 #               to $self->{pipeline_depth} (via //).
                 #
                 # Synchronous: nothing is sent to the server by this method itself.
1661 0     0 1   my ($self, %opts) = @_;
1662             return Async::Redis::Pipeline->new(
1663             redis => $self,
1664             max_depth => $opts{max_depth} // $self->{pipeline_depth},
1665 0   0       );
1666             }
1667              
1668             # Execute multiple commands, return all responses
                 #
                 # Arguments:
                 #   $commands - array ref of array refs; each inner array is expanded into
                 #               the argument list of _build_command.
                 #
                 # Returns an array ref with one entry per command, in send order. An
                 # entry is either the decoded response or, when _decode_response died
                 # for that slot, the exception value that was caught. Returns [] without
                 # touching the connection when $commands is empty.
                 #
                 # Dies with the plain string "Not connected" when $self->{connected} is
                 # false, and rethrows any error raised while sending or reading.
                 #
                 # NOTE(review): the disconnected case here throws a plain string, while
                 # subscribe/psubscribe/ssubscribe in this file throw
                 # Async::Redis::Error::Disconnected objects - confirm whether callers
                 # rely on either form.
1669 0     0     async sub _execute_pipeline {
1670 0           my ($self, $commands) = @_;
1671              
1672 0 0         die "Not connected" unless $self->{connected};
1673              
1674 0 0         return [] unless @$commands;
1675              
1676             # Wait for any inflight regular commands to complete before pipeline
1677             # This prevents interleaving pipeline responses with regular command responses
1678 0           await $self->_wait_for_inflight_drain;
1679              
1680             # Take over reading - prevent response reader from running
1681 0           $self->{_reading_responses} = 1;
1682              
1683 0           my $start_time = Time::HiRes::time();
1684 0           my @responses;
1685 0           my $count = scalar @$commands;
1686              
                 # The eval block ends in a literal 1, so $ok is true only when every
                 # send and read completed; $@ is copied into $error right after the block.
1687 0           my $ok = eval {
1688             # Send all commands
                 # All commands are concatenated and written with a single _send call.
1689 0           my $data = '';
1690 0           for my $cmd (@$commands) {
1691 0           $data .= $self->_build_command(@$cmd);
1692             }
1693 0           await $self->_send($data);
1694              
1695             # Read all responses, capturing per-slot Redis errors
1696 0           for my $i (1 .. $count) {
1697 0           my $msg = await $self->_read_response();
1698              
1699             # Capture Redis errors inline rather than dying
                 # NOTE(review): failure is detected by testing $@ after the eval, so an
                 # exception value that evaluates false would be missed - confirm that
                 # _decode_response only throws true values (e.g. error objects).
1700 0           my $result;
1701 0           eval {
1702 0           $result = $self->_decode_response($msg);
1703             };
1704 0 0         if ($@) {
1705             # Capture the error object inline in the results
1706 0           $result = $@;
1707             }
1708 0           push @responses, $result;
1709             }
1710 0           1;
1711             };
1712              
1713 0           my $error = $@;
1714              
1715             # Release reading lock
                 # Cleared on both the success and the failure path, before rethrowing.
1716 0           $self->{_reading_responses} = 0;
1717              
1718 0 0         if (!$ok) {
1719 0           die $error;
1720             }
1721              
1722             # Telemetry: record pipeline metrics
                 # Elapsed time is measured from just before the send and reported in
                 # milliseconds together with the number of commands.
1723 0 0         if ($self->{_telemetry}) {
1724 0           my $elapsed_ms = (Time::HiRes::time() - $start_time) * 1000;
1725 0           $self->{_telemetry}->record_pipeline($count, $elapsed_ms);
1726             }
1727              
1728 0           return \@responses;
1729             }
1730              
1731             1;
1732              
1733             __END__