File Coverage

blib/lib/Async/Redis/Subscription.pm
Criterion Covered Total %
statement 66 264 25.0
branch 9 110 8.1
condition 1 45 2.2
subroutine 19 38 50.0
pod 3 15 20.0
total 98 472 20.7


line stmt bran cond sub pod time code
1             package Async::Redis::Subscription;
2              
3 75     75   454 use strict;
  75         153  
  75         2520  
4 75     75   316 use warnings;
  75         116  
  75         2834  
5 75     75   961 use 5.018;
  75         227  
6              
7 75     75   288 use Carp ();
  75         106  
  75         1084  
8 75     75   214 use Future;
  75         123  
  75         1309  
9 75     75   242 use Future::AsyncAwait;
  75         128  
  75         417  
10 75     75   3166 use Future::IO;
  75         145  
  75         2939  
11 75     75   297 use Scalar::Util ();
  75         147  
  75         4152  
12              
13             our $VERSION = '0.001';
14              
15             # Synchronous recursion depth for the callback driver loop. See
16             # _start_driver. Package-level so local() can scope it dynamically —
17             # local() cannot be applied to lexicals.
18             our $SYNC_DEPTH = 0;
19 75     75   330 use constant MAX_SYNC_DEPTH => 32;
  75         127  
  75         297356  
