File Coverage

lib/PAGI/App/SSE/Pubsub.pm
Criterion Covered Total %
statement 59 99 59.6
branch 10 38 26.3
condition 7 20 35.0
subroutine 11 12 91.6
pod 3 5 60.0
total 90 174 51.7


line stmt bran cond sub pod time code
1             package PAGI::App::SSE::Pubsub;
2              
3 1     1   198395 use strict;
  1         1  
  1         33  
4 1     1   3 use warnings;
  1         5  
  1         39  
5 1     1   3 use Future::AsyncAwait;
  1         1  
  1         9  
6 1     1   40 use Future;
  1         2  
  1         1608  
7              
8             =head1 NAME
9              
10             PAGI::App::SSE::Pubsub - Pub/sub Server-Sent Events
11              
12             =head1 SYNOPSIS
13              
14             use PAGI::App::SSE::Pubsub;
15              
16             my $app = PAGI::App::SSE::Pubsub->new->to_app;
17              
18             # From elsewhere, publish events
19             PAGI::App::SSE::Pubsub->publish('news', { data => 'Hello!' });
20              
21             =cut
22              
23             # Shared state
24             my %channels; # channel => { clients => { id => { send => cb, scope => scope } } }
25             my $next_id = 1;
26              
27             sub new {
28 1     1 0 1461 my ($class, %args) = @_;
29              
30             return bless {
31             channel => $args{channel} // 'default',
32             retry => $args{retry},
33             on_connect => $args{on_connect},
34             on_close => $args{on_close},
35             history => $args{history} // 0,
36 1   50     17 headers => $args{headers} // [],
      50        
      50        
37             }, $class;
38             }
39              
40             # Class method to publish events
41             sub publish {
42 1     1 1 1806 my ($class, $channel, $event) = @_;
43              
44 1 50       3 return unless $channels{$channel};
45              
46 0         0 my $data = _format_event($event);
47 0         0 my $clients = $channels{$channel}{clients};
48              
49 0         0 for my $id (keys %$clients) {
50 0         0 my $client = $clients->{$id};
51 0         0 eval {
52 0         0 $client->{send}->({
53             type => 'http.response.body',
54             body => $data,
55             more => 1,
56             });
57             };
58 0 0       0 if ($@) {
59 0         0 $client->{closed} = 1;
60 0         0 delete $clients->{$id};
61             }
62             }
63              
64             # Store in history if enabled
65 0 0       0 if ($channels{$channel}{history_size}) {
66 0         0 push @{$channels{$channel}{history}}, $event;
  0         0  
67 0         0 my $max = $channels{$channel}{history_size};
68 0 0       0 if (@{$channels{$channel}{history}} > $max) {
  0         0  
69 0         0 shift @{$channels{$channel}{history}};
  0         0  
70             }
71             }
72             }
73              
74             # Class method to get client count
75             sub client_count {
76 1     1 1 193778 my ($class, $channel) = @_;
77 1   50     6 $channel //= undef;
78              
79 1 50       2 if ($channel) {
80 0 0       0 return 0 unless $channels{$channel};
81 0         0 return scalar keys %{$channels{$channel}{clients}};
  0         0  
82             }
83 1         3 my $total = 0;
84 1         3 $total += scalar keys %{$_->{clients}} for values %channels;
  0         0  
85 1         2 return $total;
86             }
87              
88             # Class method to list channels
89             sub list_channels {
90 1     1 1 1245 my ($class) = @_;
91              
92 1         3 return keys %channels;
93             }
94              
95             sub to_app {
96 1     1 0 2 my ($self) = @_;
97              
98 1         4 my $channel = $self->{channel};
99 1         2 my $retry = $self->{retry};
100 1         2 my $on_connect = $self->{on_connect};
101 1         1 my $on_close = $self->{on_close};
102 1         2 my $history_size = $self->{history};
103 1         1 my $extra_headers = $self->{headers};
104              
105 1     1   28 return async sub {
106 1         2 my ($scope, $receive, $send) = @_;
107 1 50       4 die "Unsupported scope type: $scope->{type}" if $scope->{type} ne 'http';
108              
109             # Build headers
110 1         5 my @headers = (
111             ['content-type', 'text/event-stream'],
112             ['cache-control', 'no-cache'],
113             ['connection', 'keep-alive'],
114             @$extra_headers,
115             );
116              
117 1         5 await $send->({
118             type => 'http.response.start',
119             status => 200,
120             headers => \@headers,
121             });
122              
123             # Initialize channel
124 1   50     111 $channels{$channel} //= {
125             clients => {},
126             history => [],
127             history_size => $history_size,
128             };
129              
130 1         2 my $client_id = $next_id++;
131 1         3 my $client_data = {
132             send => $send,
133             scope => $scope,
134             closed => 0,
135             };
136 1         3 $channels{$channel}{clients}{$client_id} = $client_data;
137              
138             # Send retry hint
139 1 50       2 if (defined $retry) {
140 0         0 await $send->({
141             type => 'http.response.body',
142             body => "retry: $retry\n\n",
143             more => 1,
144             });
145             }
146              
147             # Send history if requested
148 1         4 my $last_event_id = _get_last_event_id($scope);
149 1 50 33     4 if ($history_size && defined $last_event_id) {
150 0         0 my $found = 0;
151 0         0 for my $event (@{$channels{$channel}{history}}) {
  0         0  
152 0 0 0     0 if ($found) {
    0          
153 0         0 await $send->({
154             type => 'http.response.body',
155             body => _format_event($event),
156             more => 1,
157             });
158             } elsif ($event->{id} && $event->{id} eq $last_event_id) {
159 0         0 $found = 1;
160             }
161             }
162             }
163              
164 1 50       2 $on_connect->($scope, $channel) if $on_connect;
165              
166             # Wait for disconnect
167 1         4 while (!$client_data->{closed}) {
168 1         2 my $event = await $receive->();
169 1 50       31 if ($event->{type} eq 'http.disconnect') {
170 1         2 last;
171             }
172             }
173              
174             # Cleanup
175 1         2 delete $channels{$channel}{clients}{$client_id};
176 1 50       1 delete $channels{$channel} if !keys %{$channels{$channel}{clients}};
  1         4  
177              
178 1 50       3 $on_close->($scope, $channel) if $on_close;
179              
180             # End response
181 1 50       3 unless ($client_data->{closed}) {
182 1         4 await $send->({
183             type => 'http.response.body',
184             body => '',
185             more => 0,
186             });
187             }
188 1         8 };
189             }
190              
191             sub _format_event {
192 0     0   0 my ($event) = @_;
193              
194 0         0 my $data = '';
195              
196 0 0       0 if ($event->{event}) {
197 0         0 $data .= "event: $event->{event}\n";
198             }
199 0 0       0 if ($event->{id}) {
200 0         0 $data .= "id: $event->{id}\n";
201             }
202              
203 0   0     0 my $content = $event->{data} // '';
204 0         0 for my $line (split /\n/, $content) {
205 0         0 $data .= "data: $line\n";
206             }
207              
208 0         0 return "$data\n";
209             }
210              
211             sub _get_last_event_id {
212 1     1   4 my ($scope) = @_;
213              
214 1   50     1 for my $h (@{$scope->{headers} // []}) {
  1         3  
215 0 0       0 if (lc($h->[0]) eq 'last-event-id') {
216 0         0 return $h->[1];
217             }
218             }
219 1         7 return undef;
220             }
221              
222             1;
223              
224             __END__