File Coverage

blib/lib/Async/Redis/Subscription.pm
Criterion Covered Total %
statement 14 142 9.8
branch 0 50 0.0
condition 0 5 0.0
subroutine 5 25 20.0
pod 0 12 0.0
total 19 234 8.1


line stmt bran cond sub pod time code
1             package Async::Redis::Subscription;
2              
3 68     68   474 use strict;
  68         175  
  68         3239  
4 68     68   369 use warnings;
  68         152  
  68         4249  
5 68     68   1193 use 5.018;
  68         265  
6              
7 68     68   372 use Future;
  68         152  
  68         1774  
8 68     68   337 use Future::AsyncAwait;
  68         170  
  68         454  
9              
10             our $VERSION = '0.001';
11              
12             sub new {
13 0     0 0   my ($class, %args) = @_;
14              
15             return bless {
16             redis => $args{redis},
17 0           channels => {}, # channel => 1 (for regular subscribe)
18             patterns => {}, # pattern => 1 (for psubscribe)
19             sharded_channels => {}, # channel => 1 (for ssubscribe)
20             _message_queue => [], # Buffer for messages
21             _waiters => [], # Futures waiting for messages
22             _closed => 0,
23             }, $class;
24             }
25              
26             # Track a channel subscription
27             sub _add_channel {
28 0     0     my ($self, $channel) = @_;
29 0           $self->{channels}{$channel} = 1;
30             }
31              
32             sub _add_pattern {
33 0     0     my ($self, $pattern) = @_;
34 0           $self->{patterns}{$pattern} = 1;
35             }
36              
37             sub _add_sharded_channel {
38 0     0     my ($self, $channel) = @_;
39 0           $self->{sharded_channels}{$channel} = 1;
40             }
41              
42             sub _remove_channel {
43 0     0     my ($self, $channel) = @_;
44 0           delete $self->{channels}{$channel};
45             }
46              
47             sub _remove_pattern {
48 0     0     my ($self, $pattern) = @_;
49 0           delete $self->{patterns}{$pattern};
50             }
51              
52             sub _remove_sharded_channel {
53 0     0     my ($self, $channel) = @_;
54 0           delete $self->{sharded_channels}{$channel};
55             }
56              
57             # List subscribed channels/patterns
58 0     0 0   sub channels { keys %{shift->{channels}} }
  0            
59 0     0 0   sub patterns { keys %{shift->{patterns}} }
  0            
60 0     0 0   sub sharded_channels { keys %{shift->{sharded_channels}} }
  0            
