File Coverage

blib/lib/Async/Redis/Subscription.pm
Criterion Covered Total %
statement 38 157 24.2
branch 5 60 8.3
condition 0 8 0.0
subroutine 13 26 50.0
pod 0 13 0.0
total 56 264 21.2


line stmt bran cond sub pod time code
1             package Async::Redis::Subscription;
2              
3 73     73   519 use strict;
  73         173  
  73         3561  
4 73     73   408 use warnings;
  73         150  
  73         3957  
5 73     73   1288 use 5.018;
  73         317  
6              
7 73     73   404 use Future;
  73         194  
  73         1883  
8 73     73   391 use Future::AsyncAwait;
  73         187  
  73         521  
9              
10             our $VERSION = '0.001';
11              
12             sub new {
13 2     2 0 26 my ($class, %args) = @_;
14              
15             return bless {
16             redis => $args{redis},
17 2         24 channels => {}, # channel => 1 (for regular subscribe)
18             patterns => {}, # pattern => 1 (for psubscribe)
19             sharded_channels => {}, # channel => 1 (for ssubscribe)
20             _message_queue => [], # Buffer for messages
21             _waiters => [], # Futures waiting for messages
22             _on_reconnect => undef, # Callback for reconnect notification
23             _closed => 0,
24             }, $class;
25             }
26              
27             # Set/get reconnect callback
28             sub on_reconnect {
29 3     3 0 13 my ($self, $cb) = @_;
30 3 100       12 $self->{_on_reconnect} = $cb if @_ > 1;
31 3         12 return $self->{_on_reconnect};
32             }
33              
34             # Track a channel subscription
35             sub _add_channel {
36 2     2   17 my ($self, $channel) = @_;
37 2         12 $self->{channels}{$channel} = 1;
38             }
39              
40             sub _add_pattern {
41 1     1   8 my ($self, $pattern) = @_;
42 1         5 $self->{patterns}{$pattern} = 1;
43             }
44              
45             sub _add_sharded_channel {
46 0     0   0 my ($self, $channel) = @_;
47 0         0 $self->{sharded_channels}{$channel} = 1;
48             }
49              
50             sub _remove_channel {
51 0     0   0 my ($self, $channel) = @_;
52 0         0 delete $self->{channels}{$channel};
53             }
54              
55             sub _remove_pattern {
56 0     0   0 my ($self, $pattern) = @_;
57 0         0 delete $self->{patterns}{$pattern};
58             }
59              
60             sub _remove_sharded_channel {
61 0     0   0 my ($self, $channel) = @_;
62 0         0 delete $self->{sharded_channels}{$channel};
63             }
64              
65             # List subscribed channels/patterns
66 1     1 0 2 sub channels { keys %{shift->{channels}} }
  1         6  
67 1     1 0 2 sub patterns { keys %{shift->{patterns}} }
  1         6  
68 1     1 0 2 sub sharded_channels { keys %{shift->{sharded_channels}} }
  1         3  
