File Coverage

blib/lib/Async/Redis.pm
Criterion Covered Total %
statement 348 1033 33.6
branch 117 428 27.3
condition 66 225 29.3
subroutine 59 132 44.7
pod 36 49 73.4
total 626 1867 33.5


line stmt bran cond sub pod time code
1             package Async::Redis;
2              
3 91     91   20237489 use strict;
  91         167  
  91         3317  
4 91     91   421 use warnings;
  91         140  
  91         3998  
5 91     91   1301 use 5.018;
  91         261  
6              
7             our $VERSION = '0.002000';
8              
9 91     91   2247 use Future;
  91         38489  
  91         2219  
10 91     91   1856 use Future::AsyncAwait;
  91         12465  
  91         576  
11 91     91   7143 use Future::IO 0.23;
  91         201870  
  91         3766  
12 91     91   34804 use Future::Selector 0.05;
  91         759627  
  91         5381  
13 91     91   608 use Scalar::Util qw(blessed weaken);
  91         176  
  91         5040  
14 91     91   425 use Socket qw(pack_sockaddr_in pack_sockaddr_un inet_aton AF_INET AF_UNIX SOCK_STREAM);
  91         119  
  91         4588  
15 91     91   376 use IO::Handle ();
  91         104  
  91         1283  
16 91     91   37108 use IO::Socket::INET;
  91         849689  
  91         613  
17 91     91   34410 use Time::HiRes ();
  91         137  
  91         1852  
18              
19             # Error classes
20 91     91   36500 use Async::Redis::Error::Connection;
  91         206  
  91         2920  
21 91     91   32144 use Async::Redis::Error::Timeout;
  91         242  
  91         2781  
22 91     91   31626 use Async::Redis::Error::Disconnected;
  91         224  
  91         2537  
23 91     91   33804 use Async::Redis::Error::Redis;
  91         238  
  91         3619  
24 91     91   31756 use Async::Redis::Error::Protocol;
  91         236  
  91         2803  
25              
26             # Import auto-generated command methods
27 91     91   43070 use Async::Redis::Commands;
  91         304  
  91         6402  
28             our @ISA = qw(Async::Redis::Commands);
29              
30             # Key extraction for prefixing
31 91     91   42656 use Async::Redis::KeyExtractor;
  91         264  
  91         4961  
32              
33             # Transaction support
34 91     91   35950 use Async::Redis::Transaction;
  91         254  
  91         3772  
35              
36             # Script support
37 91     91   34854 use Async::Redis::Script;
  91         219  
  91         4780  
38              
39             # Iterator support
40 91     91   33452 use Async::Redis::Iterator;
  91         268  
  91         4581  
41              
42             # Pipeline support
43 91     91   34174 use Async::Redis::Pipeline;
  91         237  
  91         5114  
44 91     91   36940 use Async::Redis::AutoPipeline;
  91         235  
  91         5436  
45              
46             # PubSub support
47 91     91   43030 use Async::Redis::Subscription;
  91         289  
  91         9149  
48              
49             # Telemetry support
50 91     91   39080 use Async::Redis::Telemetry;
  91         625  
  91         9546  