20              
21             sub new {
22 3     3 1 26 my ($class, %args) = @_;
23              
24             return bless {
25             redis => $args{redis},
26 3         34 channels => {}, # channel => 1 (for regular subscribe)
27             patterns => {}, # pattern => 1 (for psubscribe)
28             sharded_channels => {}, # channel => 1 (for ssubscribe)
29             _message_queue => [], # Buffer for messages
30             _waiters => [], # Futures waiting for messages
31             _on_reconnect => undef, # Callback for reconnect notification
32             _on_message => undef, # Message-arrived callback (Task 3)
33             _on_error => undef, # Fatal-error callback (Task 3)
34             _driver_step => undef, # Running driver loop closure (callback mode)
35             _current_read => undef, # Strong ref to in-flight read Future (F::AA GC pin)
36             _closed => 0,
37             }, $class;
38             }
39              
40             # Set/get reconnect callback
41             sub on_reconnect {
42 3     3 0 10 my ($self, $cb) = @_;
43 3 100       7 $self->{_on_reconnect} = $cb if @_ > 1;
44 3         9 return $self->{_on_reconnect};
45             }
46              
47             # Set/get message-arrived callback. Once set, next() croaks — the
48             # subscription is in callback mode for the rest of its lifetime.
49             # $cb->($sub, $msg) receives the Subscription and the message hashref.
50             sub on_message {
51 0     0 1 0 my ($self, $cb) = @_;
52 0 0       0 if (@_ > 1) {
53 0 0 0     0 if (!$cb && $self->{_on_message}) {
54 0         0 Carp::croak(
55             "on_message is sticky; cannot clear once set "
56             . "(construct a new Subscription for iterator mode)"
57             );
58             }
59 0         0 $self->{_on_message} = $cb;
60             # If the subscription already has channels and is open, start
61             # the driver. If not, it'll be started when channels are added.
62 0 0       0 $self->_start_driver if $cb;
63             }
64 0         0 return $self->{_on_message};
65             }
66              
67             # Set/get fatal-error callback. Fires once per fatal error; default
68             # (when unset) is to die so silent death is impossible.
69             # $cb->($sub, $err) receives the Subscription and the error.
70             sub on_error {
71 0     0 0 0 my ($self, $cb) = @_;
72 0 0       0 if (@_ > 1) {
73 0         0 $self->{_on_error} = $cb;
74             }
75 0         0 return $self->{_on_error};
76             }
77              
78             # Invoke a user-supplied callback with the standard exception-handling
79             # policy: save/restore $@, use eval-and-check-boolean idiom to survive
80             # DESTROY side effects, and route die to the fatal-error handler.
81             # Returns the callback's return value. Task 7 wires backpressure: if
82             # the return is a Future the driver will await it before the next
83             # read. Task 6's driver does not yet consume the return value.
84             sub _invoke_user_callback {
85 0     0   0 my ($self, $cb, $msg) = @_;
86 0         0 local $@;
87 0         0 my $result;
88 0         0 my $ok = eval {
89 0         0 $result = $cb->($self, $msg);
90 0         0 1;
91             };
92 0 0       0 unless ($ok) {
93 0   0     0 my $err = $@ // 'unknown error';
94 0         0 $self->_handle_fatal_error("on_message callback died: $err");
95 0         0 return undef;
96             }
97 0         0 return $result;
98             }
99              
100             # Single chokepoint for fatal errors from either the read loop or the
101             # callback path. Closes the subscription, fires on_error if registered,
102             # and dies loudly if not. Loud-by-default prevents silent zombies.
103             sub _handle_fatal_error {
104 0     0   0 my ($self, $err) = @_;
105 0         0 $self->_close;
106 0 0       0 if (my $cb = $self->{_on_error}) {
107 0         0 local $@;
108 0         0 my $ok = eval { $cb->($self, $err); 1 };
  0         0  
  0         0  
109 0 0       0 unless ($ok) {
110 0   0     0 Carp::carp("on_error callback died: " . ($@ // 'unknown error'));
111             }
112 0         0 return;
113             }
114 0         0 die $err;
115             }
116              
117             # Track a channel subscription
118             sub _add_channel {
119 2     2   10 my ($self, $channel) = @_;
120 2         5 $self->{channels}{$channel} = 1;
121 2         5 $self->_start_driver;
122             }
123              
124             sub _add_pattern {
125 1     1   6 my ($self, $pattern) = @_;
126 1         2 $self->{patterns}{$pattern} = 1;
127 1         3 $self->_start_driver;
128             }
129              
130             sub _add_sharded_channel {
131 0     0   0 my ($self, $channel) = @_;
132 0         0 $self->{sharded_channels}{$channel} = 1;
133 0         0 $self->_start_driver;
134             }
135              
136             sub _remove_channel {
137 0     0   0 my ($self, $channel) = @_;
138 0         0 delete $self->{channels}{$channel};
139             }
140              
141             sub _remove_pattern {
142 0     0   0 my ($self, $pattern) = @_;
143 0         0 delete $self->{patterns}{$pattern};
144             }
145              
146             sub _remove_sharded_channel {
147 0     0   0 my ($self, $channel) = @_;
148 0         0 delete $self->{sharded_channels}{$channel};
149             }
150              
151             # List subscribed channels/patterns
152 1     1 0 2 sub channels { keys %{shift->{channels}} }
  1         4  
153 1     1 0 101 sub patterns { keys %{shift->{patterns}} }
  1         5  
154 1     1 0 2 sub sharded_channels { keys %{shift->{sharded_channels}} }
  1         3  
155              
156             sub channel_count {
157 0     0 0 0 my ($self) = @_;
158 0         0 return scalar(keys %{$self->{channels}})
159 0         0 + scalar(keys %{$self->{patterns}})
160 0         0 + scalar(keys %{$self->{sharded_channels}});
  0         0  
161             }
162              
163             # Receive next message (async iterator pattern)
164 0     0 1 0 async sub next {
165 0         0 my ($self) = @_;
166              
167 0 0       0 return undef if $self->{_closed};
168              
169             # Exclusivity check: callback mode disables iterator mode.
170             # (The _on_message slot is initialized in new(); inert until Task 3.)
171 0 0       0 if ($self->{_on_message}) {
172 0         0 Carp::croak("Cannot call next() on a callback-driven subscription");
173             }
174              
175 0 0       0 if (@{$self->{_message_queue}}) {
  0         0  
176 0         0 return shift @{$self->{_message_queue}};
  0         0  
177             }
178              
179 0         0 while (1) {
180 0         0 my $frame = await $self->_read_frame_with_reconnect;
181 0 0       0 last unless $frame;
182 0         0 $self->_dispatch_frame($frame);
183             # _dispatch_frame buffers the message into _message_queue (when
184             # on_message is unset — which it must be in this branch since
185             # the exclusivity check above throws otherwise). Pull from queue.
186 0 0       0 if (@{$self->{_message_queue}}) {
  0         0  
187 0         0 return shift @{$self->{_message_queue}};
  0         0  
188             }
189             # Otherwise it was a non-message frame; loop for another.
190             }
191              
192 0         0 return undef;
193             }
194              
195             # Read one pub/sub frame from the underlying connection. On transient
196             # read error, attempt reconnect if enabled and fire on_reconnect on
197             # success; on unrecoverable failure, propagate the error.
198             # Returns a Future resolving to the raw frame (arrayref) or undef if
199             # the connection is gone and no more frames are available.
200             # Shared by next() and the callback driver loop added in a later task.
201 1     1   383 async sub _read_frame_with_reconnect {
202 1         3 my ($self) = @_;
203 1         2 my $redis = $self->{redis};
204              
205 1         2 while (1) {
206 1         1 my $frame;
207 1         2 my $ok = eval {
208 1         5 $frame = await $redis->_read_pubsub_frame;
209 0         0 1;
210             };
211              
212 1 50       90 unless ($ok) {
213 1         2 my $error = $@;
214 1 50 33     3 if ($redis->{reconnect} && $self->channel_count > 0) {
215 0         0 my $reconnect_ok = eval {
216 0         0 await $redis->_reconnect_pubsub;
217 0         0 1;
218             };
219 0 0       0 unless ($reconnect_ok) {
220 0         0 die $error;
221             }
222              
223 0 0       0 if ($self->{_on_reconnect}) {
224 0         0 $self->{_on_reconnect}->($self);
225             }
226              
227 0         0 next;
228             }
229 1         4 die $error;
230             }
231              
232 0         0 return $frame;
233             }
234             }
235              
236             # Convert a raw RESP pub/sub frame into a message hashref and deliver it.
237             # When on_message is set (callback mode), invoke the callback via
238             # _invoke_user_callback and return its result (which may be a Future
239             # for consumer-side backpressure). Otherwise buffer the message via
240             # _deliver_message for next()/iterator consumers and return undef.
241             #
242             # Non-message frames (subscribe confirmations, etc.) return undef and
243             # take no action — the caller's loop will read another frame.
244             sub _dispatch_frame {
245 0     0   0 my ($self, $frame) = @_;
246 0 0 0     0 return unless $frame && ref $frame eq 'ARRAY';
247              
248 0   0     0 my $type = $frame->[0] // '';
249 0         0 my $msg;
250              
251 0 0       0 if ($type eq 'message') {
    0          
    0          
252 0         0 $msg = {
253             type => 'message',
254             channel => $frame->[1],
255             pattern => undef,
256             data => $frame->[2],
257             };
258             }
259             elsif ($type eq 'pmessage') {
260 0         0 $msg = {
261             type => 'pmessage',
262             pattern => $frame->[1],
263             channel => $frame->[2],
264             data => $frame->[3],
265             };
266             }
267             elsif ($type eq 'smessage') {
268 0         0 $msg = {
269             type => 'smessage',
270             channel => $frame->[1],
271             pattern => undef,
272             data => $frame->[2],
273             };
274             }
275             else {
276 0         0 return undef; # non-message frame
277             }
278              
279 0 0       0 if (my $cb = $self->{_on_message}) {
280 0         0 return $self->_invoke_user_callback($cb, $msg);
281             }
282              
283 0         0 $self->_deliver_message($msg);
284 0         0 return undef;
285             }
286              
287             # Start the callback driver loop if not already running. Idempotent.
288             # Runs while: on_message is set AND channel_count > 0 AND !_closed.
289             # Uses weak refs on $self and $step to break cycles so DESTROY fires
290             # promptly when external refs drop. Uses local($SYNC_DEPTH) to bound
291             # synchronous recursion depth when on_done fires synchronously (as
292             # it does when the underlying read buffer has multiple frames ready);
293             # past MAX_SYNC_DEPTH iterations, yields to the loop via Future::IO->later.
294             sub _start_driver {
295 3     3   4 my ($self) = @_;
296 3 50       6 return if $self->{_driver_step};
297 3 50       5 return unless $self->{_on_message};
298 0 0       0 return if $self->{_closed};
299 0 0       0 return unless $self->channel_count > 0;
300              
301 0         0 Scalar::Util::weaken(my $weak = $self);
302              
303 0         0 my $step;
304             my $weak_step;
305             $step = sub {
306 0 0 0 0   0 return unless $weak && !$weak->{_closed};
307              
308             # Trampoline: once 32 synchronous iterations deep, yield to the
309             # loop. Prevents stack overflow when a single TCP recv delivers
310             # many buffered frames whose Futures are already ready.
311 0 0       0 if ($SYNC_DEPTH >= MAX_SYNC_DEPTH) {
312             Future::IO->later(sub {
313 0 0 0     0 $weak_step->() if $weak_step && $weak && !$weak->{_closed};
      0        
314 0         0 });
315 0         0 return;
316             }
317 0         0 local $SYNC_DEPTH = $SYNC_DEPTH + 1;
318              
319             # Keep a strong ref to the in-flight read Future on the
320             # subscription so the async sub behind _read_frame_with_reconnect
321             # isn't GC'd mid-suspension (F::AA's "lost returning future").
322 0         0 my $f = $weak->{_current_read} = $weak->_read_frame_with_reconnect;
323              
324             $f->on_done(sub {
325 0 0 0     0 return unless $weak && !$weak->{_closed};
326 0         0 $weak->{_current_read} = undef;
327 0         0 my $cb_result = $weak->_dispatch_frame($_[0]);
328 0 0 0     0 if (Scalar::Util::blessed($cb_result) && $cb_result->isa('Future')) {
329             # Consumer-opted backpressure: wait for their Future
330             # before reading the next frame. Failures route to
331             # on_error (same path as a raised callback exception).
332             $cb_result->on_ready(sub {
333 0 0 0     0 return unless $weak && !$weak->{_closed};
334 0         0 my $res = shift;
335 0 0       0 if ($res->is_failed) {
336 0         0 $weak->_handle_fatal_error(
337             "on_message callback Future failed: " . $res->failure
338             );
339 0         0 return;
340             }
341 0 0 0     0 $weak_step->() if $weak_step && $weak && !$weak->{_closed};
      0        
342 0         0 });
343             } else {
344 0 0 0     0 $weak_step->() if $weak_step && $weak && !$weak->{_closed};
      0        
345             }
346 0         0 });
347              
348             $f->on_fail(sub {
349 0 0       0 return unless $weak;
350 0         0 $weak->{_current_read} = undef;
351             # If the user closed the subscription (or the underlying
352             # client disconnected) while a read was in flight, a
353             # "Connection closed by server" failure is expected, not
354             # fatal. Short-circuit so we don't die through _handle_fatal_error.
355 0 0       0 return if $weak->{_closed};
356 0         0 $weak->_handle_fatal_error($_[0]);
357 0         0 });
358 0         0 };
359              
360 0         0 Scalar::Util::weaken($weak_step = $step);
361              
362 0         0 $self->{_driver_step} = $step;
363 0         0 $step->();
364 0         0 return;
365             }
366              
367             # Backward-compatible wrapper
368 0     0 0 0 async sub next_message {
369 0         0 my ($self) = @_;
370 0         0 my $msg = await $self->next();
371 0 0       0 return undef unless $msg;
372              
373             # Convert new format to old format for compatibility
374             return {
375             channel => $msg->{channel},
376             message => $msg->{data},
377             pattern => $msg->{pattern},
378             type => $msg->{type},
379 0         0 };
380             }
381              
382             # Internal: called when message arrives
383             sub _deliver_message {
384 0     0   0 my ($self, $msg) = @_;
385              
386 0 0       0 if (@{$self->{_waiters}}) {
  0         0  
387             # Someone is waiting - deliver directly
388 0         0 my $waiter = shift @{$self->{_waiters}};
  0         0  
389 0         0 $waiter->done($msg);
390             }
391             else {
392             # Buffer the message
393 0         0 push @{$self->{_message_queue}}, $msg;
  0         0  
394             }
395             }
396              
397             # Unsubscribe from specific channels
398 0     0 0 0 async sub unsubscribe {
399 0         0 my ($self, @channels) = @_;
400              
401 0 0       0 return if $self->{_closed};
402              
403 0         0 my $redis = $self->{redis};
404              
405 0 0       0 if (@channels) {
406             # Partial unsubscribe
407 0         0 await $redis->_send_command('UNSUBSCRIBE', @channels);
408              
409             # Read confirmations
410 0         0 for my $ch (@channels) {
411 0         0 my $msg = await $redis->_read_pubsub_frame();
412 0         0 $self->_remove_channel($ch);
413             }
414             }
415             else {
416             # Full unsubscribe - all channels
417 0         0 my @all_channels = $self->channels;
418              
419 0 0       0 if (@all_channels) {
420 0         0 await $redis->_send_command('UNSUBSCRIBE');
421              
422             # Read all confirmations
423 0         0 for my $ch (@all_channels) {
424 0         0 my $msg = await $redis->_read_pubsub_frame();
425 0         0 $self->_remove_channel($ch);
426             }
427             }
428             }
429              
430             # If no subscriptions remain, close and exit pubsub mode
431 0 0       0 if ($self->channel_count == 0) {
432 0         0 $self->_close;
433             }
434              
435 0         0 return $self;
436             }
437              
438             # Unsubscribe from patterns
439 0     0 0 0 async sub punsubscribe {
440 0         0 my ($self, @patterns) = @_;
441              
442 0 0       0 return if $self->{_closed};
443              
444 0         0 my $redis = $self->{redis};
445              
446 0 0       0 if (@patterns) {
447 0         0 await $redis->_send_command('PUNSUBSCRIBE', @patterns);
448              
449 0         0 for my $p (@patterns) {
450 0         0 my $msg = await $redis->_read_pubsub_frame();
451 0         0 $self->_remove_pattern($p);
452             }
453             }
454             else {
455 0         0 my @all_patterns = $self->patterns;
456              
457 0 0       0 if (@all_patterns) {
458 0         0 await $redis->_send_command('PUNSUBSCRIBE');
459              
460 0         0 for my $p (@all_patterns) {
461 0         0 my $msg = await $redis->_read_pubsub_frame();
462 0         0 $self->_remove_pattern($p);
463             }
464             }
465             }
466              
467 0 0       0 if ($self->channel_count == 0) {
468 0         0 $self->_close;
469             }
470              
471 0         0 return $self;
472             }
473              
474             # Unsubscribe from sharded channels
475 0     0 0 0 async sub sunsubscribe {
476 0         0 my ($self, @channels) = @_;
477              
478 0 0       0 return if $self->{_closed};
479              
480 0         0 my $redis = $self->{redis};
481              
482 0 0       0 if (@channels) {
483 0         0 await $redis->_send_command('SUNSUBSCRIBE', @channels);
484              
485 0         0 for my $ch (@channels) {
486 0         0 my $msg = await $redis->_read_pubsub_frame();
487 0         0 $self->_remove_sharded_channel($ch);
488             }
489             }
490             else {
491 0         0 my @all = $self->sharded_channels;
492              
493 0 0       0 if (@all) {
494 0         0 await $redis->_send_command('SUNSUBSCRIBE');
495              
496 0         0 for my $ch (@all) {
497 0         0 my $msg = await $redis->_read_pubsub_frame();
498 0         0 $self->_remove_sharded_channel($ch);
499             }
500             }
501             }
502              
503 0 0       0 if ($self->channel_count == 0) {
504 0         0 $self->_close;
505             }
506              
507 0         0 return $self;
508             }
509              
510             # Close subscription
511             sub _close {
512 0     0   0 my ($self) = @_;
513              
514 0         0 $self->{_closed} = 1;
515 0         0 $self->{redis}{in_pubsub} = 0;
516              
517             # Cancel any waiters
518 0         0 for my $waiter (@{$self->{_waiters}}) {
  0         0  
519 0 0       0 $waiter->done(undef) unless $waiter->is_ready;
520             }
521 0         0 $self->{_waiters} = [];
522              
523             # Release the driver closure; weak refs already broke the cycle,
524             # so this is hygienic rather than required for GC.
525             # Do NOT clear _current_read — the in-flight read Future must stay
526             # pinned until it resolves, or F::AA will warn "lost its returning
527             # future". on_done/on_fail will clear it when the read completes
528             # and will see _closed first so they won't re-enter the driver.
529 0         0 $self->{_driver_step} = undef;
530             }
531              
532 0     0 0 0 sub is_closed { shift->{_closed} }
533              
534             # Get all subscriptions for reconnect replay
535             sub get_replay_commands {
536 1     1 0 5 my ($self) = @_;
537              
538 1         2 my @commands;
539              
540 1         3 my @channels = $self->channels;
541 1 50       5 push @commands, ['SUBSCRIBE', @channels] if @channels;
542              
543 1         3 my @patterns = $self->patterns;
544 1 50       3 push @commands, ['PSUBSCRIBE', @patterns] if @patterns;
545              
546 1         4 my @sharded = $self->sharded_channels;
547 1 50       3 push @commands, ['SSUBSCRIBE', @sharded] if @sharded;
548              
549 1         5 return @commands;
550             }
551              
552             1;
553              
554             __END__