File Coverage

blib/lib/PAGI/SSE.pm
Criterion Covered Total %
statement 211 319 66.1
branch 70 146 47.9
condition 32 76 42.1
subroutine 42 58 72.4
pod 37 41 90.2
total 392 640 61.2


line stmt bran cond sub pod time code
1             package PAGI::SSE;
2 16     16   2059446 use strict;
  16         23  
  16         563  
3 16     16   63 use warnings;
  16         69  
  16         825  
4 16     16   64 use Carp qw(croak);
  16         21  
  16         706  
5 16     16   4995 use Hash::MultiValue;
  16         30647  
  16         760  
6 16     16   1654 use Future::AsyncAwait;
  16         38749  
  16         76  
7 16     16   658 use Future;
  16         17  
  16         276  
8 16     16   4404 use JSON::MaybeXS ();
  16         119219  
  16         589  
9 16     16   100 use Scalar::Util qw(blessed);
  16         22  
  16         903  
10 16     16   4833 use Encode qw(decode FB_CROAK FB_DEFAULT LEAVE_SRC);
  16         180241  
  16         77152  
11              
12              
13             sub new {
14 50     50 1 1907493 my ($class, $scope, $receive, $send) = @_;
15              
16 50 100 66     752 croak "PAGI::SSE requires scope hashref"
17             unless $scope && ref($scope) eq 'HASH';
18 49 100 66     362 croak "PAGI::SSE requires receive coderef"
19             unless $receive && ref($receive) eq 'CODE';
20 48 100 66     297 croak "PAGI::SSE requires send coderef"
21             unless $send && ref($send) eq 'CODE';
22             croak "PAGI::SSE requires scope type 'sse', got '$scope->{type}'"
23 47 100 50     281 unless ($scope->{type} // '') eq 'sse';
24              
25             # Return existing SSE object if one was already created for this scope
26             # This ensures consistent state (is_started, is_closed, callbacks) if
27             # multiple code paths create SSE objects from the same scope.
28 46 50       88 return $scope->{'pagi.sse'} if $scope->{'pagi.sse'};
29              
30 46         290 my $self = bless {
31             scope => $scope,
32             receive => $receive,
33             send => $send,
34             _state => 'pending', # pending -> started -> closed
35             _on_close => [],
36             _on_error => [],
37             _disconnect_reason => undef, # Set when disconnect received
38             }, $class;
39              
40             # Cache in scope for reuse (weakened to avoid circular reference leak)
41 46         81 $scope->{'pagi.sse'} = $self;
42 46         66 Scalar::Util::weaken($scope->{'pagi.sse'});
43              
44 46         118 return $self;
45             }
46              
47             # Scope property accessors
48 1     1 1 7 sub scope { shift->{scope} }
49 1   50 1 1 13 sub path { shift->{scope}{path} // '/' }
50 0   0 0 1 0 sub raw_path { my $s = shift; $s->{scope}{raw_path} // $s->{scope}{path} // '/' }
  0   0     0  
51 1   50 1 1 7 sub query_string { shift->{scope}{query_string} // '' }
52              
53             # URL decode helper (handles + as space per application/x-www-form-urlencoded)
54             sub _url_decode {
55 0     0   0 my ($str) = @_;
56 0 0       0 return '' unless defined $str;
57 0         0 $str =~ s/\+/ /g;
58 0         0 $str =~ s/%([0-9A-Fa-f]{2})/chr(hex($1))/ge;
  0         0  
59 0         0 return $str;
60             }
61              
62             # UTF-8 decode helper with strict/lenient mode
63             sub _decode_utf8 {
64 0     0   0 my ($str, $strict) = @_;
65 0 0       0 return '' unless defined $str;
66 0 0       0 my $flag = $strict ? FB_CROAK : FB_DEFAULT;
67 0         0 $flag |= LEAVE_SRC;
68 0         0 return decode('UTF-8', $str, $flag);
69             }
70              
71             # Query params as Hash::MultiValue (cached in scope)
72             # Options: strict => 1 (croak on invalid UTF-8), raw => 1 (skip UTF-8 decoding)
73             sub query_params {
74 0     0 1 0 my ($self, %opts) = @_;
75 0   0     0 my $strict = delete $opts{strict} // 0;
76 0   0     0 my $raw = delete $opts{raw} // 0;
77 0 0       0 croak("Unknown options to query_params: " . join(', ', keys %opts)) if %opts;
78              
79 0 0       0 my $cache_key = $raw ? 'pagi.sse.query.raw' : ($strict ? 'pagi.sse.query.strict' : 'pagi.sse.query');
    0          
80 0 0       0 return $self->{scope}{$cache_key} if $self->{scope}{$cache_key};
81              
82 0         0 my $qs = $self->query_string;
83 0         0 my @pairs;
84              
85 0         0 for my $part (split /[&;]/, $qs) {
86 0 0       0 next unless length $part;
87 0         0 my ($key, $val) = split /=/, $part, 2;
88 0   0     0 $key //= '';
89 0   0     0 $val //= '';
90              
91             # URL decode (handles + as space)
92 0         0 my $key_decoded = _url_decode($key);
93 0         0 my $val_decoded = _url_decode($val);
94              
95             # UTF-8 decode unless raw mode
96 0 0       0 my $key_final = $raw ? $key_decoded : _decode_utf8($key_decoded, $strict);
97 0 0       0 my $val_final = $raw ? $val_decoded : _decode_utf8($val_decoded, $strict);
98              
99 0         0 push @pairs, $key_final, $val_final;
100             }
101              
102 0         0 $self->{scope}{$cache_key} = Hash::MultiValue->new(@pairs);
103 0         0 return $self->{scope}{$cache_key};
104             }
105              
106             # Raw query params (no UTF-8 decoding)
107             sub raw_query_params {
108 0     0 1 0 my $self = shift;
109 0         0 return $self->query_params(raw => 1);
110             }
111              
112             # Single query param accessor
113             sub query_param {
114 0     0 1 0 my ($self, $name, %opts) = @_;
115 0         0 return $self->query_params(%opts)->get($name);
116             }
117              
118             # Raw single query param
119             sub raw_query_param {
120 0     0 1 0 my ($self, $name) = @_;
121 0         0 return $self->query_param($name, raw => 1);
122             }
123              
124 1   50 1 1 6 sub scheme { shift->{scope}{scheme} // 'http' }
125 0   0 0 1 0 sub http_version { shift->{scope}{http_version} // '1.1' }
126 0     0 0 0 sub client { shift->{scope}{client} }
127 0     0 0 0 sub server { shift->{scope}{server} }
128              
129             # Per-connection storage - lives in scope, shared across Request/Response/WebSocket/SSE
130             # See PAGI::Request for detailed design notes on why stash is scope-based.
131             sub stash {
132 8     8 1 28 my ($self) = @_;
133 8   100     59 return $self->{scope}{'pagi.stash'} //= {};
134             }
135              
136             # Application state (injected by PAGI::Lifespan, read-only)
137             sub state {
138 5     5 1 13 my $self = shift;
139 5   100     50 return $self->{scope}{state} // {};
140             }
141              
142             # Path parameter accessors - captured from URL path by router
143             # Stored in scope->{path_params} for router-agnostic access
144             sub path_params {
145 2     2 1 5 my ($self) = @_;
146 2   100     19 return $self->{scope}{path_params} // {};
147             }
148              
149             sub path_param {
150 5     5 1 27 my ($self, $name) = @_;
151 5   100     19 my $params = $self->{scope}{path_params} // {};
152 5         19 return $params->{$name};
153             }
154              
155             # Connection state accessors
156 4     4 1 39 sub connection_state { shift->{_state} }
157              
158             sub is_started {
159 60     60 1 1237 my $self = shift;
160 60         188 return $self->{_state} eq 'started';
161             }
162              
163             sub is_closed {
164 70     70 1 795 my $self = shift;
165 70         335 return $self->{_state} eq 'closed';
166             }
167              
168             # Disconnect reason - why the connection closed
169             # Common values: 'client_closed', 'write_error', 'send_timeout', 'idle_timeout'
170             sub disconnect_reason {
171 0     0 1 0 my $self = shift;
172 0         0 return $self->{_disconnect_reason};
173             }
174              
175             # Internal state setters
176             sub _set_state {
177 24     24   47 my ($self, $state) = @_;
178 24         37 $self->{_state} = $state;
179             }
180              
181             sub _set_closed {
182 10     10   22 my ($self) = @_;
183 10         18 $self->{_state} = 'closed';
184             }
185              
186             # Start the SSE stream
187 24     24 1 145 async sub start {
188 24         37 my ($self, %opts) = @_;
189              
190             # Idempotent - don't start twice
191 24 100 66     44 return $self if $self->is_started || $self->is_closed;
192              
193             my $event = {
194             type => 'sse.start',
195 22   100     107 status => $opts{status} // 200,
196             };
197 22 100       41 $event->{headers} = $opts{headers} if exists $opts{headers};
198              
199 22         55 await $self->{send}->($event);
200 22         1043 $self->_set_state('started');
201              
202 22         124 return $self;
203             }
204              
205             # Enable keepalive - sends sse.keepalive event to server
206             # Server handles the timer; this is loop-agnostic
207 5     5 1 17 async sub keepalive {
208 5         9 my ($self, $interval, $comment) = @_;
209              
210 5         48 await $self->{send}->({
211             type => 'sse.keepalive',
212             interval => $interval // 0,
213             comment => $comment // '',
214             });
215              
216 5         261 return $self;
217             }
218              
219             # Single header lookup (case-insensitive, returns last value)
220             sub header {
221 8     8 1 22 my ($self, $name) = @_;
222 8         15 $name = lc($name);
223 8         12 my $value;
224 8   50     10 for my $pair (@{$self->{scope}{headers} // []}) {
  8         36  
225 14 100       38 if (lc($pair->[0]) eq $name) {
226 5         9 $value = $pair->[1];
227             }
228             }
229 8         41 return $value;
230             }
231              
232             # All headers as Hash::MultiValue (cached in scope)
233             sub headers {
234 1     1 1 1 my $self = shift;
235 1 50       4 return $self->{scope}{'pagi.request.headers'} if $self->{scope}{'pagi.request.headers'};
236              
237 1         1 my @pairs;
238 1   50     1 for my $pair (@{$self->{scope}{headers} // []}) {
  1         4  
239 3         7 push @pairs, lc($pair->[0]), $pair->[1];
240             }
241              
242 1         8 $self->{scope}{'pagi.request.headers'} = Hash::MultiValue->new(@pairs);
243 1         49 return $self->{scope}{'pagi.request.headers'};
244             }
245              
246             # All values for a header
247             sub header_all {
248 1     1 1 3 my ($self, $name) = @_;
249 1         3 return $self->headers->get_all(lc($name));
250             }
251              
252             # Get Last-Event-ID header from client (for reconnection)
253             sub last_event_id {
254 5     5 1 19 my ($self) = @_;
255 5         18 return $self->header('last-event-id');
256             }
257              
258             # Send data-only event
259 9     9 1 73 async sub send {
260 9         10 my ($self, $data) = @_;
261              
262 9 100       28 croak "Cannot send on closed SSE connection" if $self->is_closed;
263              
264             # Auto-start if not started
265 8 100       10 await $self->start unless $self->is_started;
266              
267 8         36 await $self->{send}->({
268             type => 'sse.send',
269             data => $data,
270             });
271              
272 8         172 return $self;
273             }
274              
275             # Send JSON-encoded data
276 2     2 1 49 async sub send_json {
277 2         5 my ($self, $data) = @_;
278              
279 2 50       5 croak "Cannot send on closed SSE connection" if $self->is_closed;
280              
281 2 50       5 await $self->start unless $self->is_started;
282              
283 2         16 my $json = JSON::MaybeXS::encode_json($data);
284              
285 2         9 await $self->{send}->({
286             type => 'sse.send',
287             data => $json,
288             });
289              
290 2         45 return $self;
291             }
292              
293             # Send full SSE event with all fields
294 10     10 1 161 async sub send_event {
295 10         35 my ($self, %opts) = @_;
296              
297 10 50       19 croak "Cannot send on closed SSE connection" if $self->is_closed;
298 10 50       23 croak "send_event requires 'data' parameter" unless exists $opts{data};
299              
300 10 100       21 await $self->start unless $self->is_started;
301              
302             # Auto-encode hashref/arrayref data as JSON
303 10         75 my $data = $opts{data};
304 10 100       18 if (ref $data) {
305 8         140 $data = JSON::MaybeXS::encode_json($data);
306             }
307              
308 10         35 my $event = {
309             type => 'sse.send',
310             data => $data,
311             };
312              
313 10 100       26 $event->{event} = $opts{event} if defined $opts{event};
314 10 100       27 $event->{id} = "$opts{id}" if defined $opts{id};
315 10 100       19 $event->{retry} = int($opts{retry}) if defined $opts{retry};
316              
317 10         24 await $self->{send}->($event);
318              
319 10         263 return $self;
320             }
321              
322             # Safe send - returns bool instead of throwing
323 3     3 1 34 async sub try_send {
324 3         4 my ($self, $data) = @_;
325 3 100       6 return 0 if $self->is_closed;
326              
327 2         3 eval {
328 2 50       2 await $self->start unless $self->is_started;
329 2         6 await $self->{send}->({
330             type => 'sse.send',
331             data => $data,
332             });
333             };
334 2 100       302 if ($@) {
335 1         4 $self->_set_closed;
336 1         5 return 0;
337             }
338 1         3 return 1;
339             }
340              
341 1     1 1 25 async sub try_send_json {
342 1         2 my ($self, $data) = @_;
343 1 50       2 return 0 if $self->is_closed;
344              
345 1         2 eval {
346 1 50       2 await $self->start unless $self->is_started;
347 1         8 my $json = JSON::MaybeXS::encode_json($data);
348 1         13 await $self->{send}->({
349             type => 'sse.send',
350             data => $json,
351             });
352             };
353 1 50       26 if ($@) {
354 0         0 $self->_set_closed;
355 0         0 return 0;
356             }
357 1         3 return 1;
358             }
359              
360             # Send SSE comment (doesn't trigger onmessage in browser)
361 0     0 0 0 async sub send_comment {
362 0         0 my ($self, $comment) = @_;
363              
364 0 0       0 croak "Cannot send on closed SSE connection" if $self->is_closed;
365              
366 0 0       0 await $self->start unless $self->is_started;
367              
368 0         0 await $self->{send}->({
369             type => 'sse.comment',
370             comment => $comment,
371             });
372              
373 0         0 return $self;
374             }
375              
376 0     0 0 0 async sub try_send_comment {
377 0         0 my ($self, $comment) = @_;
378 0 0       0 return 0 if $self->is_closed;
379              
380 0         0 eval {
381 0 0       0 await $self->start unless $self->is_started;
382 0         0 await $self->{send}->({
383             type => 'sse.comment',
384             comment => $comment,
385             });
386             };
387 0 0       0 if ($@) {
388 0         0 $self->_set_closed;
389 0         0 return 0;
390             }
391 0         0 return 1;
392             }
393              
394 1     1 1 24 async sub try_send_event {
395 1         4 my ($self, %opts) = @_;
396 1 50       3 return 0 if $self->is_closed;
397              
398 1         2 eval {
399 1 50       2 await $self->start unless $self->is_started;
400              
401 1   50     4 my $data = $opts{data} // '';
402 1 50       2 if (ref $data) {
403 0         0 $data = JSON::MaybeXS::encode_json($data);
404             }
405              
406 1         3 my $event = {
407             type => 'sse.send',
408             data => $data,
409             };
410 1 50       4 $event->{event} = $opts{event} if defined $opts{event};
411 1 50       3 $event->{id} = "$opts{id}" if defined $opts{id};
412 1 50       3 $event->{retry} = int($opts{retry}) if defined $opts{retry};
413              
414 1         2 await $self->{send}->($event);
415             };
416 1 50       24 if ($@) {
417 0         0 $self->_set_closed;
418 0         0 return 0;
419             }
420 1         3 return 1;
421             }
422              
423             # Register close callback
424             sub on_close {
425 10     10 1 134 my ($self, $callback) = @_;
426 10         11 push @{$self->{_on_close}}, $callback;
  10         19  
427 10         14 return $self;
428             }
429              
430             # Register error callback
431             sub on_error {
432 0     0 1 0 my ($self, $callback) = @_;
433 0         0 push @{$self->{_on_error}}, $callback;
  0         0  
434 0         0 return $self;
435             }
436              
437             # Internal: run all on_close callbacks
438 6     6   7 async sub _run_close_callbacks {
439 6         6 my ($self) = @_;
440              
441             # Only run once
442 6 50       17 return if $self->{_close_callbacks_ran};
443 6         7 $self->{_close_callbacks_ran} = 1;
444              
445 6   100     14 my $reason = $self->{_disconnect_reason} // 'unknown';
446              
447 6         5 for my $cb (@{$self->{_on_close}}) {
  6         12  
448 8         8 eval {
449 8         10 my $r = $cb->($self, $reason);
450 8 100 66     49 if (blessed($r) && $r->isa('Future')) {
451 1         3 await $r;
452             }
453             };
454 8 50       56 if ($@) {
455 0         0 warn "PAGI::SSE on_close callback error: $@";
456             }
457             }
458             }
459              
460             # Close the connection
461             sub close {
462 1     1 1 5 my ($self) = @_;
463              
464 1 50       2 return $self if $self->is_closed;
465              
466 1         3 $self->_set_closed;
467 1         3 $self->_run_close_callbacks->get;
468              
469 1         20 return $self;
470             }
471              
472             # Wait for disconnect
473 5     5 1 48 async sub run {
474 5         7 my ($self) = @_;
475              
476 5 50       8 await $self->start unless $self->is_started;
477              
478 5         7 while (!$self->is_closed) {
479 5         10 my $event = await $self->{receive}->();
480 5   50     139 my $type = $event->{type} // '';
481              
482 5 50       10 if ($type eq 'sse.disconnect') {
483 5   50     17 $self->{_disconnect_reason} = $event->{reason} // 'client_closed';
484 5         11 $self->_set_closed;
485 5         9 await $self->_run_close_callbacks;
486 5         113 last;
487             }
488             }
489              
490 5         10 return;
491             }
492              
493             # Iterate over items and send events
494 3     3 1 93 async sub each {
495 3         5 my ($self, $source, $callback) = @_;
496              
497 3 50       6 await $self->start unless $self->is_started;
498              
499 3         4 my $index = 0;
500              
501             # Handle arrayref
502 3 100       9 if (ref $source eq 'ARRAY') {
    50          
503 2         3 for my $item (@$source) {
504 5 50       28 last if $self->is_closed;
505              
506 5         8 my $result = await $callback->($item, $index++);
507              
508             # If callback returns a hashref, treat as event spec
509 5 100       191 if (ref $result eq 'HASH') {
510 2         14 await $self->send_event(%$result);
511             }
512             }
513             }
514             # Handle coderef iterator
515             elsif (ref $source eq 'CODE') {
516 1         3 while (!$self->is_closed) {
517 4         7 my $item = $source->();
518 4 100       14 last unless defined $item;
519              
520 3         6 my $result = await $callback->($item, $index++);
521              
522 3 50       119 if (ref $result eq 'HASH') {
523 0         0 await $self->send_event(%$result);
524             }
525             }
526             }
527             else {
528 0         0 croak "each() requires arrayref or coderef, got " . ref($source);
529             }
530              
531 3         24 return $self;
532             }
533              
534             # Periodic callback execution with interval delay
535             # Requires Future::IO to be installed
536 1     1 1 3 async sub every {
537 1         4 my ($self, $interval, $callback) = @_;
538              
539 1 50 33     8 croak "every() requires interval" unless defined $interval && $interval > 0;
540 1 50       4 croak "every() requires callback coderef" unless ref $callback eq 'CODE';
541              
542             # Future::IO is required for every() - fail clearly if not available
543 1 50       3 eval { require Future::IO; 1 }
  1         1051  
  1         38293  
544             or croak "every() requires Future::IO to be installed. "
545             . "Install it with: cpanm Future::IO";
546              
547             # Future::IO must be configured with a backend
548 16     16   209 no warnings 'once';
  16         22  
  16         10488  
549 1 50       358 croak "Future::IO backend not configured. Add to your app.pl:\n"
550             . " use Future::IO::Impl::IOAsync;\n"
551             unless $Future::IO::IMPL;
552              
553 0 0         await $self->start unless $self->is_started;
554              
555             # Start background disconnect monitor
556 0           my $disconnect_future = $self->_watch_for_disconnect;
557              
558 0           while (!$self->is_closed) {
559             # Execute the callback
560 0           my $ok = eval { await $callback->(); 1 };
  0            
  0            
561 0 0         unless ($ok) {
562 0           my $err = $@;
563             # Callback failed - connection likely closed or error occurred
564 0           $self->_set_closed;
565 0           await $self->_run_close_callbacks;
566 0           last;
567             }
568              
569             # Race between sleep and disconnect detection
570 0           my $sleep_future = Future::IO->sleep($interval);
571 0           my $winner = await Future->wait_any($sleep_future, $disconnect_future);
572              
573             # If disconnect won, exit the loop
574 0 0         if ($self->is_closed) {
575 0 0 0       $sleep_future->cancel if $sleep_future->can('cancel') && !$sleep_future->is_ready;
576 0           last;
577             }
578             }
579              
580             # Clean up disconnect monitor if still running
581 0 0 0       $disconnect_future->cancel if $disconnect_future->can('cancel') && !$disconnect_future->is_ready;
582              
583 0           return $self;
584             }
585              
586             # Background future that completes when disconnect is detected
587             sub _watch_for_disconnect {
588 0     0     my ($self) = @_;
589              
590 0           my $receive = $self->{receive};
591 0           require Scalar::Util;
592 0           my $weak_self = $self;
593 0           Scalar::Util::weaken($weak_self);
594              
595 0     0     return (async sub {
596 0   0       while ($weak_self && !$weak_self->is_closed) {
597 0           my $event = eval { await $receive->() };
  0            
598 0 0         last unless $event;
599              
600 0   0       my $type = $event->{type} // '';
601 0 0         if ($type eq 'sse.disconnect') {
602 0 0         if ($weak_self) {
603 0   0       $weak_self->{_disconnect_reason} = $event->{reason} // 'client_closed';
604 0           $weak_self->_set_closed;
605 0           await $weak_self->_run_close_callbacks;
606             }
607 0           last;
608             }
609             }
610 0           })->();
611             }
612              
613             1;
614              
615             __END__