51              
52             # Try XS version first, fall back to pure Perl
53             BEGIN {
54 91 50   91   428 eval { require Protocol::Redis::XS; 1 }
  91         33601  
  91         2138233  
55             or require Protocol::Redis;
56             }
57              
58             sub _parser_class {
59 0 0   0   0 return $INC{'Protocol/Redis/XS.pm'} ? 'Protocol::Redis::XS' : 'Protocol::Redis';
60             }
61              
62             sub _calculate_backoff {
63 3     3   7 my ($self, $attempt) = @_;
64              
65             # Exponential: delay * 2^(attempt-1)
66 3         8 my $delay = $self->{reconnect_delay} * (2 ** ($attempt - 1));
67              
68             # Cap at max
69 3 100       7 $delay = $self->{reconnect_delay_max} if $delay > $self->{reconnect_delay_max};
70              
71             # Apply jitter: delay * (1 +/- jitter)
72 3 50       6 if ($self->{reconnect_jitter} > 0) {
73 0         0 my $jitter_range = $delay * $self->{reconnect_jitter};
74 0         0 my $jitter = (rand(2) - 1) * $jitter_range;
75 0         0 $delay += $jitter;
76             }
77              
78 3         6 return $delay;
79             }
80              
81             # Free function, not a method. Call as _await_with_deadline($f, $deadline).
82             # Race a read future against a deadline. Returns a Future resolving to
83             # ($read_future, $timed_out_bool). The caller inspects $timed_out and
84             # $read_future->is_failed explicitly; we never throw from here.
85             #
86             # On timeout win: $read_future is left pending. _reader_fatal is the sole
87             # owner of its cancellation (it must happen before _close_socket so
88             # Future::IO unregisters while fileno is still valid).
89             # On read win: the internal timeout timer is cancelled here for hygiene.
90             sub _await_with_deadline {
91 5     5   232242 my ($read_f, $deadline) = @_;
92              
93 5 100       15 if (!defined $deadline) {
94 1     1   10 return $read_f->followed_by(sub { Future->done($read_f, 0) });
  1         169  
95             }
96              
97 4         9 my $remaining = $deadline - Time::HiRes::time();
98 4 100       10 if ($remaining <= 0) {
99 1         5 return Future->done($read_f, 1);
100             }
101              
102             my $timeout_f = Future::IO->sleep($remaining)
103 3     1   36 ->then(sub { Future->fail('__deadline__') });
  1         50350  
104              
105             # Use without_cancel so that if timeout wins, wait_any's cancel of the
106             # losing future does not propagate to $read_f (caller owns its lifecycle).
107             return Future->wait_any($read_f->without_cancel, $timeout_f)
108             ->followed_by(sub {
109 3     3   1331 my ($f) = @_;
110 3 100 66     10 my $timed_out = $f->is_failed
111             && (($f->failure)[0] // '') eq '__deadline__' ? 1 : 0;
112              
113 3 50 66     49 if (!$timed_out && !$timeout_f->is_ready) {
114 0         0 $timeout_f->cancel;
115             }
116              
117 3         32 return Future->done($read_f, $timed_out);
118 3         712 });
119             }
120              
121             sub new {
122 109     109 1 4862875 my ($class, %args) = @_;
123              
124             # Parse URI if provided
125 109 100       471 if ($args{uri}) {
126 2         489 require Async::Redis::URI;
127 2         13 my $uri = Async::Redis::URI->parse($args{uri});
128 2 50       8 if ($uri) {
129 2         5 my %uri_args = $uri->to_hash;
130             # URI values are defaults, explicit args override
131 2         5 %args = (%uri_args, %args);
132 2         8 delete $args{uri}; # don't store the string
133             }
134             }
135              
136             my $self = bless {
137             path => $args{path},
138             host => $args{path} ? undef : ($args{host} // 'localhost'),
139             port => $args{path} ? undef : ($args{port} // 6379),
140             socket => undef,
141             parser => undef,
142             connected => 0,
143             _socket_live => 0,
144             _fatal_in_progress => 0,
145             _reader_running => 0, # dedup guard; the selector owns the reader Future itself
146             _write_lock => undef, # will be a Future used as a lock, populated lazily
147             _reconnect_future => undef,
148             _tasks => Future::Selector->new,
149              
150             # Timeout settings
151             connect_timeout => $args{connect_timeout} // 10,
152             request_timeout => $args{request_timeout} // 5,
153             blocking_timeout_buffer => $args{blocking_timeout_buffer} // 2,
154              
155             # Inflight tracking with deadlines
156             # Entry: { future => $f, cmd => $cmd, args => \@args, deadline => $t, sent_at => $t }
157             inflight => [],
158              
159             # Reconnection settings
160             reconnect => $args{reconnect} // 0,
161             reconnect_delay => $args{reconnect_delay} // 0.1,
162             reconnect_delay_max => $args{reconnect_delay_max} // 60,
163             reconnect_jitter => $args{reconnect_jitter} // 0.25,
164             reconnect_max_attempts => $args{reconnect_max_attempts} // 10, # 0 = unlimited
165             _reconnect_attempt => 0,
166              
167             # Callbacks
168             on_connect => $args{on_connect},
169             on_disconnect => $args{on_disconnect},
170             on_error => $args{on_error},
171              
172             # Authentication
173             password => $args{password},
174             username => $args{username},
175             database => $args{database} // 0,
176             client_name => $args{client_name},
177              
178             # TLS
179             tls => $args{tls},
180              
181             # Key prefixing
182             prefix => $args{prefix},
183              
184             # Pipeline settings
185             pipeline_depth => $args{pipeline_depth} // 10000,
186             auto_pipeline => $args{auto_pipeline} // 0,
187              
188             # Backpressure: max queued messages before _dispatch_frame's slot wait blocks.
189             message_queue_depth => do {
190 109   100     4382 my $d = $args{message_queue_depth} // 1;
191 109 100       323 die "message_queue_depth must be >= 1 (got $d)" if $d < 1;
192 107         3701 $d;
193             },
194              
195             # Transaction state
196             in_multi => 0,
197             watching => 0,
198              
199             # PubSub state
200             in_pubsub => 0,
201             _subscription => undef,
202             _pump_running => 0,
203              
204             # Fork safety
205             _pid => $$,
206              
207             # Script registry
208             _scripts => {},
209              
210             # Current read future for clean disconnect cancellation
211             _current_read_future => undef,
212              
213             # Telemetry options
214             debug => $args{debug},
215             otel_tracer => $args{otel_tracer},
216             otel_meter => $args{otel_meter},
217             otel_include_args => $args{otel_include_args} // 0,
218 109 100 100     2497 otel_redact => $args{otel_redact} // 1,
    100 100        
      100        
      100        
      100        
      100        
      100        
      100        
      100        
      100        
      100        
      50        
      50        
      50        
      50        
219             }, $class;
220              
221             # Initialize telemetry if any observability enabled
222 107 50 33     1259 if ($self->{debug} || $self->{otel_tracer} || $self->{otel_meter}) {
      33        
223             $self->{_telemetry} = Async::Redis::Telemetry->new(
224             tracer => $self->{otel_tracer},
225             meter => $self->{otel_meter},
226             debug => $self->{debug},
227             include_args => $self->{otel_include_args},
228             redact => $self->{otel_redact},
229             host => $self->{host},
230             port => $self->{port},
231 0   0     0 database => $self->{database} // 0,
232             );
233             }
234              
235 107         409 return $self;
236             }
237              
238             # Connect to Redis server
239 78     78 1 955 async sub connect {
240 78         150 my ($self) = @_;
241              
242 78 50       225 return $self if $self->{connected};
243              
244             # Create socket — AF_UNIX for path, AF_INET for host:port
245 78         136 my ($socket, $sockaddr);
246              
247 78 100       221 if ($self->{path}) {
248             socket($socket, AF_UNIX, SOCK_STREAM, 0)
249             or die Async::Redis::Error::Connection->new(
250             message => "Cannot create unix socket: $!",
251             host => $self->{path},
252 1 50       77 port => 0,
253             );
254 1         9 IO::Handle::blocking($socket, 0);
255 1         6 $sockaddr = pack_sockaddr_un($self->{path});
256             } else {
257             $socket = IO::Socket::INET->new(
258             Proto => 'tcp',
259             Blocking => 0,
260             ) or die Async::Redis::Error::Connection->new(
261             message => "Cannot create socket: $!",
262             host => $self->{host},
263             port => $self->{port},
264 77 50       778 );
265              
266             # Build sockaddr
267             my $addr = inet_aton($self->{host})
268             or die Async::Redis::Error::Connection->new(
269             message => "Cannot resolve host: $self->{host}",
270             host => $self->{host},
271             port => $self->{port},
272 77 50       58596 );
273 77         607 $sockaddr = pack_sockaddr_in($self->{port}, $addr);
274             }
275              
276             # Connect with timeout using Future->wait_any
277 78         847 my $connect_f = Future::IO->connect($socket, $sockaddr);
278 78         27125 my $sleep_f = Future::IO->sleep($self->{connect_timeout});
279              
280             my $timeout_f = $sleep_f->then(sub {
281 0     0   0 return Future->fail('connect_timeout');
282 78         7252 });
283              
284 78         5288 my $wait_f = Future->wait_any($connect_f, $timeout_f);
285              
286             # Use followed_by to handle both success and failure without await propagating failure
287             my $result_f = $wait_f->followed_by(sub {
288 78     78   62156 my ($f) = @_;
289 78         623 return Future->done($f); # wrap the future itself
290 78         9917 });
291              
292 78         4730 my $completed_f = await $result_f;
293              
294             # Now check the result
295 78 50       9980 if ($completed_f->is_failed) {
296 78         830 my ($error) = $completed_f->failure;
297             # Don't call close() - let $socket go out of scope when we die.
298             # Perl's DESTROY will close it after the exception unwinds.
299              
300 78 50       1006 if ($error eq 'connect_timeout') {
301             die Async::Redis::Error::Timeout->new(
302             message => "Connect timed out after $self->{connect_timeout}s",
303             timeout => $self->{connect_timeout},
304 0         0 );
305             }
306             die Async::Redis::Error::Connection->new(
307             message => "$error",
308             host => $self->{path} // $self->{host},
309 78   66     1472 port => $self->{port} // 0,
      100        
310             );
311             }
312              
313             # TLS upgrade if enabled
314 0 0       0 if ($self->{tls}) {
315 0         0 eval {
316 0         0 $socket = await $self->_tls_upgrade($socket);
317             };
318 0 0       0 if ($@) {
319             # Don't call close() - let $socket go out of scope when we die.
320             # Perl's DESTROY will close it after the exception unwinds.
321 0         0 die $@;
322             }
323             }
324              
325 0         0 $self->{socket} = $socket;
326 0         0 $self->{parser} = _parser_class()->new(api => 1);
327 0         0 $self->{_socket_live} = 1; # write gate and reader can now submit
328 0         0 $self->{inflight} = [];
329 0         0 $self->{_pid} = $$; # Track PID for fork safety
330 0         0 $self->{_current_read_future} = undef;
331              
332             # Run Redis protocol handshake (AUTH, SELECT, CLIENT SETNAME).
333             # connected stays 0 during handshake; set it only on success so
334             # callers never see a half-initialised object.
335 0         0 my $handshake_ok = eval { await $self->_redis_handshake; 1 };
  0         0  
  0         0  
336 0 0       0 unless ($handshake_ok) {
337 0         0 my $err = $@;
338 0         0 $self->_reset_connection('handshake_failure');
339 0         0 die $err;
340             }
341              
342 0         0 $self->{connected} = 1;
343              
344             # Initialize auto-pipeline if enabled
345 0 0       0 if ($self->{auto_pipeline}) {
346             $self->{_auto_pipeline} = Async::Redis::AutoPipeline->new(
347             redis => $self,
348             max_depth => $self->{pipeline_depth},
349 0         0 );
350             }
351              
352             # Fire on_connect callback and reset reconnect counter
353 0 0       0 if ($self->{on_connect}) {
354 0         0 $self->{on_connect}->($self);
355             }
356 0         0 $self->{_reconnect_attempt} = 0;
357              
358             # Telemetry: record connection
359 0 0       0 if ($self->{_telemetry}) {
360 0         0 $self->{_telemetry}->record_connection(1);
361             $self->{_telemetry}->log_event('connected',
362 0   0     0 $self->{path} // "$self->{host}:$self->{port}");
363             }
364              
365 0         0 return $self;
366             }
367              
368             # Redis protocol handshake after TCP connect
369 0     0   0 async sub _redis_handshake {
370 0         0 my ($self) = @_;
371              
372             # Use connect_timeout for the entire handshake (AUTH, SELECT, CLIENT SETNAME)
373             # This ensures the handshake can't block forever if Redis hangs
374 0         0 my $deadline = Time::HiRes::time() + $self->{connect_timeout};
375              
376             # AUTH (password or username+password for ACL)
377 0 0       0 if (defined $self->{password}) {
378 0         0 my @auth_args = ('AUTH');
379 0 0       0 push @auth_args, $self->{username} if defined $self->{username};
380 0         0 push @auth_args, $self->{password};
381              
382 0         0 my $cmd = $self->_build_command(@auth_args);
383 0         0 await $self->_send($cmd);
384              
385 0         0 my $response = await $self->_read_response_with_deadline($deadline, ['AUTH']);
386 0         0 my $result = $self->_decode_response($response);
387              
388             # AUTH returns OK on success, throws on failure
389 0 0 0     0 unless ($result && $result eq 'OK') {
390 0         0 die Async::Redis::Error::Redis->new(
391             message => "Authentication failed: $result",
392             type => 'NOAUTH',
393             );
394             }
395             }
396              
397             # SELECT database
398 0 0 0     0 if ($self->{database} && $self->{database} != 0) {
399 0         0 my $cmd = $self->_build_command('SELECT', $self->{database});
400 0         0 await $self->_send($cmd);
401              
402 0         0 my $response = await $self->_read_response_with_deadline($deadline, ['SELECT', $self->{database}]);
403 0         0 my $result = $self->_decode_response($response);
404              
405 0 0 0     0 unless ($result && $result eq 'OK') {
406 0         0 die Async::Redis::Error::Redis->new(
407             message => "SELECT failed: $result",
408             type => 'ERR',
409             );
410             }
411             }
412              
413             # CLIENT SETNAME
414 0 0 0     0 if (defined $self->{client_name} && length $self->{client_name}) {
415 0         0 my $cmd = $self->_build_command('CLIENT', 'SETNAME', $self->{client_name});
416 0         0 await $self->_send($cmd);
417              
418 0         0 my $response = await $self->_read_response_with_deadline($deadline, ['CLIENT', 'SETNAME']);
419             # Ignore result - SETNAME failing shouldn't prevent connection
420             }
421             }
422              
423             # Check if connected to Redis
424             sub is_connected {
425 0     0 1 0 my ($self) = @_;
426 0 0       0 return $self->{connected} ? 1 : 0;
427             }
428              
429             # Disconnect from Redis — user-initiated path.
430             #
431             # Distinct from _reader_fatal (which handles stream-level failure): this
432             # path is deterministic for user context. Key differences:
433             # - Inflight futures fail with Async::Redis::Error::Disconnected
434             # ("Client disconnect") rather than Connection ("Connection closed
435             # by peer"). Callers can distinguish "I disconnected" from "the
436             # server/network dropped me."
437             # - Subscription gets _close (clean; iterator next() returns undef,
438             # callback driver exits cleanly) rather than _fail_fatal.
439             # - No reconnect handoff — disconnect means stay down.
440             #
441             # Relationship to the selector (_tasks): this method does explicit
442             # teardown rather than relying on _reader_fatal propagation. Any tasks
443             # still in the selector (e.g., in-flight reconnect, autopipeline submit)
444             # will see their underlying I/O fail when the socket is closed and
445             # unwind via their existing on_fail handlers. Cancelling them explicitly
446             # would be cleaner but requires Future::Selector API that doesn't yet
447             # exist; this is acceptable because the failing I/O is a deterministic
448             # wakeup.
449             sub disconnect {
450 0     0 1 0 my ($self) = @_;
451 0 0 0     0 return $self unless $self->{_socket_live} || $self->{connected};
452              
453 0         0 my $was_connected = $self->{connected};
454              
455             # Close subscription cleanly before socket close so the pubsub branch
456             # in any subsequent _reader_fatal (triggered by the failing read)
457             # sees _closed and no-ops on _fail_fatal.
458 0 0       0 if (my $sub = $self->{_subscription}) {
459 0 0       0 $sub->_close unless $sub->is_closed;
460             }
461              
462             # Detach inflight + auto-pipeline queue before socket close so
463             # _close_socket doesn't cancel them — we will fail them explicitly
464             # with a user-context error type.
465 0         0 my $detached_inflight = $self->{inflight};
466 0         0 $self->{inflight} = [];
467 0         0 my $detached_autopipe = [];
468 0 0       0 if (my $ap = $self->{_auto_pipeline}) {
469 0         0 $detached_autopipe = $ap->_detach_queued;
470             }
471              
472             # Cancel current read BEFORE closing socket so Future::IO can
473             # unregister its watcher while fileno is still valid.
474 0 0 0     0 if ($self->{_current_read_future}
475             && !$self->{_current_read_future}->is_ready) {
476 0         0 $self->{_current_read_future}->cancel;
477             }
478 0         0 $self->{_current_read_future} = undef;
479              
480             # Cancel any in-flight reconnect task so it doesn't re-establish
481             # state (connecting a new socket, setting _socket_live=1) after the
482             # user has intentionally disconnected.
483 0 0       0 if (my $rf = delete $self->{_reconnect_future}) {
484 0 0       0 $rf->cancel unless $rf->is_ready;
485             }
486              
487 0         0 my $err = Async::Redis::Error::Disconnected->new(
488             message => "Client disconnect",
489             );
490              
491             # Wake the write-gate chain. Any commands waiting in
492             # _acquire_write_lock's `await $prev` unwind via their await
493             # throwing, and their _execute_command eval rethrows Disconnected
494             # to the caller. Without this, write-gate waiters stay suspended
495             # because the normal release path only runs when the current
496             # holder's body completes.
497 0 0       0 if (my $lock = delete $self->{_write_lock}) {
498 0 0       0 $lock->fail($err) unless $lock->is_ready;
499             }
500              
501 0 0       0 $self->_close_socket if $self->{socket};
502 0         0 $self->{_socket_live} = 0;
503 0         0 $self->{_fatal_in_progress} = 0;
504 0         0 $self->{_reader_running} = 0;
505 0         0 $self->{connected} = 0;
506 0         0 $self->{parser} = undef;
507 0         0 $self->{in_pubsub} = 0;
508              
509             # Fail detached futures with the user-context type so callers can
510             # distinguish "I disconnected" from Connection/EOF.
511 0         0 for my $entry (@$detached_inflight, @$detached_autopipe) {
512 0 0       0 next if $entry->{future}->is_ready;
513 0         0 $entry->{future}->fail($err);
514             }
515              
516             # on_disconnect + telemetry fire only if we were publicly connected,
517             # mirroring _reader_fatal's guard so a failed initial handshake
518             # doesn't spuriously emit these.
519 0 0 0     0 if ($was_connected && $self->{on_disconnect}) {
520 0         0 $self->{on_disconnect}->($self, 'client disconnect');
521             }
522 0 0 0     0 if ($was_connected && $self->{_telemetry}) {
523 0         0 $self->{_telemetry}->record_connection(-1);
524 0         0 $self->{_telemetry}->log_event('disconnected', 'client disconnect');
525             }
526              
527 0         0 return $self;
528             }
529              
530             # Destructor - clean up socket when object is garbage collected
531             sub DESTROY {
532 104     104   38632 my ($self) = @_;
533             # Only clean up if we have a socket and it's still open
534 104 50 33     2133 if ($self->{socket} && fileno($self->{socket})) {
535 0         0 $self->_close_socket;
536             }
537             }
538              
539             # Properly close socket, canceling any pending futures first
540             sub _close_socket {
541 0     0   0 my ($self) = @_;
542              
543             # Take ownership - removes from $self immediately
544 0 0       0 my $socket = delete $self->{socket} or return;
545 0         0 my $fileno = fileno($socket);
546              
547             # Cancel any pending inflight futures - this propagates to
548             # Future::IO internals and cleans up any watchers on this socket.
549             # Important: must happen while fileno is still valid!
550 0 0       0 if (my $inflight = delete $self->{inflight}) {
551 0         0 for my $entry (@$inflight) {
552 0 0 0     0 if ($entry->{future} && !$entry->{future}->is_ready) {
553 0         0 $entry->{future}->cancel;
554             }
555             }
556             }
557 0         0 $self->{inflight} = [];
558              
559             # Initiate clean TCP shutdown (FIN) while fileno still valid
560 0 0       0 shutdown($socket, 2) if defined $fileno;
561              
562             # DON'T call close()!
563             # $socket falls out of scope here, Perl's DESTROY calls close().
564             # By this point, Future::IO has already unregistered its watchers
565             # via the cancel() calls above.
566             }
567              
568             # Check if fork occurred and invalidate connection
569             sub _check_fork {
570 2     2   4 my ($self) = @_;
571              
572 2 50 33     12 if ($self->{_pid} && $self->{_pid} != $$) {
573             # Fork detected - invalidate connection (parent owns the socket)
574             # Don't cancel futures - they belong to the parent's event loop
575             # Just clear references so we don't try to use them
576 0         0 $self->{connected} = 0;
577 0         0 $self->{_socket_live} = 0;
578 0         0 $self->{socket} = undef;
579 0         0 $self->{parser} = undef;
580 0         0 $self->{inflight} = [];
581 0         0 $self->{_current_read_future} = undef; # Clear stale reference
582              
583 0         0 my $old_pid = $self->{_pid};
584 0         0 $self->{_pid} = $$;
585              
586 0 0       0 if ($self->{_telemetry}) {
587 0         0 $self->{_telemetry}->log_event('fork_detected', "old PID: $old_pid, new PID: $$");
588             }
589              
590 0         0 return 1; # Fork occurred
591             }
592              
593 2         4 return 0;
594             }
595              
596             # Build Redis command in RESP format
597             sub _build_command {
598 2     2   5 my ($self, @args) = @_;
599              
600 2         4 my $cmd = "*" . scalar(@args) . "\r\n";
601 2         5 for my $arg (@args) {
602 3   50     7 $arg //= '';
603 3         6 my $bytes = "$arg"; # stringify
604 3 50       9 utf8::encode($bytes) if utf8::is_utf8($bytes);
605 3         10 $cmd .= "\$" . length($bytes) . "\r\n" . $bytes . "\r\n";
606             }
607 2         7 return $cmd;
608             }
609              
610             # Send raw data
611 0     0   0 async sub _send {
612 0         0 my ($self, $data) = @_;
613 0         0 await Future::IO->write_exactly($self->{socket}, $data);
614 0         0 return length($data);
615             }
616              
617             # Add command to inflight queue - returns queue depth.
618             # redis_error_policy: 'fail' (default) fails the future on -ERR frames;
619             # 'capture' calls ->done($err_obj) so callers can inspect per-slot errors
620             # (used by pipelining in Task N+).
621             sub _add_inflight {
622 0     0   0 my ($self, $future, $cmd, $args, $deadline, $redis_error_policy) = @_;
623 0   0     0 push @{$self->{inflight}}, {
  0         0  
624             future => $future,
625             cmd => $cmd,
626             args => $args,
627             deadline => $deadline,
628             redis_error => $redis_error_policy // 'fail',
629             sent_at => Time::HiRes::time(),
630             };
631 0         0 return scalar @{$self->{inflight}};
  0         0  
632             }
633              
634             # Shift first entry from inflight queue
635             sub _shift_inflight {
636 0     0   0 my ($self) = @_;
637 0         0 return shift @{$self->{inflight}};
  0         0  
638             }
639              
640             # Fail all pending inflight futures with given error
641             sub _fail_all_inflight {
642 0     0   0 my ($self, $error) = @_;
643 0         0 while (my $entry = $self->_shift_inflight) {
644 0 0 0     0 if ($entry->{future} && !$entry->{future}->is_ready) {
645 0         0 $entry->{future}->fail($error);
646             }
647             }
648             }
649              
650             # The single socket reader. Runs while there is work (inflight or pubsub).
651             # Calls _reader_fatal on any stream-alignment failure. The selector
652             # (_tasks) owns this task; _reader_running is cleared on every exit path
653             # via on_ready so _ensure_reader can restart it on the next submission.
654 0     0   0 async sub _run_reader {
655 0         0 my ($self) = @_;
656              
657 0         0 while (1) {
658             # Exit conditions.
659 0 0       0 return unless $self->{_socket_live};
660 0 0 0     0 last if !$self->{in_pubsub} && !@{$self->{inflight}};
  0         0  
661 0 0 0     0 last if $self->{in_pubsub} && !$self->{_subscription};
662              
663 0         0 my $head = $self->{inflight}[0];
664 0 0       0 my $deadline = $head ? $head->{deadline} : undef;
665              
666             # Set up read future; track so _reader_fatal can cancel it.
667 0         0 my $read_f = Future::IO->read($self->{socket}, 65536);
668 0         0 $self->{_current_read_future} = $read_f;
669              
670 0         0 my ($returned_f, $timed_out) = await _await_with_deadline($read_f, $deadline);
671              
672             # Clear slot on success path; fatal clears it on timeout/cancel.
673             $self->{_current_read_future} = undef
674 0 0 0     0 if !$timed_out && $returned_f->is_ready && !$returned_f->is_failed;
      0        
675              
676 0 0       0 if ($timed_out) {
677             my $err = Async::Redis::Error::Timeout->new(
678             message => "Request timed out",
679             command => $head ? $head->{args} : undef,
680             timeout => $self->{request_timeout},
681 0 0       0 maybe_executed => 1,
682             );
683 0         0 $self->_reader_fatal($err);
684 0         0 return;
685             }
686              
687 0 0       0 if ($returned_f->is_failed) {
688 0         0 my ($rerr) = $returned_f->failure;
689             my $err = Async::Redis::Error::Connection->new(
690             message => "Connection read error: $rerr",
691             host => $self->{host},
692             port => $self->{port},
693 0         0 );
694 0         0 $self->_reader_fatal($err);
695 0         0 return;
696             }
697              
698 0         0 my $buf = $returned_f->get;
699 0 0 0     0 if (!defined $buf || length($buf) == 0) {
700             my $err = Async::Redis::Error::Connection->new(
701             message => "Connection closed by peer",
702             host => $self->{host},
703             port => $self->{port},
704 0         0 );
705 0         0 $self->_reader_fatal($err);
706 0         0 return;
707             }
708              
709 0         0 $self->{parser}->parse($buf);
710              
711             # Drain all complete messages the parser has.
712 0         0 while (my $msg = $self->{parser}->get_message) {
713 0         0 my ($kind, $value) = $self->_decode_response_result($msg);
714              
715 0 0       0 if ($kind eq 'protocol_error') {
716 0         0 $self->_reader_fatal($value);
717 0         0 return;
718             }
719              
720 0         0 my $is_pubsub_message = 0;
721 0 0 0     0 if ($self->{in_pubsub} && $kind eq 'ok' && ref($value) eq 'ARRAY') {
      0        
722 0   0     0 my $frame_name = $value->[0] // '';
723 0 0 0     0 $is_pubsub_message = 1
      0        
724             if $frame_name eq 'message'
725             || $frame_name eq 'pmessage'
726             || $frame_name eq 'smessage';
727             }
728              
729 0 0       0 if ($is_pubsub_message) {
730 0         0 my $sub = $self->{_subscription};
731 0 0       0 if (!$sub) {
732             # No active subscription but got a message frame: strict desync.
733 0         0 $self->_reader_fatal(
734             Async::Redis::Error::Protocol->new(
735             message => "message frame but no active subscription",
736             )
737             );
738 0         0 return;
739             }
740             # _dispatch_frame is sync today (returns undef) and will become
741             # async in Task 15 (returning a Future for backpressure). Await
742             # only if we got a Future back.
743 0         0 my $dispatch_result = $sub->_dispatch_frame($value);
744 0 0 0     0 if (blessed($dispatch_result) && $dispatch_result->isa('Future')) {
745 0         0 await $dispatch_result;
746             }
747 0         0 next;
748             }
749              
750 0 0       0 if (!@{$self->{inflight}}) {
  0         0  
751             # Strict: unexpected frame with empty inflight = desync.
752 0         0 $self->_reader_fatal(
753             Async::Redis::Error::Protocol->new(
754             message => "unexpected frame (kind=$kind) with empty inflight",
755             )
756             );
757 0         0 return;
758             }
759              
760 0         0 my $entry = shift @{$self->{inflight}};
  0         0  
761 0 0       0 if ($kind eq 'redis_error') {
762 0 0 0     0 if (($entry->{redis_error} // 'fail') eq 'capture') {
763 0 0       0 $entry->{future}->done($value) unless $entry->{future}->is_ready;
764             } else {
765 0 0       0 $entry->{future}->fail($value) unless $entry->{future}->is_ready;
766             }
767             } else {
768 0 0       0 $entry->{future}->done($value) unless $entry->{future}->is_ready;
769             }
770             }
771             }
772             }
773              
774             # Start the reader if not already running. Idempotent.
775             #
776             # Ownership: the reader Future lives in $self->{_tasks} (a Future::Selector).
777             # The selector holds the strong reference and auto-removes the item on
778             # completion. $self->{_reader_running} is a boolean dedup guard — it's NOT
779             # a second source of truth about ownership, only a "is one already running?"
780             # flag that's cheap to check without peeking at selector internals.
781             #
782             # Failure propagation: because the reader is in the selector, any awaiting
783             # caller using $self->{_tasks}->run_until_ready($their_future) will see the
784             # reader's failure propagated to them. That's the structured-concurrency
785             # guarantee — no hanging callers when the reader dies unhandled.
786             sub _ensure_reader {
787 0     0   0 my ($self) = @_;
788 0 0       0 return if $self->{_reader_running};
789 0         0 $self->{_reader_running} = 1;
790 0         0 my $f = $self->_run_reader;
791 0     0   0 $f->on_ready(sub { $self->{_reader_running} = 0 });
  0         0  
792 0         0 $self->{_tasks}->add(data => 'reader', f => $f);
793 0         0 return;
794             }
795              
796             # Read and parse one response
797 0     0   0 async sub _read_response {
798 0         0 my ($self) = @_;
799              
800             # First check if parser already has a complete message
801             # (from previous read that contained multiple responses)
802 0 0       0 if (my $msg = $self->{parser}->get_message) {
803 0         0 return $msg;
804             }
805              
806             # Read until we get a complete message
807 0         0 while (1) {
808 0         0 my $buf = await Future::IO->read($self->{socket}, 65536);
809              
810             # EOF
811 0 0 0     0 if (!defined $buf || length($buf) == 0) {
812 0         0 die "Connection closed by server";
813             }
814              
815 0         0 $self->{parser}->parse($buf);
816              
817 0 0       0 if (my $msg = $self->{parser}->get_message) {
818 0         0 return $msg;
819             }
820             }
821             }
822              
823             # Dispatch table mapping each blocking command to how its timeout is encoded.
824             # position: 'last' => final argument (seconds)
825             # position: N (integer) => argument at index N (seconds, unless unit=>'ms')
826             # position: 'block_option' => scan for BLOCK keyword, next arg is timeout (ms)
827             # unit: 'ms' => divide raw value by 1000 to get seconds
828             # A timeout of zero means "block indefinitely" — no client-side deadline.
829             my %BLOCKING_TIMEOUT = (
830             BLPOP => { position => 'last' },
831             BRPOP => { position => 'last' },
832             BRPOPLPUSH => { position => 'last' },
833             BLMOVE => { position => 'last' },
834             BZPOPMIN => { position => 'last' },
835             BZPOPMAX => { position => 'last' },
836             BLMPOP => { position => 0 },
837             BZMPOP => { position => 0 },
838             XREAD => { position => 'block_option', unit => 'ms' },
839             XREADGROUP => { position => 'block_option', unit => 'ms' },
840             WAIT => { position => 'last', unit => 'ms' },
841             WAITAOF => { position => 'last', unit => 'ms' },
842             );
843              
844             # Calculate deadline based on command type
845             sub _calculate_deadline {
846 14     14   19141 my ($self, $cmd, @args) = @_;
847 14   50     115 $cmd = uc($cmd // '');
848              
849 14         26 my $spec = $BLOCKING_TIMEOUT{$cmd};
850 14 100       29 if (!$spec) {
851 3         14 return Time::HiRes::time() + $self->{request_timeout};
852             }
853              
854 11         12 my $raw;
855 11         17 my $pos = $spec->{position};
856              
857 11 100       24 if ($pos eq 'last') {
    100          
858 5         36 $raw = $args[-1];
859             }
860             elsif ($pos eq 'block_option') {
861 3         9 for my $i (0 .. $#args - 1) {
862 4 100 50     11 if (uc($args[$i] // '') eq 'BLOCK') {
863 2         3 $raw = $args[$i + 1];
864 2         3 last;
865             }
866             }
867             # No BLOCK option found — non-blocking variant; use request_timeout
868             return Time::HiRes::time() + $self->{request_timeout}
869 3 100       10 unless defined $raw;
870             }
871             else {
872             # Numeric index into @args
873 3         4 $raw = $args[$pos];
874             }
875              
876 10 100 66     81 if (!defined $raw || $raw !~ /^-?\d+(?:\.\d+)?$/) {
877 1         12 warn "_calculate_deadline: non-numeric timeout for $cmd; falling back to request_timeout\n";
878 1         8 return Time::HiRes::time() + $self->{request_timeout};
879             }
880              
881 9 100 100     31 my $seconds = ($spec->{unit} // 'seconds') eq 'ms'
882             ? $raw / 1000
883             : $raw + 0;
884              
885             # Zero means block indefinitely — no client-side deadline
886 9 100       22 return undef if $seconds == 0;
887              
888 6         26 return Time::HiRes::time() + $seconds + $self->{blocking_timeout_buffer};
889             }
890              
891             sub _ssl_verify_peer {
892 7     7   1674 require IO::Socket::SSL;
893 7         92543 return IO::Socket::SSL::SSL_VERIFY_PEER();
894             }
895              
896             sub _ssl_verify_none {
897 2     2   9 require IO::Socket::SSL;
898 2         7 return IO::Socket::SSL::SSL_VERIFY_NONE();
899             }
900              
901             # Build the IO::Socket::SSL option hash for the current connection.
902             # Handles chain verification, SNI, hostname identity checking, and
903             # client cert/key/CA forwarding. Called by _tls_upgrade and directly
904             # by unit tests.
905             sub _build_tls_options {
906 5     5   21 my ($self) = @_;
907 5         11 my %ssl_opts = (SSL_startHandshake => 0);
908              
909 5         6 my $tls = $self->{tls};
910 5 100       10 my $tls_hash = ref $tls eq 'HASH' ? $tls : {};
911              
912 5 100       11 my $verify = exists $tls_hash->{verify} ? !!$tls_hash->{verify} : 1;
913 5 100       8 my $verify_hostname = exists $tls_hash->{verify_hostname} ? !!$tls_hash->{verify_hostname} : 1;
914              
915 5 100       14 $ssl_opts{SSL_ca_file} = $tls_hash->{ca_file} if $tls_hash->{ca_file};
916 5 100       7 $ssl_opts{SSL_cert_file} = $tls_hash->{cert_file} if $tls_hash->{cert_file};
917 5 100       12 $ssl_opts{SSL_key_file} = $tls_hash->{key_file} if $tls_hash->{key_file};
918              
919 5 100       8 if ($verify) {
920 4         11 $ssl_opts{SSL_verify_mode} = $self->_ssl_verify_peer;
921 4         24 $ssl_opts{SSL_hostname} = $self->{host};
922              
923 4 100       11 if ($verify_hostname) {
924 3         5 $ssl_opts{SSL_verifycn_name} = $self->{host};
925 3         9 $ssl_opts{SSL_verifycn_scheme} = 'default';
926             }
927             } else {
928 1         3 $ssl_opts{SSL_verify_mode} = $self->_ssl_verify_none;
929             }
930              
931 5         29 return %ssl_opts;
932             }
933              
934             # Non-blocking TLS upgrade
935 0     0   0 async sub _tls_upgrade {
936 0         0 my ($self, $socket) = @_;
937              
938 0         0 require IO::Socket::SSL;
939              
940 0         0 my %ssl_opts = $self->_build_tls_options;
941              
942             # Start SSL (does not block because SSL_startHandshake => 0)
943             IO::Socket::SSL->start_SSL($socket, %ssl_opts)
944             or die Async::Redis::Error::Connection->new(
945             message => "SSL setup failed: " . IO::Socket::SSL::errstr(),
946             host => $self->{host},
947             port => $self->{port},
948 0 0       0 );
949              
950             # Drive handshake with non-blocking loop
951 0         0 my $deadline = Time::HiRes::time() + $self->{connect_timeout};
952              
953 0         0 while (1) {
954             # Check timeout
955 0 0       0 if (Time::HiRes::time() >= $deadline) {
956             die Async::Redis::Error::Timeout->new(
957             message => "TLS handshake timed out",
958             timeout => $self->{connect_timeout},
959 0         0 );
960             }
961              
962             # Attempt handshake step
963 0         0 my $rv = $socket->connect_SSL;
964              
965 0 0       0 if ($rv) {
966             # Handshake complete!
967 0         0 return $socket;
968             }
969              
970             # Check what the handshake needs
971 0         0 my $remaining = $deadline - Time::HiRes::time();
972 0 0       0 $remaining = 0.1 if $remaining <= 0;
973              
974 0 0       0 if ($IO::Socket::SSL::SSL_ERROR == IO::Socket::SSL::SSL_ERROR_WANT_READ()) {
    0          
975             # Wait for socket to become readable with timeout
976 0         0 my $read_f = Future::IO->waitfor_readable($socket);
977             my $timeout_f = Future::IO->sleep($remaining)->then(sub {
978 0     0   0 return Future->fail('tls_timeout');
979 0         0 });
980              
981 0         0 my $wait_f = Future->wait_any($read_f, $timeout_f);
982 0         0 await $wait_f;
983              
984 0 0       0 if ($wait_f->is_failed) {
985             die Async::Redis::Error::Timeout->new(
986             message => "TLS handshake timed out",
987             timeout => $self->{connect_timeout},
988 0         0 );
989             }
990             }
991             elsif ($IO::Socket::SSL::SSL_ERROR == IO::Socket::SSL::SSL_ERROR_WANT_WRITE()) {
992             # Wait for socket to become writable with timeout
993 0         0 my $write_f = Future::IO->waitfor_writable($socket);
994             my $timeout_f = Future::IO->sleep($remaining)->then(sub {
995 0     0   0 return Future->fail('tls_timeout');
996 0         0 });
997              
998 0         0 my $wait_f = Future->wait_any($write_f, $timeout_f);
999 0         0 await $wait_f;
1000              
1001 0 0       0 if ($wait_f->is_failed) {
1002             die Async::Redis::Error::Timeout->new(
1003             message => "TLS handshake timed out",
1004             timeout => $self->{connect_timeout},
1005 0         0 );
1006             }
1007             }
1008             else {
1009             # Actual error
1010             die Async::Redis::Error::Connection->new(
1011             message => "TLS handshake failed: " . IO::Socket::SSL::errstr(),
1012             host => $self->{host},
1013             port => $self->{port},
1014 0         0 );
1015             }
1016             }
1017             }
1018              
1019             # Reconnect with exponential backoff
1020 5     5   70 async sub _reconnect {
1021 5         7 my ($self) = @_;
1022              
1023 5         18 my $max = $self->{reconnect_max_attempts};
1024 5         6 my $attempt = 0;
1025              
1026 5         26 while (!$self->{connected}) {
1027 8         2838 $attempt++;
1028 8         12 $self->{_reconnect_attempt} = $attempt;
1029              
1030 8         9 my $ok = eval {
1031 8         20 await $self->connect;
1032 2         89 1;
1033             };
1034              
1035 8 100       279 if ($ok) {
1036 2         2 $self->{_reconnect_attempt} = 0;
1037 2         4 last;
1038             }
1039              
1040 6         10 my $error = $@;
1041              
1042             # Fire on_error callback
1043 6 50       14 if ($self->{on_error}) {
1044 0         0 $self->{on_error}->($self, $error);
1045             }
1046              
1047             # Honor reconnect_max_attempts cap so an unreachable Redis
1048             # doesn't spin forever. 0 means unlimited.
1049 6 100 66     22 if ($max && $attempt >= $max) {
1050 3         5 $self->{_reconnect_attempt} = 0;
1051 3         11 die Async::Redis::Error::Disconnected->new(
1052             message => "Reconnect gave up after $max attempts",
1053             );
1054             }
1055              
1056 3         8 my $delay = $self->_calculate_backoff($attempt);
1057 3         15 await Future::IO->sleep($delay);
1058             }
1059              
1060             # Reset attempt counter on success so subsequent reconnects start fresh.
1061 2         5 $self->{_reconnect_attempt} = 0;
1062             }
1063              
1064             # Ensure the socket is live, reconnecting if configured.
1065             #
1066             # Dedup: $self->{_reconnect_future} is the Future for the in-flight
1067             # reconnect. Concurrent callers share it. The slot is the shared-await
1068             # signal, NOT the ownership — ownership lives in $self->{_tasks}.
1069             #
1070             # Structured-concurrency: the reconnect task is added to the selector
1071             # so any caller currently awaiting via run_until_ready sees reconnect
1072             # failures propagated.
1073             #
1074             # NOTE: dedup is race-safe only when called from inside the write
1075             # gate (which serialises callers). Outside the gate, a failed reconnect
1076             # could be observed after on_ready clears the slot, allowing a second
1077             # reconnect to start before state converges.
1078 0     0   0 async sub _ensure_connected {
1079 0         0 my ($self) = @_;
1080 0 0       0 return if $self->{_socket_live};
1081 0 0       0 if (my $f = $self->{_reconnect_future}) {
1082 0         0 await $f;
1083 0         0 return;
1084             }
1085 0         0 my $f = $self->_reconnect;
1086 0         0 $self->{_reconnect_future} = $f;
1087 0         0 $self->{_tasks}->add(data => 'reconnect', f => $f);
1088 0     0   0 $f->on_ready(sub { $self->{_reconnect_future} = undef });
  0         0  
1089 0         0 await $f;
1090             }
1091              
1092             # Reconnect and replay pubsub subscriptions
1093 1     1   2 async sub _reconnect_pubsub {
1094 1         2 my ($self) = @_;
1095              
1096             my $sub = $self->{_subscription}
1097 1 50       4 or die Async::Redis::Error::Disconnected->new(
1098             message => "No subscription to replay",
1099             );
1100              
1101 1         3 my @replay = $sub->get_replay_commands;
1102              
1103             # Ensure connection state is fully cleaned up before reconnecting.
1104             # _reset_connection may have already been called by _read_response,
1105             # but if the socket was closed externally, we need to clean up
1106             # stale IO watchers and state here. It is safe to call twice —
1107             # the on_disconnect callback is guarded by $was_connected.
1108 1         4 $self->_reset_connection('pubsub_reconnect');
1109              
1110 1         3 await $self->_reconnect;
1111              
1112             # Re-enter pubsub mode before replaying so the unified reader
1113             # classifies incoming message frames correctly during replay.
1114 0         0 $self->{in_pubsub} = 1;
1115              
1116             # Replay all subscription commands through the write gate and unified
1117             # reader. Each channel/pattern gets its own command so confirmations
1118             # are matched one-to-one via the inflight queue.
1119 0         0 for my $cmd (@replay) {
1120 0         0 my ($command, @args) = @$cmd;
1121 0         0 for my $arg (@args) {
1122 0         0 await $self->_pubsub_command($command, $arg);
1123             }
1124             }
1125             }
1126              
1127             # Asynchronously reconnect after a pubsub connection drop. Called by
1128             # _reader_fatal when reconnect is enabled and a subscription is active.
1129             # Fires _resume_after_reconnect on the subscription on success, or
1130             # _fail_fatal on unrecoverable reconnect failure.
1131             sub _reconnect_async {
1132 0     0   0 my ($self, $sub) = @_;
1133              
1134             # Dedup against any reconnect already in progress (from either this
1135             # path or _ensure_connected). The slot is the shared signal.
1136             return if $self->{_reconnect_future}
1137 0 0 0     0 && !$self->{_reconnect_future}->is_ready;
1138              
1139 0         0 weaken(my $weak_self = $self);
1140 0         0 weaken(my $weak_sub = $sub);
1141              
1142 0     0   0 my $f = (async sub {
1143             # Reconnect the socket. _reconnect handles retry/backoff and
1144             # dies with Disconnected if reconnect_max_attempts is exhausted.
1145 0         0 await $weak_self->_reconnect;
1146              
1147             # Delegate the replay, on_reconnect, and driver-restart work to
1148             # the subscription's unified resume path. _resume_after_reconnect
1149             # handles clearing _paused, setting in_pubsub, replaying all
1150             # tracked channels/patterns, firing on_reconnect, and starting
1151             # the driver. Keeps the "who restarts what after reconnect"
1152             # logic in one place.
1153 0 0       0 if ($weak_sub) {
1154 0         0 await $weak_sub->_resume_after_reconnect;
1155             }
1156 0         0 })->();
1157              
1158             # Ownership: the selector owns the task; the slot is the dedup signal.
1159             # No ->retain — the selector holds the strong reference.
1160 0         0 $self->{_reconnect_future} = $f;
1161 0         0 $self->{_tasks}->add(data => 'pubsub-reconnect', f => $f);
1162              
1163             $f->on_ready(sub {
1164 0 0   0   0 return unless $weak_self;
1165 0         0 $weak_self->{_reconnect_future} = undef;
1166 0         0 });
1167             $f->on_fail(sub {
1168 0     0   0 my $err = shift;
1169 0 0       0 return unless $weak_sub;
1170 0         0 $weak_sub->_fail_fatal($err);
1171 0         0 });
1172              
1173 0         0 return;
1174             }
1175              
1176             # Execute a Redis command
1177 2     2 1 4 async sub command {
1178 2         3 my ($self, $cmd, @args) = @_;
1179              
1180             # Check for fork - invalidate connection if PID changed
1181 2         7 $self->_check_fork;
1182              
1183             # Block regular commands on pubsub connection
1184 2 50       3 if ($self->{in_pubsub}) {
1185 0   0     0 my $ucmd = uc($cmd // '');
1186 0 0       0 unless ($ucmd =~ /^(SUBSCRIBE|UNSUBSCRIBE|PSUBSCRIBE|PUNSUBSCRIBE|SSUBSCRIBE|SUNSUBSCRIBE|PING|QUIT)$/) {
1187 0         0 die Async::Redis::Error::Protocol->new(
1188             message => "Cannot execute '$cmd' on connection in PubSub mode",
1189             );
1190             }
1191             }
1192              
1193             # Apply key prefixing if configured
1194 2 50 33     6 if (defined $self->{prefix} && $self->{prefix} ne '') {
1195             @args = Async::Redis::KeyExtractor::apply_prefix(
1196 0         0 $self->{prefix}, $cmd, @args
1197             );
1198             }
1199              
1200             # Route through auto-pipeline if enabled
1201 2 50       5 if ($self->{_auto_pipeline}) {
1202 0         0 return await $self->{_auto_pipeline}->command($cmd, @args);
1203             }
1204              
1205             # Telemetry: start span and log send
1206 2         3 my $span_context;
1207 2         4 my $start_time = Time::HiRes::time();
1208 2 50       4 if ($self->{_telemetry}) {
1209 0         0 $span_context = $self->{_telemetry}->start_command_span($cmd, @args);
1210 0         0 $self->{_telemetry}->log_send($cmd, @args);
1211             }
1212              
1213 2         6 my $raw_cmd = $self->_build_command($cmd, @args);
1214 2         9 my $deadline = $self->_calculate_deadline($cmd, @args);
1215 2         30 my $response = Future->new;
1216              
1217 2         16 my $result;
1218             my $error;
1219              
1220 2         4 my $submit_ok = eval {
1221             await $self->_with_write_gate(sub {
1222 2         3 return (async sub {
1223             # Ensure the socket is live. Reconnect if enabled, else fail.
1224 2 50       4 if (!$self->{_socket_live}) {
1225 2 50       6 if ($self->_reconnect_enabled) {
1226 0         0 await $self->_ensure_connected;
1227             } else {
1228 2         32 die Async::Redis::Error::Disconnected->new(
1229             message => "Not connected",
1230             );
1231             }
1232             }
1233             # Register inflight BEFORE writing so order matches the wire.
1234 0         0 $self->_add_inflight($response, $cmd, \@args, $deadline, 'fail');
1235 0         0 await $self->_send($raw_cmd);
1236 2     2   6 })->();
1237 2         13 });
1238 0         0 1;
1239             };
1240              
1241 2 50       74 if (!$submit_ok) {
1242 2         3 $error = $@;
1243             # _with_write_gate already called _reader_fatal on write failure.
1244             } else {
1245 0         0 $self->_ensure_reader;
1246             # run_until_ready awaits $response while the selector pumps the
1247             # reader (and any other adopted tasks). If any selector task fails
1248             # unhandled — in particular, the reader — the failure propagates
1249             # here, so callers never hang waiting on a dead reader.
1250 0         0 my $await_ok = eval {
1251 0         0 $result = await $self->{_tasks}->run_until_ready($response);
1252 0         0 1;
1253             };
1254 0 0       0 if (!$await_ok) { $error = $@ }
  0         0  
1255             }
1256              
1257             # Telemetry: log result and end span
1258 2 50       4 if ($self->{_telemetry}) {
1259 0         0 my $elapsed_ms = (Time::HiRes::time() - $start_time) * 1000;
1260 0 0       0 if ($error) {
1261 0         0 $self->{_telemetry}->log_error($error);
1262             }
1263             else {
1264 0         0 $self->{_telemetry}->log_recv($result, $elapsed_ms);
1265             }
1266 0         0 $self->{_telemetry}->end_command_span($span_context, $error);
1267             }
1268              
1269 2 50       3 die $error if $error;
1270 0         0 return $result;
1271             }
1272              
1273             # Read response with deadline enforcement
1274 0     0   0 async sub _read_response_with_deadline {
1275 0         0 my ($self, $deadline, $cmd_ref) = @_;
1276              
1277             # First check if parser already has a complete message
1278 0 0       0 if (my $msg = $self->{parser}->get_message) {
1279 0         0 return $msg;
1280             }
1281              
1282             # Read until we get a complete message
1283 0         0 while (1) {
1284 0         0 my $remaining = $deadline - Time::HiRes::time();
1285              
1286 0 0       0 if ($remaining <= 0) {
1287 0         0 $self->_reset_connection;
1288             die Async::Redis::Error::Timeout->new(
1289             message => "Request timed out after $self->{request_timeout}s",
1290             command => $cmd_ref,
1291             timeout => $self->{request_timeout},
1292 0         0 maybe_executed => 1, # already sent the command
1293             );
1294             }
1295              
1296             # Use wait_any for timeout
1297 0         0 my $read_f = Future::IO->read($self->{socket}, 65536);
1298              
1299             # Store reference so disconnect() can cancel it
1300 0         0 $self->{_current_read_future} = $read_f;
1301              
1302             my $timeout_f = Future::IO->sleep($remaining)->then(sub {
1303 0     0   0 return Future->fail('read_timeout');
1304 0         0 });
1305              
1306 0         0 my $wait_f = Future->wait_any($read_f, $timeout_f);
1307 0         0 await $wait_f;
1308              
1309             # Clear stored reference after await completes
1310 0         0 $self->{_current_read_future} = undef;
1311              
1312             # Check if read was cancelled (by disconnect)
1313 0 0       0 if ($read_f->is_cancelled) {
1314 0         0 die Async::Redis::Error::Disconnected->new(
1315             message => "Disconnected during read",
1316             );
1317             }
1318              
1319 0 0       0 if ($wait_f->is_failed) {
1320 0         0 my ($error) = $wait_f->failure;
1321 0 0       0 if ($error eq 'read_timeout') {
1322 0         0 $self->_reset_connection;
1323             die Async::Redis::Error::Timeout->new(
1324             message => "Request timed out after $self->{request_timeout}s",
1325             command => $cmd_ref,
1326             timeout => $self->{request_timeout},
1327 0         0 maybe_executed => 1,
1328             );
1329             }
1330 0         0 $self->_reset_connection;
1331 0         0 die Async::Redis::Error::Connection->new(
1332             message => "$error",
1333             );
1334             }
1335              
1336             # Get the read result
1337 0         0 my $buf = $wait_f->get;
1338              
1339             # EOF
1340 0 0 0     0 if (!defined $buf || length($buf) == 0) {
1341 0         0 $self->_reset_connection;
1342 0         0 die Async::Redis::Error::Connection->new(
1343             message => "Connection closed by server",
1344             );
1345             }
1346              
1347 0         0 $self->{parser}->parse($buf);
1348              
1349 0 0       0 if (my $msg = $self->{parser}->get_message) {
1350 0         0 return $msg;
1351             }
1352             }
1353             }
1354              
1355             # Reset connection after timeout (stream is desynced)
1356             sub _reset_connection {
1357 2     2   14 my ($self, $reason) = @_;
1358 2   50     4 $reason //= 'timeout';
1359              
1360 2         4 my $was_connected = $self->{connected};
1361              
1362             # Cancel any active read future BEFORE closing socket
1363             # This ensures Future::IO unregisters its watcher while fileno is still valid
1364 2 50 33     6 if ($self->{_current_read_future} && !$self->{_current_read_future}->is_ready) {
1365 0         0 $self->{_current_read_future}->cancel;
1366 0         0 $self->{_current_read_future} = undef;
1367             }
1368              
1369             # Cancel any pending inflight operations before closing socket
1370 2 50       6 if (my $inflight = $self->{inflight}) {
1371 2         5 for my $entry (@$inflight) {
1372 0 0 0     0 if ($entry->{future} && !$entry->{future}->is_ready) {
1373 0         0 $entry->{future}->cancel;
1374             }
1375             }
1376 2         5 $self->{inflight} = [];
1377             }
1378              
1379 2 50       10 if ($self->{socket}) {
1380 0         0 $self->_close_socket;
1381             }
1382              
1383 2         6 $self->{_socket_live} = 0;
1384 2         4 $self->{_fatal_in_progress} = 0;
1385 2         4 $self->{_reader_running} = 0;
1386 2         6 $self->{_reconnect_future} = undef;
1387 2         3 $self->{connected} = 0;
1388 2         4 $self->{parser} = undef;
1389 2         4 $self->{in_pubsub} = 0;
1390              
1391 2 50 66     11 if ($was_connected && $self->{on_disconnect}) {
1392 0         0 $self->{on_disconnect}->($self, $reason);
1393             }
1394             }
1395              
1396             # Async write lock. The lock is a Future that resolves when the current
1397             # holder releases. Waiters chain onto it; each waiter replaces the slot
1398             # with its own Future before returning to the caller.
1399 5     5   16910 async sub _acquire_write_lock {
1400 5         10 my ($self) = @_;
1401              
1402             # Wait out any in-progress fatal. _reader_fatal is synchronous so this
1403             # is typically immediate, but a callback inside fatal could yield.
1404             # NOTE: this is a poll loop (sleep(0) per tick). Acceptable because
1405             # _reader_fatal's transition is synchronous; if teardown ever becomes
1406             # async, replace with a one-shot Future waiters can await on.
1407 5         13 while ($self->{_fatal_in_progress}) {
1408 99         20280 await Future::IO->sleep(0);
1409             }
1410              
1411             # Chain onto the existing lock Future if any.
1412 5         595 while (my $prev = $self->{_write_lock}) {
1413 1         14 await $prev;
1414             }
1415              
1416             # We are the owner now. Install our own Future so the next caller
1417             # waits on us.
1418 5         93 $self->{_write_lock} = Future->new;
1419 5         62 return;
1420             }
1421              
1422             sub _release_write_lock {
1423 5     5   14940 my ($self) = @_;
1424 5         10 my $f = delete $self->{_write_lock};
1425 5 50 33     19 $f->done if $f && !$f->is_ready;
1426             }
1427              
1428             # Wrap a body in gate acquire/release with guaranteed release even if the
1429             # body dies. On body failure, calls _reader_fatal with a transport error.
1430 2     2   3 async sub _with_write_gate {
1431 2         3 my ($self, $body) = @_;
1432 2         6 await $self->_acquire_write_lock;
1433 2         82 my $ok = eval { await $body->(); 1 };
  2         5  
  0         0  
1434 2         107 my $err = $@;
1435 2         6 $self->_release_write_lock;
1436 2 50       68 if (!$ok) {
1437             # Convert to a typed transport error if not already.
1438             my $typed = (ref $err && eval { $err->isa('Async::Redis::Error') })
1439             ? $err
1440             : Async::Redis::Error::Connection->new(
1441             message => "Write failed: $err",
1442             host => $self->{host},
1443             port => $self->{port},
1444 2 50 33     4 );
1445 2         6 $self->_reader_fatal($typed);
1446 2         8 die $typed;
1447             }
1448 0         0 return;
1449             }
1450              
1451             # Is reconnect enabled for this client?
1452             sub _reconnect_enabled {
1453 2     2   4 my ($self) = @_;
1454 2         6 return !!$self->{reconnect};
1455             }
1456              
1457             # Central "something went wrong with the stream" transition. Detaches
1458             # inflight BEFORE closing the socket so the typed error is preserved
1459             # (the old _reset_connection cancels inflight directly, which would
1460             # overwrite $typed_error with a generic cancellation).
1461             sub _reader_fatal {
1462 10     10   20 my ($self, $typed_error) = @_;
1463              
1464 10 100       22 return if $self->{_fatal_in_progress};
1465 9         13 $self->{_fatal_in_progress} = 1;
1466              
1467 9         12 my $ok = eval {
1468             # 1. Capture pre-reset state BEFORE any mutation.
1469 9         12 my $was_connected = $self->{connected};
1470 9         10 my $was_pubsub = $self->{in_pubsub};
1471 9         10 my $subscription = $self->{_subscription};
1472              
1473             # 2. Detach inflight so the close path cannot cancel them.
1474 9         12 my $detached_inflight = $self->{inflight};
1475 9         26 $self->{inflight} = [];
1476              
1477             # 3. Detach auto-pipeline's queued-but-not-registered commands.
1478 9         11 my $detached_autopipe = [];
1479 9 100       19 if (my $ap = $self->{_auto_pipeline}) {
1480 1         6 $detached_autopipe = $ap->_detach_queued;
1481             }
1482              
1483             # 4. Cancel the current read BEFORE closing the socket.
1484 9 50 33     18 if ($self->{_current_read_future}
1485             && !$self->{_current_read_future}->is_ready) {
1486 0         0 $self->{_current_read_future}->cancel;
1487             }
1488 9         10 $self->{_current_read_future} = undef;
1489              
1490             # 5. Close socket, clear internal state.
1491 9 50       18 $self->_close_socket if $self->{socket};
1492 9         12 $self->{_socket_live} = 0;
1493 9         9 $self->{connected} = 0;
1494 9         11 $self->{parser} = undef;
1495 9         10 $self->{in_pubsub} = 0;
1496 9         9 $self->{_reader_running} = 0;
1497              
1498             # 6. Fail all detached futures with the SAME typed error.
1499 9         19 for my $entry (@$detached_inflight, @$detached_autopipe) {
1500 3 50       43 next if $entry->{future}->is_ready;
1501 3         20 $entry->{future}->fail($typed_error);
1502             }
1503              
1504             # 7. Pubsub reconnect handoff. _fail_fatal and _pause_for_reconnect
1505             # land in Phase 2 (Task 14); for now this branch only triggers
1506             # when the subscription API is used, and we call the older
1507             # _close path as a placeholder until Task 14 replaces it.
1508 9 50 33     59 if ($was_pubsub && $subscription) {
1509 0 0 0     0 if ($self->_reconnect_enabled
    0          
1510             && $subscription->can('_pause_for_reconnect')) {
1511 0         0 $subscription->_pause_for_reconnect;
1512 0 0       0 if ($self->can('_reconnect_async')) {
1513 0         0 $self->_reconnect_async($subscription);
1514             }
1515             }
1516             elsif ($subscription->can('_fail_fatal')) {
1517 0         0 $subscription->_fail_fatal($typed_error);
1518             }
1519             else {
1520             # Pre-Phase-2 fallback: existing _close method.
1521 0 0       0 $subscription->_close if $subscription->can('_close');
1522             }
1523             }
1524              
1525             # 8. on_disconnect: only if we were publicly connected.
1526 9 100 100     22 if ($was_connected && $self->{on_disconnect}) {
1527 4         33 $self->{on_disconnect}->($self, "$typed_error");
1528             }
1529              
1530 8         27 1;
1531             };
1532              
1533 9         19 my $caught = $@;
1534             # Always clear the guard, even if a callback died.
1535 9         10 $self->{_fatal_in_progress} = 0;
1536 9 100 66     23 die $caught if !$ok && $caught;
1537             }
1538              
1539             # Decode Protocol::Redis response to Perl value
1540             sub _decode_response {
1541 0     0   0 my ($self, $msg) = @_;
1542              
1543 0 0       0 return undef unless $msg;
1544              
1545 0         0 my $type = $msg->{type};
1546 0         0 my $data = $msg->{data};
1547              
1548             # Simple string (+)
1549 0 0       0 if ($type eq '+') {
    0          
    0          
    0          
    0          
1550 0         0 return $data;
1551             }
1552             # Error (-)
1553             elsif ($type eq '-') {
1554 0         0 die Async::Redis::Error::Redis->from_message($data);
1555             }
1556             # Integer (:)
1557             elsif ($type eq ':') {
1558 0         0 return 0 + $data;
1559             }
1560             # Bulk string ($)
1561             elsif ($type eq '$') {
1562 0         0 return $data; # undef for null bulk
1563             }
1564             # Array (*)
1565             elsif ($type eq '*') {
1566 0 0       0 return undef unless defined $data; # null array
1567 0         0 return [ map { $self->_decode_response($_) } @$data ];
  0         0  
1568             }
1569              
1570 0         0 return $data;
1571             }
1572              
1573             # Non-throwing decoder used by the unified reader. Classifies each frame
1574             # as one of:
1575             # ('ok', $decoded_value) - normal response
1576             # ('redis_error', $error_object) - -ERR frame from Redis
1577             # ('protocol_error', $error_object) - fatal desync (malformed)
1578             sub _decode_response_result {
1579 14     14   21704 my ($self, $msg) = @_;
1580              
1581 14 100       28 if (!defined $msg) {
1582 1         27 return ('protocol_error', Async::Redis::Error::Protocol->new(
1583             message => 'undef message from parser',
1584             ));
1585             }
1586              
1587 13   50     26 my $type = $msg->{type} // '';
1588 13         16 my $data = $msg->{data};
1589              
1590 13 100       37 if ($type eq '+') {
    100          
    100          
    100          
    100          
1591 3         10 return ('ok', $data);
1592             }
1593             elsif ($type eq '-') {
1594 2         14 return ('redis_error', Async::Redis::Error::Redis->from_message($data));
1595             }
1596             elsif ($type eq ':') {
1597 3   50     12 return ('ok', 0 + ($data // 0));
1598             }
1599             elsif ($type eq '$') {
1600 2         7 return ('ok', $data);
1601             }
1602             elsif ($type eq '*') {
1603 2 50       5 return ('ok', undef) if !defined $data; # nil array
1604 2         2 my @out;
1605 2         4 for my $child (@$data) {
1606 5         10 my ($k, $v) = $self->_decode_response_result($child);
1607 5 50       9 if ($k eq 'protocol_error') {
1608 0         0 return ($k, $v); # propagate fatal
1609             }
1610 5         10 push @out, $v;
1611             }
1612 2         6 return ('ok', \@out);
1613             }
1614             else {
1615 1         5 return ('protocol_error', Async::Redis::Error::Protocol->new(
1616             message => "unknown frame type: $type",
1617             ));
1618             }
1619             }
1620              
1621             # ============================================================================
1622             # Convenience Commands
1623             # ============================================================================
1624              
1625 0     0 1 0 async sub ping {
1626 0         0 my ($self) = @_;
1627 0         0 return await $self->command('PING');
1628             }
1629              
1630 0     0 0 0 async sub set {
1631 0         0 my ($self, $key, $value, %opts) = @_;
1632 0         0 my @cmd = ('SET', $key, $value);
1633 0 0       0 push @cmd, 'EX', $opts{ex} if exists $opts{ex};
1634 0 0       0 push @cmd, 'PX', $opts{px} if exists $opts{px};
1635 0 0       0 push @cmd, 'NX' if $opts{nx};
1636 0 0       0 push @cmd, 'XX' if $opts{xx};
1637 0         0 return await $self->command(@cmd);
1638             }
1639              
1640 0     0 0 0 async sub get {
1641 0         0 my ($self, $key) = @_;
1642 0         0 return await $self->command('GET', $key);
1643             }
1644              
1645 0     0 0 0 async sub del {
1646 0         0 my ($self, @keys) = @_;
1647 0         0 return await $self->command('DEL', @keys);
1648             }
1649              
1650 0     0 0 0 async sub incr {
1651 0         0 my ($self, $key) = @_;
1652 0         0 return await $self->command('INCR', $key);
1653             }
1654              
1655 0     0 0 0 async sub lpush {
1656 0         0 my ($self, $key, @values) = @_;
1657 0         0 return await $self->command('LPUSH', $key, @values);
1658             }
1659              
1660 0     0 0 0 async sub rpush {
1661 0         0 my ($self, $key, @values) = @_;
1662 0         0 return await $self->command('RPUSH', $key, @values);
1663             }
1664              
1665 0     0 0 0 async sub lpop {
1666 0         0 my ($self, $key) = @_;
1667 0         0 return await $self->command('LPOP', $key);
1668             }
1669              
1670 0     0 0 0 async sub lrange {
1671 0         0 my ($self, $key, $start, $stop) = @_;
1672 0         0 return await $self->command('LRANGE', $key, $start, $stop);
1673             }
1674              
1675 0     0 1 0 async sub keys {
1676 0         0 my ($self, $pattern) = @_;
1677 0         0 return await $self->command('KEYS', $pattern // '*');
1678             }
1679              
1680 0     0 0 0 async sub flushdb {
1681 0         0 my ($self) = @_;
1682 0         0 return await $self->command('FLUSHDB');
1683             }
1684              
1685             # ============================================================================
1686             # Lua Scripting
1687             # ============================================================================
1688              
1689 0     0 1 0 async sub script_load {
1690 0         0 my ($self, $script) = @_;
1691 0         0 return await $self->command('SCRIPT', 'LOAD', $script);
1692             }
1693              
1694 0     0 1 0 async sub script_exists {
1695 0         0 my ($self, @shas) = @_;
1696 0         0 return await $self->command('SCRIPT', 'EXISTS', @shas);
1697             }
1698              
1699 0     0 1 0 async sub script_flush {
1700 0         0 my ($self, $mode) = @_;
1701 0         0 my @args = ('SCRIPT', 'FLUSH');
1702 0 0       0 push @args, $mode if $mode; # ASYNC or SYNC
1703 0         0 return await $self->command(@args);
1704             }
1705              
1706 0     0 1 0 async sub script_kill {
1707 0         0 my ($self) = @_;
1708 0         0 return await $self->command('SCRIPT', 'KILL');
1709             }
1710              
1711 0     0 0 0 async sub evalsha_or_eval {
1712 0         0 my ($self, $sha, $script, $numkeys, @keys_and_args) = @_;
1713              
1714             # Try EVALSHA first
1715 0         0 my $result;
1716 0         0 eval {
1717 0         0 $result = await $self->evalsha($sha, $numkeys, @keys_and_args);
1718             };
1719              
1720 0 0       0 if ($@) {
1721 0         0 my $error = $@;
1722              
1723             # Check if it's a NOSCRIPT error
1724 0 0       0 if ("$error" =~ /NOSCRIPT/i) {
1725             # Fall back to EVAL (which also loads the script)
1726 0         0 $result = await $self->eval($script, $numkeys, @keys_and_args);
1727             }
1728             else {
1729             # Re-throw other errors
1730 0         0 die $error;
1731             }
1732             }
1733              
1734 0         0 return $result;
1735             }
1736              
1737             sub script {
1738 0     0 1 0 my ($self, $code) = @_;
1739 0         0 return Async::Redis::Script->new(
1740             redis => $self,
1741             script => $code,
1742             );
1743             }
1744              
1745             # Define a named script command
1746             # Usage: $redis->define_command(name => { keys => N, lua => '...' })
1747             sub define_command {
1748 0     0 1 0 my ($self, $name, $def) = @_;
1749              
1750 0 0 0     0 die "Command name required" unless defined $name && length $name;
1751 0 0       0 die "Command definition required" unless ref $def eq 'HASH';
1752 0 0       0 die "Lua script required (lua => '...')" unless defined $def->{lua};
1753             die "define_command install option is not supported; use run_script()"
1754 0 0       0 if exists $def->{install};
1755              
1756             # Validate name (alphanumeric and underscore only)
1757 0 0       0 die "Invalid command name '$name' - use only alphanumeric and underscore"
1758             unless $name =~ /^[a-zA-Z_][a-zA-Z0-9_]*$/;
1759              
1760             my $script = Async::Redis::Script->new(
1761             redis => $self,
1762             script => $def->{lua},
1763             name => $name,
1764             num_keys => $def->{keys} // 'dynamic',
1765             description => $def->{description},
1766 0   0     0 );
1767              
1768 0         0 $self->{_scripts}{$name} = $script;
1769              
1770 0         0 return $script;
1771             }
1772              
1773             # Run a registered script by name
1774             # Usage: $redis->run_script('name', @keys_then_args)
1775             # If num_keys is 'dynamic', first arg is the key count
1776 0     0 1 0 async sub run_script {
1777 0         0 my ($self, $name, @args) = @_;
1778              
1779 0 0       0 my $script = $self->{_scripts}{$name}
1780             or die "Unknown script: '$name' - use define_command() first";
1781              
1782 0         0 my $num_keys = $script->num_keys;
1783              
1784             # Handle dynamic key count
1785 0 0       0 if ($num_keys eq 'dynamic') {
1786 0         0 $num_keys = shift @args;
1787 0 0       0 die "Key count required as first argument for dynamic script '$name'"
1788             unless defined $num_keys;
1789             }
1790              
1791             # Split args into keys and argv
1792 0         0 my @keys = splice(@args, 0, $num_keys);
1793 0         0 return await $script->run(\@keys, \@args);
1794             }
1795              
1796             # Get a registered script by name
1797             sub get_script {
1798 0     0 1 0 my ($self, $name) = @_;
1799 0         0 return $self->{_scripts}{$name};
1800             }
1801              
1802             # List all registered script names
1803             sub list_scripts {
1804 0     0 1 0 my ($self) = @_;
1805 0         0 return CORE::keys %{$self->{_scripts}};
  0         0  
1806             }
1807              
1808             # Preload all registered scripts to Redis
1809             # Useful before pipeline execution
1810 0     0 1 0 async sub preload_scripts {
1811 0         0 my ($self) = @_;
1812              
1813 0         0 my @names = $self->list_scripts;
1814 0 0       0 return 0 unless @names;
1815              
1816 0         0 for my $name (@names) {
1817 0         0 my $script = $self->{_scripts}{$name};
1818 0         0 await $self->script_load($script->script);
1819             }
1820              
1821 0         0 return scalar @names;
1822             }
1823              
1824             # ============================================================================
1825             # SCAN Iterators
1826             # ============================================================================
1827              
1828             sub scan_iter {
1829 0     0 1 0 my ($self, %opts) = @_;
1830             return Async::Redis::Iterator->new(
1831             redis => $self,
1832             command => 'SCAN',
1833             match => $opts{match},
1834             count => $opts{count},
1835             type => $opts{type},
1836 0         0 );
1837             }
1838              
1839             sub hscan_iter {
1840 0     0 0 0 my ($self, $key, %opts) = @_;
1841             return Async::Redis::Iterator->new(
1842             redis => $self,
1843             command => 'HSCAN',
1844             key => $key,
1845             match => $opts{match},
1846             count => $opts{count},
1847 0         0 );
1848             }
1849              
1850             sub sscan_iter {
1851 0     0 0 0 my ($self, $key, %opts) = @_;
1852             return Async::Redis::Iterator->new(
1853             redis => $self,
1854             command => 'SSCAN',
1855             key => $key,
1856             match => $opts{match},
1857             count => $opts{count},
1858 0         0 );
1859             }
1860              
1861             sub zscan_iter {
1862 0     0 0 0 my ($self, $key, %opts) = @_;
1863             return Async::Redis::Iterator->new(
1864             redis => $self,
1865             command => 'ZSCAN',
1866             key => $key,
1867             match => $opts{match},
1868             count => $opts{count},
1869 0         0 );
1870             }
1871              
1872             # ============================================================================
1873             # Transactions
1874             # ============================================================================
1875              
1876 0     0 1 0 async sub multi {
1877 0         0 my ($self, $callback) = @_;
1878              
1879             # Prevent nested multi() calls
1880             die "Cannot nest multi() calls - already in a transaction"
1881 0 0       0 if $self->{in_multi};
1882              
1883             # Mark that we're collecting transaction commands
1884 0         0 $self->{in_multi} = 1;
1885              
1886 0         0 my @commands;
1887 0         0 eval {
1888             # Create transaction collector
1889 0         0 my $tx = Async::Redis::Transaction->new(redis => $self);
1890              
1891             # Run callback to collect commands
1892 0         0 await $callback->($tx);
1893              
1894 0         0 @commands = $tx->commands;
1895             };
1896 0         0 my $collect_error = $@;
1897              
1898 0 0       0 if ($collect_error) {
1899 0         0 $self->{in_multi} = 0;
1900 0         0 die $collect_error;
1901             }
1902              
1903             # If no commands queued, return empty result
1904 0 0       0 unless (@commands) {
1905 0         0 $self->{in_multi} = 0;
1906 0         0 return [];
1907             }
1908              
1909             # Execute transaction (in_multi already set)
1910 0         0 return await $self->_execute_transaction(\@commands);
1911             }
1912              
1913 0     0   0 async sub _execute_transaction {
1914 0         0 my ($self, $commands) = @_;
1915              
1916             # in_multi should already be set by caller
1917              
1918 0         0 my $results;
1919 0         0 eval {
1920             # Send MULTI
1921 0         0 await $self->command('MULTI');
1922              
1923             # Queue all commands (they return +QUEUED)
1924 0         0 for my $cmd (@$commands) {
1925 0         0 await $self->command(@$cmd);
1926             }
1927              
1928             # Execute and get results
1929 0         0 $results = await $self->command('EXEC');
1930             };
1931 0         0 my $error = $@;
1932              
1933             # Always clear transaction state
1934 0         0 $self->{in_multi} = 0;
1935              
1936 0 0       0 if ($error) {
1937             # Try to clean up
1938 0         0 eval { await $self->command('DISCARD') };
  0         0  
1939 0         0 die $error;
1940             }
1941              
1942 0         0 return $results;
1943             }
1944              
1945             # Accessor for pool cleanliness tracking
1946 1     1 1 5 sub in_multi { shift->{in_multi} }
1947 1     1 1 15 sub watching { shift->{watching} }
1948 0     0 1 0 sub in_pubsub { shift->{in_pubsub} }
1949 0   0 0 1 0 sub inflight_count { scalar @{shift->{inflight} // []} }
  0         0  
1950              
1951             # Is connection dirty (unsafe to reuse)?
1952             sub is_dirty {
1953 2     2 1 5 my ($self) = @_;
1954              
1955 2 50       5 return 1 if $self->{in_multi};
1956 2 50       5 return 1 if $self->{watching};
1957 2 50       4 return 1 if $self->{in_pubsub};
1958 2 50 50     2 return 1 if @{$self->{inflight} // []} > 0;
  2         8  
1959              
1960 2         5 return 0;
1961             }
1962              
1963 1     1 1 30 async sub watch {
1964 1         3 my ($self, @keys) = @_;
1965 1         4 my $result = await $self->command('WATCH', @keys);
1966 0         0 $self->{watching} = 1;
1967 0         0 return $result;
1968             }
1969              
1970 0     0 1 0 async sub unwatch {
1971 0         0 my ($self) = @_;
1972 0         0 my $result = await $self->command('UNWATCH');
1973 0         0 $self->{watching} = 0;
1974 0         0 return $result;
1975             }
1976              
1977 1     1 1 23 async sub multi_start {
1978 1         2 my ($self) = @_;
1979 1         3 my $result = await $self->command('MULTI');
1980 0         0 $self->{in_multi} = 1;
1981 0         0 return $result;
1982             }
1983              
1984 0     0 1 0 async sub exec {
1985 0         0 my ($self) = @_;
1986 0         0 my $result = await $self->command('EXEC');
1987 0         0 $self->{in_multi} = 0;
1988 0         0 $self->{watching} = 0; # EXEC clears watches
1989 0         0 return $result;
1990             }
1991              
1992 0     0 1 0 async sub discard {
1993 0         0 my ($self) = @_;
1994 0         0 my $result = await $self->command('DISCARD');
1995 0         0 $self->{in_multi} = 0;
1996 0         0 $self->{watching} = 0; # DISCARD clears watches
1997 0         0 return $result;
1998             }
1999              
2000 0     0 1 0 async sub watch_multi {
2001 0         0 my ($self, $keys, $callback) = @_;
2002              
2003 0         0 my $watch_active = 0;
2004 0         0 my $multi_started = 0;
2005 0         0 my $results;
2006              
2007 0         0 my $ok = eval {
2008             # WATCH must be unwound on any pre-MULTI failure, including a
2009             # callback die, otherwise the connection remains poisoned.
2010 0         0 await $self->watch(@$keys);
2011 0         0 $watch_active = 1;
2012              
2013             # Get current values of watched keys
2014 0         0 my %watched;
2015 0         0 for my $key (@$keys) {
2016 0         0 $watched{$key} = await $self->get($key);
2017             }
2018              
2019             # Create transaction collector
2020 0         0 my $tx = Async::Redis::Transaction->new(redis => $self);
2021              
2022             # Run callback with watched values
2023 0         0 await $callback->($tx, \%watched);
2024              
2025 0         0 my @commands = $tx->commands;
2026              
2027             # If no commands queued, just unwatch and return empty
2028 0 0       0 unless (@commands) {
2029 0         0 await $self->unwatch;
2030 0         0 $watch_active = 0;
2031 0         0 $results = [];
2032             }
2033             else {
2034 0         0 await $self->multi_start;
2035 0         0 $multi_started = 1;
2036              
2037 0         0 for my $cmd (@commands) {
2038 0         0 await $self->command(@$cmd);
2039             }
2040              
2041 0         0 $results = await $self->exec;
2042 0         0 $multi_started = 0;
2043 0         0 $watch_active = 0;
2044             }
2045              
2046 0         0 1;
2047             };
2048 0         0 my $error = $@;
2049              
2050 0 0       0 if (!$ok) {
2051 0 0       0 if ($multi_started) {
    0          
2052 0         0 eval { await $self->discard; 1 };
  0         0  
  0         0  
2053             }
2054             elsif ($watch_active) {
2055 0         0 eval { await $self->unwatch; 1 };
  0         0  
  0         0  
2056             }
2057              
2058             # Cleanup can fail on a dead socket; keep local state conservative
2059             # and preserve the original caller-facing error.
2060 0         0 $self->{in_multi} = 0;
2061 0         0 $self->{watching} = 0;
2062              
2063 0         0 die $error;
2064             }
2065              
2066             # EXEC returns undef/nil if WATCH failed
2067 0         0 return $results;
2068             }
2069              
2070             # ============================================================================
2071             # PUB/SUB
2072             # ============================================================================
2073              
2074             # Wait for inflight commands to complete before mode change
2075 0     0   0 async sub _wait_for_inflight_drain {
2076 0         0 my ($self, $timeout) = @_;
2077 0   0     0 $timeout //= 30;
2078              
2079 0 0       0 return unless @{$self->{inflight}};
  0         0  
2080              
2081 0         0 my $deadline = Time::HiRes::time() + $timeout;
2082              
2083 0   0     0 while (@{$self->{inflight}} && Time::HiRes::time() < $deadline) {
  0         0  
2084 0         0 await Future::IO->sleep(0.001);
2085             }
2086              
2087 0 0       0 if (@{$self->{inflight}}) {
  0         0  
2088 0         0 $self->_fail_all_inflight("Timeout waiting for inflight commands");
2089             }
2090             }
2091              
2092 0     0 1 0 async sub publish {
2093 0         0 my ($self, $channel, $message) = @_;
2094 0         0 return await $self->command('PUBLISH', $channel, $message);
2095             }
2096              
2097 0     0 1 0 async sub spublish {
2098 0         0 my ($self, $channel, $message) = @_;
2099 0         0 return await $self->command('SPUBLISH', $channel, $message);
2100             }
2101              
2102             # Subscribe to channels - returns a Subscription object
2103 0     0 1 0 async sub subscribe {
2104 0         0 my ($self, @channels) = @_;
2105              
2106             die Async::Redis::Error::Disconnected->new(
2107             message => "Not connected",
2108 0 0       0 ) unless $self->{connected};
2109              
2110             # Wait for pending commands before entering PubSub mode
2111 0         0 await $self->_wait_for_inflight_drain;
2112              
2113             # Clear a stale closed subscription so we allocate a fresh object.
2114 0 0 0     0 if ($self->{_subscription} && $self->{_subscription}->is_closed) {
2115 0         0 delete $self->{_subscription};
2116             }
2117              
2118             # Create or reuse subscription
2119 0   0     0 my $sub = $self->{_subscription} //= Async::Redis::Subscription->new(redis => $self);
2120              
2121             # Set in_pubsub BEFORE submitting so the unified reader classifies
2122             # racing message frames correctly (e.g. published before our
2123             # confirmation arrives).
2124 0         0 $self->{in_pubsub} = 1;
2125              
2126             # Issue one SUBSCRIBE per channel through the write gate and unified
2127             # reader. Each call awaits its matching confirmation frame.
2128 0         0 for my $ch (@channels) {
2129 0         0 await $self->_pubsub_command('SUBSCRIBE', $ch);
2130 0         0 $sub->_add_channel($ch);
2131             }
2132              
2133 0         0 return $sub;
2134             }
2135              
2136             # Pattern subscribe
2137 0     0 1 0 async sub psubscribe {
2138 0         0 my ($self, @patterns) = @_;
2139              
2140             die Async::Redis::Error::Disconnected->new(
2141             message => "Not connected",
2142 0 0       0 ) unless $self->{connected};
2143              
2144             # Wait for pending commands before entering PubSub mode
2145 0         0 await $self->_wait_for_inflight_drain;
2146              
2147             # Clear a stale closed subscription so we allocate a fresh object.
2148 0 0 0     0 if ($self->{_subscription} && $self->{_subscription}->is_closed) {
2149 0         0 delete $self->{_subscription};
2150             }
2151              
2152 0   0     0 my $sub = $self->{_subscription} //= Async::Redis::Subscription->new(redis => $self);
2153              
2154 0         0 $self->{in_pubsub} = 1;
2155              
2156 0         0 for my $p (@patterns) {
2157 0         0 await $self->_pubsub_command('PSUBSCRIBE', $p);
2158 0         0 $sub->_add_pattern($p);
2159             }
2160              
2161 0         0 return $sub;
2162             }
2163              
2164             # Sharded subscribe (Redis 7+)
2165 0     0 1 0 async sub ssubscribe {
2166 0         0 my ($self, @channels) = @_;
2167              
2168             die Async::Redis::Error::Disconnected->new(
2169             message => "Not connected",
2170 0 0       0 ) unless $self->{connected};
2171              
2172             # Wait for pending commands before entering PubSub mode
2173 0         0 await $self->_wait_for_inflight_drain;
2174              
2175             # Clear a stale closed subscription so we allocate a fresh object.
2176 0 0 0     0 if ($self->{_subscription} && $self->{_subscription}->is_closed) {
2177 0         0 delete $self->{_subscription};
2178             }
2179              
2180 0   0     0 my $sub = $self->{_subscription} //= Async::Redis::Subscription->new(redis => $self);
2181              
2182 0         0 $self->{in_pubsub} = 1;
2183              
2184 0         0 for my $ch (@channels) {
2185 0         0 await $self->_pubsub_command('SSUBSCRIBE', $ch);
2186 0         0 $sub->_add_sharded_channel($ch);
2187             }
2188              
2189 0         0 return $sub;
2190             }
2191              
2192             # Read pubsub frame (subscription confirmation or message)
2193 1     1   2 async sub _read_pubsub_frame {
2194 1         2 my ($self) = @_;
2195              
2196             die Async::Redis::Error::Disconnected->new(
2197             message => "Not connected",
2198 1 50       16 ) unless $self->{connected};
2199              
2200 0           my $msg = await $self->_read_response();
2201 0           return $self->_decode_response($msg);
2202             }
2203              
2204             # Execute a single pubsub management command (SUBSCRIBE, UNSUBSCRIBE,
2205             # PSUBSCRIBE, PUNSUBSCRIBE, SSUBSCRIBE, SUNSUBSCRIBE) through the write
2206             # gate and unified reader. Each call registers one inflight entry and
2207             # awaits the matching confirmation frame from the reader.
2208             #
2209             # Use instead of _send_command + _read_pubsub_frame when in_pubsub=1
2210             # so that the unified reader (_run_reader) remains the sole socket
2211             # reader. Does not apply prefix or go through auto-pipeline.
2212 0     0     async sub _pubsub_command {
2213 0           my ($self, $cmd, @args) = @_;
2214              
2215 0           my $raw_cmd = $self->_build_command($cmd, @args);
2216 0           my $deadline = $self->_calculate_deadline($cmd, @args);
2217 0           my $response = Future->new;
2218              
2219 0           my $submit_ok = eval {
2220             await $self->_with_write_gate(sub {
2221 0           return (async sub {
2222 0 0         if (!$self->{_socket_live}) {
2223 0           die Async::Redis::Error::Disconnected->new(
2224             message => "Not connected",
2225             );
2226             }
2227             # Register inflight BEFORE writing so order matches the wire.
2228 0           $self->_add_inflight($response, $cmd, \@args, $deadline, 'fail');
2229 0           await $self->_send($raw_cmd);
2230 0     0     })->();
2231 0           });
2232 0           1;
2233             };
2234              
2235 0 0         unless ($submit_ok) {
2236 0           my $err = $@;
2237 0           die $err;
2238             }
2239              
2240 0           $self->_ensure_reader;
2241 0           return await $response;
2242             }
2243              
2244             # Send command without reading response (for pubsub)
2245 0     0     async sub _send_command {
2246 0           my ($self, @args) = @_;
2247              
2248 0           my $cmd = $self->_build_command(@args);
2249 0           await $self->_send($cmd);
2250             }
2251              
2252             # Read next pubsub message (blocking) - for compatibility
2253 0     0     async sub _read_pubsub_message {
2254 0           my ($self) = @_;
2255              
2256 0           my $msg = await $self->_read_response();
2257              
2258             # Message format: ['message', $channel, $payload]
2259             # or: ['pmessage', $pattern, $channel, $payload]
2260 0           return $msg;
2261             }
2262              
2263             # ============================================================================
2264             # Pipelining
2265             # ============================================================================
2266              
2267             sub pipeline {
2268 0     0 1   my ($self, %opts) = @_;
2269             return Async::Redis::Pipeline->new(
2270             redis => $self,
2271             max_depth => $opts{max_depth} // $self->{pipeline_depth},
2272 0   0       );
2273             }
2274              
2275             # Execute multiple commands, return all responses
2276 0     0     async sub _execute_pipeline {
2277 0           my ($self, $commands) = @_;
2278 0 0         return [] unless @$commands;
2279              
2280 0           my $start_time = Time::HiRes::time();
2281 0           my $count = scalar @$commands;
2282              
2283             # Build one RESP buffer and one Future per command up front.
2284 0           my $buffer = '';
2285 0           my @futures;
2286             my @deadlines;
2287 0           for my $cmd (@$commands) {
2288 0           $buffer .= $self->_build_command(@$cmd);
2289 0           push @futures, Future->new;
2290 0           push @deadlines, $self->_calculate_deadline(@$cmd);
2291             }
2292              
2293             await $self->_with_write_gate(sub {
2294 0           return (async sub {
2295 0 0         if (!$self->{_socket_live}) {
2296 0 0         if ($self->_reconnect_enabled) {
2297 0           await $self->_ensure_connected;
2298             } else {
2299 0           die Async::Redis::Error::Disconnected->new(
2300             message => "Not connected",
2301             );
2302             }
2303             }
2304 0           for my $i (0 .. $#$commands) {
2305             $self->_add_inflight(
2306             $futures[$i],
2307             $commands->[$i][0],
2308 0           [ @{$commands->[$i]}[1..$#{$commands->[$i]}] ],
  0            
  0            
2309             $deadlines[$i],
2310             'capture',
2311             );
2312             }
2313 0           await $self->_send($buffer);
2314 0     0     })->();
2315 0           });
2316              
2317 0           $self->_ensure_reader;
2318              
2319             # Await every future. capture policy means Redis errors come back as
2320             # done($error_object); transport failures come back as fail().
2321 0           my @results;
2322 0           for my $i (0 .. $#futures) {
2323 0           my $ok = eval { push @results, await $futures[$i]; 1 };
  0            
  0            
2324 0 0         next if $ok;
2325             # Transport failure mid-pipeline: the remaining futures already
2326             # got _reader_fatal's typed error. Collect them too (as error
2327             # values) so the caller sees a full array.
2328 0           push @results, $@;
2329 0           for my $j ($i + 1 .. $#futures) {
2330 0           my $r_ok = eval { push @results, await $futures[$j]; 1 };
  0            
  0            
2331 0 0         push @results, $@ unless $r_ok;
2332             }
2333 0           last;
2334             }
2335              
2336 0 0         if ($self->{_telemetry}) {
2337 0           my $elapsed_ms = (Time::HiRes::time() - $start_time) * 1000;
2338 0           $self->{_telemetry}->record_pipeline($count, $elapsed_ms);
2339             }
2340              
2341 0           return \@results;
2342             }
2343              
2344             1;
2345              
2346             __END__