61              
62             sub channel_count {
63 0     0 0   my ($self) = @_;
64 0           return scalar(keys %{$self->{channels}})
65 0           + scalar(keys %{$self->{patterns}})
66 0           + scalar(keys %{$self->{sharded_channels}});
  0            
67             }
68              
69             # Receive next message (async iterator pattern)
70 0     0 0   async sub next {
71 0           my ($self) = @_;
72              
73             # Check if subscription is closed
74 0 0         return undef if $self->{_closed};
75              
76             # Return buffered message if available
77 0 0         if (@{$self->{_message_queue}}) {
  0            
78 0           return shift @{$self->{_message_queue}};
  0            
79             }
80              
81             # Read directly from redis
82 0           my $redis = $self->{redis};
83              
84 0           while (1) {
85 0           my $frame = await $redis->_read_pubsub_frame();
86              
87 0 0 0       last unless $frame && ref $frame eq 'ARRAY';
88              
89 0   0       my $type = $frame->[0] // '';
90              
91 0 0         if ($type eq 'message') {
    0          
    0          
    0          
92             return {
93 0           type => 'message',
94             channel => $frame->[1],
95             data => $frame->[2],
96             };
97             }
98             elsif ($type eq 'pmessage') {
99             return {
100 0           type => 'pmessage',
101             pattern => $frame->[1],
102             channel => $frame->[2],
103             data => $frame->[3],
104             };
105             }
106             elsif ($type eq 'smessage') {
107             return {
108 0           type => 'smessage',
109             channel => $frame->[1],
110             data => $frame->[2],
111             };
112             }
113             elsif ($type =~ /^(un)?p?s?subscribe$/) {
114             # Subscription confirmation - continue reading
115 0           next;
116             }
117             else {
118             # Unknown - continue
119 0           next;
120             }
121             }
122              
123 0           return undef;
124             }
125              
126             # Backward-compatible wrapper
127 0     0 0   async sub next_message {
128 0           my ($self) = @_;
129 0           my $msg = await $self->next();
130 0 0         return undef unless $msg;
131              
132             # Convert new format to old format for compatibility
133             return {
134             channel => $msg->{channel},
135             message => $msg->{data},
136             pattern => $msg->{pattern},
137             type => $msg->{type},
138 0           };
139             }
140              
141             # Internal: called when message arrives
142             sub _deliver_message {
143 0     0     my ($self, $msg) = @_;
144              
145 0 0         if (@{$self->{_waiters}}) {
  0            
146             # Someone is waiting - deliver directly
147 0           my $waiter = shift @{$self->{_waiters}};
  0            
148 0           $waiter->done($msg);
149             }
150             else {
151             # Buffer the message
152 0           push @{$self->{_message_queue}}, $msg;
  0            
153             }
154             }
155              
156             # Unsubscribe from specific channels
157 0     0 0   async sub unsubscribe {
158 0           my ($self, @channels) = @_;
159              
160 0 0         return if $self->{_closed};
161              
162 0           my $redis = $self->{redis};
163              
164 0 0         if (@channels) {
165             # Partial unsubscribe
166 0           await $redis->_send_command('UNSUBSCRIBE', @channels);
167              
168             # Read confirmations
169 0           for my $ch (@channels) {
170 0           my $msg = await $redis->_read_pubsub_frame();
171 0           $self->_remove_channel($ch);
172             }
173             }
174             else {
175             # Full unsubscribe - all channels
176 0           my @all_channels = $self->channels;
177              
178 0 0         if (@all_channels) {
179 0           await $redis->_send_command('UNSUBSCRIBE');
180              
181             # Read all confirmations
182 0           for my $ch (@all_channels) {
183 0           my $msg = await $redis->_read_pubsub_frame();
184 0           $self->_remove_channel($ch);
185             }
186             }
187             }
188              
189             # If no subscriptions remain, close and exit pubsub mode
190 0 0         if ($self->channel_count == 0) {
191 0           $self->_close;
192             }
193              
194 0           return $self;
195             }
196              
197             # Unsubscribe from patterns
198 0     0 0   async sub punsubscribe {
199 0           my ($self, @patterns) = @_;
200              
201 0 0         return if $self->{_closed};
202              
203 0           my $redis = $self->{redis};
204              
205 0 0         if (@patterns) {
206 0           await $redis->_send_command('PUNSUBSCRIBE', @patterns);
207              
208 0           for my $p (@patterns) {
209 0           my $msg = await $redis->_read_pubsub_frame();
210 0           $self->_remove_pattern($p);
211             }
212             }
213             else {
214 0           my @all_patterns = $self->patterns;
215              
216 0 0         if (@all_patterns) {
217 0           await $redis->_send_command('PUNSUBSCRIBE');
218              
219 0           for my $p (@all_patterns) {
220 0           my $msg = await $redis->_read_pubsub_frame();
221 0           $self->_remove_pattern($p);
222             }
223             }
224             }
225              
226 0 0         if ($self->channel_count == 0) {
227 0           $self->_close;
228             }
229              
230 0           return $self;
231             }
232              
233             # Unsubscribe from sharded channels
234 0     0 0   async sub sunsubscribe {
235 0           my ($self, @channels) = @_;
236              
237 0 0         return if $self->{_closed};
238              
239 0           my $redis = $self->{redis};
240              
241 0 0         if (@channels) {
242 0           await $redis->_send_command('SUNSUBSCRIBE', @channels);
243              
244 0           for my $ch (@channels) {
245 0           my $msg = await $redis->_read_pubsub_frame();
246 0           $self->_remove_sharded_channel($ch);
247             }
248             }
249             else {
250 0           my @all = $self->sharded_channels;
251              
252 0 0         if (@all) {
253 0           await $redis->_send_command('SUNSUBSCRIBE');
254              
255 0           for my $ch (@all) {
256 0           my $msg = await $redis->_read_pubsub_frame();
257 0           $self->_remove_sharded_channel($ch);
258             }
259             }
260             }
261              
262 0 0         if ($self->channel_count == 0) {
263 0           $self->_close;
264             }
265              
266 0           return $self;
267             }
268              
269             # Close subscription
270             sub _close {
271 0     0     my ($self) = @_;
272              
273 0           $self->{_closed} = 1;
274 0           $self->{redis}{in_pubsub} = 0;
275              
276             # Cancel any waiters
277 0           for my $waiter (@{$self->{_waiters}}) {
  0            
278 0 0         $waiter->done(undef) unless $waiter->is_ready;
279             }
280 0           $self->{_waiters} = [];
281             }
282              
283 0     0 0   sub is_closed { shift->{_closed} }
284              
285             # Get all subscriptions for reconnect replay
286             sub get_replay_commands {
287 0     0 0   my ($self) = @_;
288              
289 0           my @commands;
290              
291 0           my @channels = $self->channels;
292 0 0         push @commands, ['SUBSCRIBE', @channels] if @channels;
293              
294 0           my @patterns = $self->patterns;
295 0 0         push @commands, ['PSUBSCRIBE', @patterns] if @patterns;
296              
297 0           my @sharded = $self->sharded_channels;
298 0 0         push @commands, ['SSUBSCRIBE', @sharded] if @sharded;
299              
300 0           return @commands;
301             }
302              
303             1;
304              
305             __END__