File Coverage

blib/lib/Async/Redis/Subscription.pm
Criterion Covered Total %
statement 77 319 24.1
branch 13 176 7.3
condition 3 74 4.0
subroutine 20 43 46.5
pod 14 15 93.3
total 127 627 20.2


line stmt bran cond sub pod time code
1             package Async::Redis::Subscription;
2              
3 91     91   467 use strict;
  91         138  
  91         3185  
4 91     91   329 use warnings;
  91         131  
  91         3497  
5 91     91   1093 use 5.018;
  91         238  
6              
7 91     91   396 use Carp ();
  91         149  
  91         1263  
8 91     91   291 use Future;
  91         167  
  91         1610  
9 91     91   292 use Future::AsyncAwait;
  91         146  
  91         470  
10 91     91   3614 use Future::IO;
  91         164  
  91         4334  
11 91     91   388 use Scalar::Util qw(blessed refaddr weaken);
  91         147  
  91         5149  
12              
13              
14             # Threshold for periodic event-loop yield inside the callback driver
15             # loop. Prevents stack growth when many messages are pre-queued and
16             # await on an already-ready Future returns synchronously.
17 91     91   412 use constant MAX_SYNC_DEPTH => 32;
  91         174  
  91         433822  
18              
19             sub new {
20 4     4 1 26 my ($class, %args) = @_;
21              
22             return bless {
23             redis => $args{redis},
24 4         41 channels => {}, # channel => 1 (for regular subscribe)
25             patterns => {}, # pattern => 1 (for psubscribe)
26             sharded_channels => {}, # channel => 1 (for ssubscribe)
27             _pending_messages => [], # Queued messages for iterator consumers
28             _message_waiter => undef, # Future signalled when a message arrives
29             _slot_waiter => undef, # Future signalled when queue drains below depth
30             _fatal_error => undef, # Typed error set by _fail_fatal
31             _on_reconnect => undef, # Callback for reconnect notification
32             _on_message => undef, # Message-arrived callback (callback mode)
33             _on_error => undef, # Fatal-error callback
34             _driver_step => undef, # Running driver loop Future (owned by _tasks selector)
35             _closed => 0,
36             _paused => 0, # Set during reconnect; clears in _resume_after_reconnect
37             }, $class;
38             }
39              
40             # Set/get reconnect callback
41             sub on_reconnect {
42 3     3 1 9 my ($self, $cb) = @_;
43 3 100       6 $self->{_on_reconnect} = $cb if @_ > 1;
44 3         9 return $self->{_on_reconnect};
45             }
46              
47             # Set/get message-arrived callback. Once set, next() croaks — the
48             # subscription is in callback mode for the rest of its lifetime.
49             # $cb->($sub, $msg) receives the Subscription and the message hashref.
50             sub on_message {
51 0     0 1 0 my ($self, $cb) = @_;
52 0 0       0 if (@_ > 1) {
53 0 0 0     0 if (!$cb && $self->{_on_message}) {
54 0         0 Carp::croak(
55             "on_message is sticky; cannot clear once set "
56             . "(construct a new Subscription for iterator mode)"
57             );
58             }
59 0         0 $self->{_on_message} = $cb;
60             # If the subscription already has channels and is open, start
61             # the driver. If not, it'll be started when channels are added.
62 0 0       0 $self->_start_driver if $cb;
63             }
64 0         0 return $self->{_on_message};
65             }
66              
67             # Set/get fatal-error callback. Fires once per fatal error; default
68             # (when unset) is to die so silent death is impossible.
69             # $cb->($sub, $err) receives the Subscription and the error.
70             sub on_error {
71 0     0 1 0 my ($self, $cb) = @_;
72 0 0       0 if (@_ > 1) {
73 0         0 $self->{_on_error} = $cb;
74             }
75 0         0 return $self->{_on_error};
76             }
77              
78             # Invoke a user-supplied callback with the standard exception-handling
79             # policy: save/restore $@, use eval-and-check-boolean idiom to survive
80             # DESTROY side effects, and route die to the fatal-error handler.
81             # Returns the callback's return value. Task 7 wires backpressure: if
82             # the return is a Future the driver will await it before the next
83             # read. Task 6's driver does not yet consume the return value.
84             sub _invoke_user_callback {
85 0     0   0 my ($self, $cb, $msg) = @_;
86 0         0 local $@;
87 0         0 my $result;
88 0         0 my $ok = eval {
89 0         0 $result = $cb->($self, $msg);
90 0         0 1;
91             };
92 0 0       0 unless ($ok) {
93 0   0     0 my $err = $@ // 'unknown error';
94 0         0 $self->_handle_fatal_error("on_message callback died: $err");
95 0         0 return undef;
96             }
97 0         0 return $result;
98             }
99              
100             # Single chokepoint for fatal errors from either the read loop or the
101             # callback path. Closes the subscription, fires on_error if registered,
102             # and dies loudly if not. Loud-by-default prevents silent zombies.
103             sub _handle_fatal_error {
104 0     0   0 my ($self, $err) = @_;
105 0         0 $self->_close;
106 0 0       0 if (my $cb = $self->{_on_error}) {
107 0         0 local $@;
108 0         0 my $ok = eval { $cb->($self, $err); 1 };
  0         0  
  0         0  
109 0 0       0 unless ($ok) {
110 0   0     0 Carp::carp("on_error callback died: " . ($@ // 'unknown error'));
111             }
112 0         0 return;
113             }
114 0         0 die $err;
115             }
116              
117             # Track a channel subscription
118             sub _add_channel {
119 3     3   16 my ($self, $channel) = @_;
120 3         7 $self->{channels}{$channel} = 1;
121 3         9 $self->_start_driver;
122             }
123              
124             sub _add_pattern {
125 1     1   4 my ($self, $pattern) = @_;
126 1         3 $self->{patterns}{$pattern} = 1;
127 1         2 $self->_start_driver;
128             }
129              
130             sub _add_sharded_channel {
131 0     0   0 my ($self, $channel) = @_;
132 0         0 $self->{sharded_channels}{$channel} = 1;
133 0         0 $self->_start_driver;
134             }
135              
136             sub _remove_channel {
137 0     0   0 my ($self, $channel) = @_;
138 0         0 delete $self->{channels}{$channel};
139             }
140              
141             sub _remove_pattern {
142 0     0   0 my ($self, $pattern) = @_;
143 0         0 delete $self->{patterns}{$pattern};
144             }
145              
146             sub _remove_sharded_channel {
147 0     0   0 my ($self, $channel) = @_;
148 0         0 delete $self->{sharded_channels}{$channel};
149             }
150              
151             # List subscribed channels/patterns
152 2     2 1 2 sub channels { keys %{shift->{channels}} }
  2         7  
153 2     2 1 2 sub patterns { keys %{shift->{patterns}} }
  2         5  
154 2     2 1 3 sub sharded_channels { keys %{shift->{sharded_channels}} }
  2         4  
155              
156             sub channel_count {
157 1     1 1 2 my ($self) = @_;
158 1         3 return scalar(keys %{$self->{channels}})
159 1         2 + scalar(keys %{$self->{patterns}})
160 1         2 + scalar(keys %{$self->{sharded_channels}});
  1         5  
161             }
162              
163             # Internal dequeue: wait for a message from the queue, dequeue it, and
164             # signal _slot_waiter so any pending _dispatch_frame can proceed. Used by
165             # both next() (iterator mode) and _start_driver (callback mode driver loop).
166             # Returns undef on clean close; dies with typed error on fatal close.
167 0     0   0 async sub _dequeue {
168 0         0 my ($self, $exit_on_pause) = @_;
169              
170             # Iterator mode (default): pause is transient. Block through it and
171             # return real messages once the driver resumes.
172             #
173             # Callback driver (exit_on_pause=1): exit cleanly on pause so the
174             # driver task can be restarted after reconnect without two drivers
175             # racing. _pause_for_reconnect wakes any in-flight _message_waiter
176             # via done() so this path can exit promptly.
177 0         0 while (!@{$self->{_pending_messages}}) {
  0         0  
178 0 0       0 die $self->{_fatal_error} if $self->{_fatal_error};
179 0 0       0 return undef if $self->{_closed};
180 0 0 0     0 return undef if $exit_on_pause && $self->{_paused};
181 0   0     0 $self->{_message_waiter} //= Future->new;
182 0         0 await $self->{_message_waiter};
183 0         0 delete $self->{_message_waiter};
184             }
185              
186 0 0       0 die $self->{_fatal_error} if $self->{_fatal_error};
187 0 0 0     0 return undef if $self->{_closed} && !@{$self->{_pending_messages}};
  0         0  
188             return undef
189 0 0 0     0 if $exit_on_pause && $self->{_paused} && !@{$self->{_pending_messages}};
  0   0     0  
190              
191 0         0 my $msg = shift @{$self->{_pending_messages}};
  0         0  
192 0 0       0 if (my $w = delete $self->{_slot_waiter}) {
193 0 0       0 $w->done unless $w->is_ready;
194             }
195 0         0 return $msg;
196             }
197              
198             # Receive next message (async iterator pattern). Waits on the queue
199             # populated by _run_reader via _dispatch_frame. Returns undef on clean
200             # close; dies with the typed error on fatal close.
201 0     0 1 0 async sub next {
202 0         0 my ($self) = @_;
203              
204             # Exclusivity check: callback mode disables iterator mode.
205 0 0       0 if ($self->{_on_message}) {
206 0         0 Carp::croak("Cannot call next() on a callback-driven subscription");
207             }
208              
209             # In iterator mode the unified reader (_run_reader) feeds the queue.
210             # The reader is already running (started by _pubsub_command during
211             # subscribe). No separate driver start is needed.
212              
213 0         0 return await $self->_dequeue;
214             }
215              
216             # Read one pub/sub frame from the underlying connection. On transient
217             # read error, attempt reconnect if enabled and fire on_reconnect on
218             # success; on unrecoverable failure, propagate the error.
219             # Returns a Future resolving to the raw frame (arrayref) or undef if
220             # the connection is gone and no more frames are available.
221             # Shared by next() and the callback driver loop added in a later task.
222 2     2   388 async sub _read_frame_with_reconnect {
223 2         4 my ($self) = @_;
224 2         4 my $redis = $self->{redis};
225              
226 2         2 while (1) {
227 2         3 my $frame;
228 2         2 my $ok = eval {
229 2         8 $frame = await $redis->_read_pubsub_frame;
230 0         0 1;
231             };
232              
233 2 50       123 unless ($ok) {
234 2         4 my $error = $@;
235 2 100 66     11 if ($redis->{reconnect} && $self->channel_count > 0) {
236 1         2 my $reconnect_error;
237 1         2 my $reconnect_ok = eval {
238 1         4 await $redis->_reconnect_pubsub;
239 0         0 1;
240             };
241 1 50       26 $reconnect_error = $@ unless $reconnect_ok;
242 1 50       3 unless ($reconnect_ok) {
243 1         3 die $reconnect_error;
244             }
245              
246 0 0       0 if ($self->{_on_reconnect}) {
247 0         0 $self->{_on_reconnect}->($self);
248             }
249              
250 0         0 next;
251             }
252 1         3 die $error;
253             }
254              
255 0         0 return $frame;
256             }
257             }
258              
259             # Convert a raw RESP pub/sub frame into a message hashref and deliver it.
260             # In callback mode, invokes _on_message via _invoke_user_callback and
261             # returns its result (which may be a Future for consumer-side backpressure).
262             # In iterator mode, queues the message into _pending_messages and signals
263             # _message_waiter so a blocked next() can wake up.
264             #
265             # Non-message frames (subscribe confirmations, etc.) return undef and
266             # take no action — the driver loop will read the next frame.
267             sub _dispatch_frame {
268 0     0   0 my ($self, $frame) = @_;
269 0 0 0     0 return unless $frame && ref $frame eq 'ARRAY';
270              
271 0   0     0 my $type = $frame->[0] // '';
272 0         0 my $msg;
273              
274 0 0       0 if ($type eq 'message') {
    0          
    0          
275 0         0 $msg = {
276             type => 'message',
277             channel => $frame->[1],
278             pattern => undef,
279             data => $frame->[2],
280             };
281             }
282             elsif ($type eq 'pmessage') {
283 0         0 $msg = {
284             type => 'pmessage',
285             pattern => $frame->[1],
286             channel => $frame->[2],
287             data => $frame->[3],
288             };
289             }
290             elsif ($type eq 'smessage') {
291 0         0 $msg = {
292             type => 'smessage',
293             channel => $frame->[1],
294             pattern => undef,
295             data => $frame->[2],
296             };
297             }
298             else {
299 0         0 return undef; # non-message frame (subscribe confirmation, etc.)
300             }
301              
302             # Queue the message for consumption by next() (iterator mode) or the
303             # callback driver loop (callback mode). The driver invokes _on_message;
304             # _dispatch_frame is intentionally agnostic about the consumption mode.
305             # This keeps backpressure uniform: the depth limit applies to both modes.
306 0 0       0 return if $self->{_closed};
307              
308 0         0 my $redis = $self->{redis};
309             my $depth = ($redis && $redis->{message_queue_depth})
310             ? $redis->{message_queue_depth}
311 0 0 0     0 : 0; # 0 = unbounded (default)
312              
313 0 0 0     0 if ($depth && scalar(@{$self->{_pending_messages}}) >= $depth) {
  0         0  
314             # Queue full. Return a Future that queues the message once a slot
315             # opens (signalled by next() calling _slot_waiter->done).
316 0   0     0 $self->{_slot_waiter} //= Future->new;
317 0         0 my $slot = $self->{_slot_waiter};
318 0         0 weaken(my $weak = $self);
319             return $slot->then(sub {
320 0 0 0 0   0 return Future->done if !$weak || $weak->{_closed};
321 0         0 push @{$weak->{_pending_messages}}, $msg;
  0         0  
322 0 0       0 if (my $w = delete $weak->{_message_waiter}) {
323 0 0       0 $w->done unless $w->is_ready;
324             }
325 0         0 Future->done;
326 0         0 });
327             }
328              
329 0         0 push @{$self->{_pending_messages}}, $msg;
  0         0  
330 0 0       0 if (my $w = delete $self->{_message_waiter}) {
331 0 0       0 $w->done unless $w->is_ready;
332             }
333 0         0 return undef;
334             }
335              
336             # The callback-mode driver loop. Consumes from _pending_messages via
337             # _dequeue (populated by _run_reader's dispatch path), invokes the
338             # user's _on_message callback, and awaits its returned Future if any
339             # for consumer-opted backpressure.
340             #
341             # Exits cleanly when _dequeue returns undef (subscription closed or
342             # paused for reconnect). Dies with the typed error if _dequeue dies
343             # (fatal); _run_driver's Future failure is visible through the
344             # client's Future::Selector to any caller using run_until_ready.
345             #
346             # Periodic sleep(0) yield every MAX_SYNC_DEPTH iterations prevents
347             # stack growth when messages are pre-queued and await returns
348             # synchronously from an already-ready Future.
349 0     0   0 async sub _run_driver {
350 0         0 my ($self) = @_;
351 0         0 my $iter = 0;
352 0   0     0 while (!$self->{_closed} && !$self->{_paused}) {
353 0         0 my $msg;
354 0         0 my $deq_ok = eval { $msg = await $self->_dequeue(1); 1 };
  0         0  
  0         0  
355 0 0       0 unless ($deq_ok) {
356 0         0 my $err = $@;
357             # _fail_fatal already set _closed and fired on_error; don't
358             # double-fire. Any other propagation path routes through
359             # _handle_fatal_error.
360 0 0 0     0 return if $self->{_closed} || $self->{_paused};
361 0         0 $self->_handle_fatal_error($err);
362 0         0 return;
363             }
364 0 0       0 last unless defined $msg;
365 0 0 0     0 last if $self->{_closed} || $self->{_paused};
366              
367 0 0       0 my $cb = $self->{_on_message} or last;
368 0         0 my $result = $self->_invoke_user_callback($cb, $msg);
369              
370 0 0 0     0 if (blessed($result) && $result->isa('Future')) {
371 0         0 my $cb_ok = eval { await $result; 1 };
  0         0  
  0         0  
372 0 0       0 unless ($cb_ok) {
373 0         0 my $err = $@;
374 0 0 0     0 return if $self->{_closed} || $self->{_paused};
375 0         0 $self->_handle_fatal_error(
376             "on_message callback Future failed: $err"
377             );
378 0         0 return;
379             }
380             }
381              
382             # Periodic yield prevents stack blowup when pre-queued messages
383             # resolve await synchronously.
384 0 0       0 await Future::IO->sleep(0) if ++$iter % MAX_SYNC_DEPTH == 0;
385             }
386             }
387              
388             # Start the driver if not already running. Idempotent.
389             # Only starts when _on_message is set (callback mode). Iterator mode
390             # consumers call next() directly — no driver loop needed.
391             #
392             # Ownership: the driver Future is added to the client's Future::Selector
393             # ($redis->{_tasks}) and stored in $self->{_driver_step}. The selector
394             # owns the task; the slot is the dedup signal. on_ready clears the slot
395             # regardless of outcome. No ->retain.
396             sub _start_driver {
397 4     4   7 my ($self, $force) = @_;
398 4 50 33     9 return if $self->{_driver_step} && !$self->{_driver_step}->is_ready;
399 4 50       10 return unless $self->{_on_message}; # only callback mode needs a driver
400 0 0       0 return if $self->{_closed};
401 0 0       0 return if $self->{_paused};
402 0 0       0 return unless $self->channel_count > 0;
403              
404 0 0       0 my $redis = $self->{redis} or return;
405              
406 0         0 my $f = $self->_run_driver;
407 0         0 $self->{_driver_step} = $f;
408 0         0 $redis->{_tasks}->add(data => 'subscription-driver', f => $f);
409 0     0   0 $f->on_ready(sub { $self->{_driver_step} = undef });
  0         0  
410 0         0 return;
411             }
412              
413             # Backward-compatible wrapper
414 0     0 1 0 async sub next_message {
415 0         0 my ($self) = @_;
416 0         0 my $msg = await $self->next();
417 0 0       0 return undef unless $msg;
418              
419             # Convert new format to old format for compatibility
420             return {
421             channel => $msg->{channel},
422             message => $msg->{data},
423             pattern => $msg->{pattern},
424             type => $msg->{type},
425 0         0 };
426             }
427              
428             # Intentional teardown: marks the subscription closed and wakes any
429             # blocked next() with undef. Clears the parent _subscription slot
430             # with an identity guard so a stale _close cannot evict a newer
431             # subscription object that reused the same slot.
432             sub _close {
433 0     0   0 my ($self) = @_;
434 0 0       0 return if $self->{_closed};
435 0         0 $self->{_closed} = 1;
436              
437 0         0 $self->{_pending_messages} = [];
438              
439 0 0       0 if (my $w = delete $self->{_message_waiter}) {
440 0 0       0 $w->done unless $w->is_ready;
441             }
442 0 0       0 if (my $w = delete $self->{_slot_waiter}) {
443 0 0       0 $w->done unless $w->is_ready;
444             }
445              
446             # Identity-guarded parent-slot clear.
447 0         0 my $redis = $self->{redis};
448 0 0 0     0 if ($redis && defined $redis->{_subscription}
      0        
449             && refaddr($redis->{_subscription}) == refaddr($self)) {
450 0         0 delete $redis->{_subscription};
451             }
452              
453             # Cancel any running driver Future. The driver's await on _dequeue
454             # also unwinds because we resolved _message_waiter above, so this is
455             # belt-and-suspenders; either path exits the driver cleanly.
456 0 0       0 if (my $f = delete $self->{_driver_step}) {
457 0 0       0 $f->cancel unless $f->is_ready;
458             }
459             }
460              
461             # Unrecoverable failure: marks the subscription closed with a typed
462             # error. Any blocked next() will die with that error. The error is
463             # preserved for callers who call next() after the fact.
464             # In callback mode, fires on_error if registered; dies otherwise.
465             sub _fail_fatal {
466 0     0   0 my ($self, $typed_error) = @_;
467 0 0       0 return if $self->{_closed};
468 0         0 $self->{_closed} = 1;
469 0         0 $self->{_fatal_error} = $typed_error;
470              
471 0         0 $self->{_pending_messages} = [];
472              
473 0 0       0 if (my $w = delete $self->{_message_waiter}) {
474 0 0       0 $w->fail($typed_error) unless $w->is_ready;
475             }
476 0 0       0 if (my $w = delete $self->{_slot_waiter}) {
477 0 0       0 $w->done unless $w->is_ready;
478             }
479              
480             # Identity-guarded parent-slot clear.
481 0         0 my $redis = $self->{redis};
482 0 0 0     0 if ($redis && defined $redis->{_subscription}
      0        
483             && refaddr($redis->{_subscription}) == refaddr($self)) {
484 0         0 delete $redis->{_subscription};
485             }
486              
487             # Cancel any running driver Future. _message_waiter was failed with
488             # the typed error above, so driver's _dequeue also dies with the
489             # typed error; cancel is belt-and-suspenders.
490 0 0       0 if (my $f = delete $self->{_driver_step}) {
491 0 0       0 $f->cancel unless $f->is_ready;
492             }
493              
494             # Notify callback-mode consumers of the fatal error. In iterator mode
495             # the caller detects it via die from next(). Loud-by-default: if
496             # no on_error is registered in callback mode, die so silent death
497             # of a listener is impossible.
498 0 0       0 if (my $cb = $self->{_on_error}) {
499 0         0 local $@;
500 0         0 my $ok = eval { $cb->($self, $typed_error); 1 };
  0         0  
  0         0  
501 0 0       0 unless ($ok) {
502 0   0     0 Carp::carp("on_error callback died: " . ($@ // 'unknown error'));
503             }
504 0         0 return;
505             }
506             # In iterator mode (no callback), callers discover the error via next().
507             # In callback mode with no on_error, die loudly.
508 0 0       0 if ($self->{_on_message}) {
509 0         0 die $typed_error;
510             }
511             }
512              
513             # Called before a reconnect attempt. Does NOT mark the subscription
514             # closed — the reader has already exited (connection dropped). Channels
515             # and patterns remain in their tracking hashes for replay via
516             # _resume_after_reconnect.
517             #
518             # Fixes a latent "two drivers after reconnect" bug from the closure-based
519             # driver era: clearing the driver slot without cancelling left the old
520             # driver suspended on _dequeue. After _resume_after_reconnect started a
521             # new driver, both raced. The fix: cancel the Future explicitly, and
522             # wake _dequeue via the _paused flag so its await exits cleanly.
523             sub _pause_for_reconnect {
524 0     0   0 my ($self) = @_;
525 0         0 $self->{_paused} = 1;
526              
527             # Wake any suspended _dequeue so the driver's while-loop exits.
528 0 0       0 if (my $w = delete $self->{_message_waiter}) {
529 0 0       0 $w->done unless $w->is_ready;
530             }
531              
532             # Cancel the driver Future. F::AA's continuation stops; the Future
533             # becomes cancelled; the selector's on_ready fires and removes the
534             # item; our on_ready fires and clears _driver_step.
535 0 0       0 if (my $f = delete $self->{_driver_step}) {
536 0 0       0 $f->cancel unless $f->is_ready;
537             }
538 0         0 return;
539             }
540              
541             # Replays all tracked subscriptions on a freshly reconnected socket.
542             # Sets in_pubsub=1 BEFORE sending SUBSCRIBE/PSUBSCRIBE/SSUBSCRIBE so
543             # racing message frames classify correctly (mirrors initial-subscribe timing).
544 0     0   0 async sub _resume_after_reconnect {
545 0         0 my ($self) = @_;
546 0 0       0 my $redis = $self->{redis} or return;
547              
548             # Clear the paused flag so _dequeue and _run_driver don't immediately
549             # exit when the new driver starts.
550 0         0 $self->{_paused} = 0;
551              
552             # Set in_pubsub before issuing any commands so the unified reader
553             # classifies incoming message frames correctly (mirrors subscribe timing).
554 0         0 $redis->{in_pubsub} = 1;
555              
556 0         0 my @channels = keys %{$self->{channels}};
  0         0  
557 0         0 my @patterns = keys %{$self->{patterns}};
  0         0  
558 0         0 my @sharded = keys %{$self->{sharded_channels}};
  0         0  
559              
560             # Route each replay command through the write gate and unified reader
561             # so confirmations are matched via the inflight queue.
562 0         0 for my $ch (@channels) { await $redis->_pubsub_command('SUBSCRIBE', $ch) }
  0         0  
563 0         0 for my $p (@patterns) { await $redis->_pubsub_command('PSUBSCRIBE', $p) }
  0         0  
564 0         0 for my $ch (@sharded) { await $redis->_pubsub_command('SSUBSCRIBE', $ch) }
  0         0  
565              
566 0 0       0 if (my $cb = $self->{_on_reconnect}) {
567 0         0 $cb->($self);
568             }
569              
570             # Restart the driver in whichever mode the subscription is in.
571 0 0       0 $self->_start_driver($self->{_on_message} ? 0 : 1);
572             }
573              
574             # Unsubscribe from specific channels
575 0     0 1 0 async sub unsubscribe {
576 0         0 my ($self, @channels) = @_;
577              
578 0 0       0 return if $self->{_closed};
579              
580 0         0 my $redis = $self->{redis};
581              
582             # Resolve the list of channels to unsubscribe: explicit list or all.
583 0 0       0 my @to_remove = @channels ? @channels : $self->channels;
584              
585             # Issue one UNSUBSCRIBE per channel through the write gate and unified
586             # reader so each confirmation is properly matched to an inflight entry.
587 0         0 for my $ch (@to_remove) {
588 0         0 await $redis->_pubsub_command('UNSUBSCRIBE', $ch);
589 0         0 $self->_remove_channel($ch);
590             }
591              
592             # If no subscriptions remain, close and exit pubsub mode
593 0 0       0 if ($self->channel_count == 0) {
594 0         0 $self->_close;
595             }
596              
597 0         0 return $self;
598             }
599              
600             # Unsubscribe from patterns
601 0     0 1 0 async sub punsubscribe {
602 0         0 my ($self, @patterns) = @_;
603              
604 0 0       0 return if $self->{_closed};
605              
606 0         0 my $redis = $self->{redis};
607              
608 0 0       0 my @to_remove = @patterns ? @patterns : $self->patterns;
609              
610 0         0 for my $p (@to_remove) {
611 0         0 await $redis->_pubsub_command('PUNSUBSCRIBE', $p);
612 0         0 $self->_remove_pattern($p);
613             }
614              
615 0 0       0 if ($self->channel_count == 0) {
616 0         0 $self->_close;
617             }
618              
619 0         0 return $self;
620             }
621              
622             # Unsubscribe from sharded channels
623 0     0 1 0 async sub sunsubscribe {
624 0         0 my ($self, @channels) = @_;
625              
626 0 0       0 return if $self->{_closed};
627              
628 0         0 my $redis = $self->{redis};
629              
630 0 0       0 my @to_remove = @channels ? @channels : $self->sharded_channels;
631              
632 0         0 for my $ch (@to_remove) {
633 0         0 await $redis->_pubsub_command('SUNSUBSCRIBE', $ch);
634 0         0 $self->_remove_sharded_channel($ch);
635             }
636              
637 0 0       0 if ($self->channel_count == 0) {
638 0         0 $self->_close;
639             }
640              
641 0         0 return $self;
642             }
643              
644              
645 0     0 1 0 sub is_closed { shift->{_closed} }
646              
647             # Get all subscriptions for reconnect replay
648             sub get_replay_commands {
649 2     2 0 7 my ($self) = @_;
650              
651 2         4 my @commands;
652              
653 2         6 my @channels = $self->channels;
654 2 50       7 push @commands, ['SUBSCRIBE', @channels] if @channels;
655              
656 2         7 my @patterns = $self->patterns;
657 2 100       4 push @commands, ['PSUBSCRIBE', @patterns] if @patterns;
658              
659 2         6 my @sharded = $self->sharded_channels;
660 2 50       4 push @commands, ['SSUBSCRIBE', @sharded] if @sharded;
661              
662 2         6 return @commands;
663             }
664              
665             1;
666              
667             __END__