File Coverage

blib/lib/PAGI/Endpoint/SSE.pm
Criterion Covered Total %
statement 39 40 97.5
branch 6 8 75.0
condition 1 2 50.0
subroutine 11 11 100.0
pod 3 5 60.0
total 60 66 90.9


line stmt bran cond sub pod time code
1             package PAGI::Endpoint::SSE;
2             $PAGI::Endpoint::SSE::VERSION = '0.002001';
3 4     4   365278 use strict;
  4         7  
  4         181  
4 4     4   22 use warnings;
  4         13  
  4         204  
5              
6 4     4   15 use Future::AsyncAwait;
  4         5  
  4         21  
7 4     4   191 use Carp qw(croak);
  4         7  
  4         2716  
8              
9             # Factory class method - override in subclass for customization
10 7     7 1 2937 sub context_class { 'PAGI::Context' }
11              
12             # Keepalive interval in seconds (0 = disabled)
13 2     2 1 1317 sub keepalive_interval { 0 }
14              
15             sub new {
16 4     4 0 247818 my ($class, %args) = @_;
17 4         13 return bless \%args, $class;
18             }
19              
20 3     3 0 5 async sub handle {
21 3         6 my ($self, $ctx) = @_;
22 3         13 my $sse = $ctx->sse;
23              
24             # Configure keepalive if specified. keepalive() is async (it sends an
25             # sse.keepalive event); await it so the send completes rather than being a
26             # fire-and-forget Future (PAGI applications must await all $send calls).
27 3         15 my $keepalive = $self->keepalive_interval;
28 3 100       12 if ($keepalive > 0) {
29 2         4 await $sse->keepalive($keepalive);
30             }
31              
32             # Register disconnect callback
33 3 100       67 if ($self->can('on_disconnect')) {
34             $sse->on_close(sub {
35 2     2   4 $self->on_disconnect($ctx);
36 2         8 });
37             }
38              
39             # Call on_connect if defined
40 3 50       11 if ($self->can('on_connect')) {
41 3         7 await $self->on_connect($ctx);
42             } else {
43             # Default: just start the stream
44 0         0 await $sse->start;
45             }
46              
47             # Wait for disconnect
48 3         146 await $sse->run;
49             }
50              
51             sub to_app {
52 5     5 1 186390 my ($class) = @_;
53 5         27 my $context_class = $class->context_class;
54              
55 3     3   44 return async sub {
56 3         8 my ($scope, $receive, $send) = @_;
57              
58 3   50     11 my $type = $scope->{type} // '';
59 3 50       10 croak "Expected sse scope, got '$type'" unless $type eq 'sse';
60              
61 3         20 require PAGI::Context;
62 3         14 my $endpoint = $class->new;
63 3         17 my $ctx = $context_class->new($scope, $receive, $send);
64              
65 3         15 await $endpoint->handle($ctx);
66 5         31 };
67             }
68              
69             1;
70              
71             __END__