69              
70             sub channel_count {
71 0     0 0 0 my ($self) = @_;
72 0         0 return scalar(keys %{$self->{channels}})
73 0         0 + scalar(keys %{$self->{patterns}})
74 0         0 + scalar(keys %{$self->{sharded_channels}});
  0         0  
75             }
76              
77             # Receive next message (async iterator pattern)
78 0     0 0 0 async sub next {
79 0         0 my ($self) = @_;
80              
81             # Check if subscription is closed
82 0 0       0 return undef if $self->{_closed};
83              
84             # Return buffered message if available
85 0 0       0 if (@{$self->{_message_queue}}) {
  0         0  
86 0         0 return shift @{$self->{_message_queue}};
  0         0  
87             }
88              
89             # Read directly from redis
90 0         0 my $redis = $self->{redis};
91              
92 0         0 while (1) {
93 0         0 my $frame;
94 0         0 eval {
95 0         0 $frame = await $redis->_read_pubsub_frame();
96             };
97              
98 0 0       0 if (my $error = $@) {
99             # Connection error — attempt reconnect if enabled
100 0 0 0     0 if ($redis->{reconnect} && $self->channel_count > 0) {
101 0         0 eval {
102 0         0 await $redis->_reconnect_pubsub();
103             };
104 0 0       0 if ($@) {
105             # Reconnect failed — propagate the original error
106 0         0 die $error;
107             }
108              
109             # Fire reconnect callback if registered
110 0 0       0 if ($self->{_on_reconnect}) {
111 0         0 $self->{_on_reconnect}->($self);
112             }
113              
114             # Continue reading — next iteration will read
115             # from the new connection
116 0         0 next;
117             }
118              
119             # No reconnect — propagate error
120 0         0 die $error;
121             }
122              
123 0 0 0     0 last unless $frame && ref $frame eq 'ARRAY';
124              
125 0   0     0 my $type = $frame->[0] // '';
126              
127 0 0       0 if ($type eq 'message') {
    0          
    0          
    0          
128             return {
129 0         0 type => 'message',
130             channel => $frame->[1],
131             data => $frame->[2],
132             };
133             }
134             elsif ($type eq 'pmessage') {
135             return {
136 0         0 type => 'pmessage',
137             pattern => $frame->[1],
138             channel => $frame->[2],
139             data => $frame->[3],
140             };
141             }
142             elsif ($type eq 'smessage') {
143             return {
144 0         0 type => 'smessage',
145             channel => $frame->[1],
146             data => $frame->[2],
147             };
148             }
149             elsif ($type =~ /^(un)?p?s?subscribe$/) {
150             # Subscription confirmation - continue reading
151 0         0 next;
152             }
153             else {
154             # Unknown - continue
155 0         0 next;
156             }
157             }
158              
159 0         0 return undef;
160             }
161              
162             # Backward-compatible wrapper
163 0     0 0 0 async sub next_message {
164 0         0 my ($self) = @_;
165 0         0 my $msg = await $self->next();
166 0 0       0 return undef unless $msg;
167              
168             # Convert new format to old format for compatibility
169             return {
170             channel => $msg->{channel},
171             message => $msg->{data},
172             pattern => $msg->{pattern},
173             type => $msg->{type},
174 0         0 };
175             }
176              
177             # Internal: called when message arrives
178             sub _deliver_message {
179 0     0   0 my ($self, $msg) = @_;
180              
181 0 0       0 if (@{$self->{_waiters}}) {
  0         0  
182             # Someone is waiting - deliver directly
183 0         0 my $waiter = shift @{$self->{_waiters}};
  0         0  
184 0         0 $waiter->done($msg);
185             }
186             else {
187             # Buffer the message
188 0         0 push @{$self->{_message_queue}}, $msg;
  0         0  
189             }
190             }
191              
192             # Unsubscribe from specific channels
193 0     0 0 0 async sub unsubscribe {
194 0         0 my ($self, @channels) = @_;
195              
196 0 0       0 return if $self->{_closed};
197              
198 0         0 my $redis = $self->{redis};
199              
200 0 0       0 if (@channels) {
201             # Partial unsubscribe
202 0         0 await $redis->_send_command('UNSUBSCRIBE', @channels);
203              
204             # Read confirmations
205 0         0 for my $ch (@channels) {
206 0         0 my $msg = await $redis->_read_pubsub_frame();
207 0         0 $self->_remove_channel($ch);
208             }
209             }
210             else {
211             # Full unsubscribe - all channels
212 0         0 my @all_channels = $self->channels;
213              
214 0 0       0 if (@all_channels) {
215 0         0 await $redis->_send_command('UNSUBSCRIBE');
216              
217             # Read all confirmations
218 0         0 for my $ch (@all_channels) {
219 0         0 my $msg = await $redis->_read_pubsub_frame();
220 0         0 $self->_remove_channel($ch);
221             }
222             }
223             }
224              
225             # If no subscriptions remain, close and exit pubsub mode
226 0 0       0 if ($self->channel_count == 0) {
227 0         0 $self->_close;
228             }
229              
230 0         0 return $self;
231             }
232              
233             # Unsubscribe from patterns
234 0     0 0 0 async sub punsubscribe {
235 0         0 my ($self, @patterns) = @_;
236              
237 0 0       0 return if $self->{_closed};
238              
239 0         0 my $redis = $self->{redis};
240              
241 0 0       0 if (@patterns) {
242 0         0 await $redis->_send_command('PUNSUBSCRIBE', @patterns);
243              
244 0         0 for my $p (@patterns) {
245 0         0 my $msg = await $redis->_read_pubsub_frame();
246 0         0 $self->_remove_pattern($p);
247             }
248             }
249             else {
250 0         0 my @all_patterns = $self->patterns;
251              
252 0 0       0 if (@all_patterns) {
253 0         0 await $redis->_send_command('PUNSUBSCRIBE');
254              
255 0         0 for my $p (@all_patterns) {
256 0         0 my $msg = await $redis->_read_pubsub_frame();
257 0         0 $self->_remove_pattern($p);
258             }
259             }
260             }
261              
262 0 0       0 if ($self->channel_count == 0) {
263 0         0 $self->_close;
264             }
265              
266 0         0 return $self;
267             }
268              
269             # Unsubscribe from sharded channels
270 0     0 0 0 async sub sunsubscribe {
271 0         0 my ($self, @channels) = @_;
272              
273 0 0       0 return if $self->{_closed};
274              
275 0         0 my $redis = $self->{redis};
276              
277 0 0       0 if (@channels) {
278 0         0 await $redis->_send_command('SUNSUBSCRIBE', @channels);
279              
280 0         0 for my $ch (@channels) {
281 0         0 my $msg = await $redis->_read_pubsub_frame();
282 0         0 $self->_remove_sharded_channel($ch);
283             }
284             }
285             else {
286 0         0 my @all = $self->sharded_channels;
287              
288 0 0       0 if (@all) {
289 0         0 await $redis->_send_command('SUNSUBSCRIBE');
290              
291 0         0 for my $ch (@all) {
292 0         0 my $msg = await $redis->_read_pubsub_frame();
293 0         0 $self->_remove_sharded_channel($ch);
294             }
295             }
296             }
297              
298 0 0       0 if ($self->channel_count == 0) {
299 0         0 $self->_close;
300             }
301              
302 0         0 return $self;
303             }
304              
305             # Close subscription
306             sub _close {
307 0     0   0 my ($self) = @_;
308              
309 0         0 $self->{_closed} = 1;
310 0         0 $self->{redis}{in_pubsub} = 0;
311              
312             # Cancel any waiters
313 0         0 for my $waiter (@{$self->{_waiters}}) {
  0         0  
314 0 0       0 $waiter->done(undef) unless $waiter->is_ready;
315             }
316 0         0 $self->{_waiters} = [];
317             }
318              
319 0     0 0 0 sub is_closed { shift->{_closed} }
320              
321             # Get all subscriptions for reconnect replay
322             sub get_replay_commands {
323 1     1 0 7 my ($self) = @_;
324              
325 1         2 my @commands;
326              
327 1         5 my @channels = $self->channels;
328 1 50       6 push @commands, ['SUBSCRIBE', @channels] if @channels;
329              
330 1         4 my @patterns = $self->patterns;
331 1 50       4 push @commands, ['PSUBSCRIBE', @patterns] if @patterns;
332              
333 1         6 my @sharded = $self->sharded_channels;
334 1 50       7 push @commands, ['SSUBSCRIBE', @sharded] if @sharded;
335              
336 1         6 return @commands;
337             }
338              
339             1;
340              
341             __END__