File Coverage

lib/PAGI/Test/SSE.pm
Criterion Covered Total %
statement 65 68 95.5
branch 14 24 58.3
condition 8 11 72.7
subroutine 14 14 100.0
pod 5 5 100.0
total 106 122 86.8


line stmt bran cond sub pod time code
1             package PAGI::Test::SSE;
2             $PAGI::Test::SSE::VERSION = '0.002001';
3 5     5   28 use strict;
  5         8  
  5         201  
4 5     5   19 use warnings;
  5         6  
  5         257  
5 5     5   21 use Future::AsyncAwait;
  5         9  
  5         32  
6 5     5   268 use Future;
  5         5  
  5         122  
7 5     5   19 use Carp qw(croak);
  5         7  
  5         5792  
8              
9              
10             sub new {
11 12     12 1 40 my ($class, %args) = @_;
12              
13 12 50       25 croak "app is required" unless $args{app};
14 12 50       94 croak "scope is required" unless $args{scope};
15              
16             return bless {
17             app => $args{app},
18             scope => $args{scope},
19 12         68 recv_queue => [], # Events from app -> test
20             closed => 0,
21             started => 0,
22             }, $class;
23             }
24              
25             sub _start {
26 12     12   38 my ($self) = @_;
27              
28             # Create receive coderef for the app (always returns disconnect when closed)
29 2     2   65 my $receive = async sub {
30 2 50       5 if ($self->{closed}) {
31 0         0 return { type => 'sse.disconnect' };
32             }
33              
34             # SSE only receives disconnects from client, so we wait indefinitely
35             # until the connection is closed
36 2         4 my $future = Future->new;
37             # This future will be resolved when close() is called
38 2   50     10 push @{$self->{_pending_receives} //= []}, $future;
  2         23  
39 2         6 return await $future;
40 12         50 };
41              
42             # Create send coderef for the app
43 31     31   891 my $send = async sub {
44 31         63 my ($event) = @_;
45              
46 31 100       79 if ($event->{type} eq 'sse.start') {
    100          
47 12         21 $self->{started} = 1;
48 12   50     32 $self->{status} = $event->{status} // 200;
49 12   100     43 $self->{headers} = $event->{headers} // [];
50             }
51             elsif ($event->{type} eq 'sse.send') {
52 18         21 push @{$self->{recv_queue}}, $event;
  18         30  
53             }
54              
55 31         164 return;
56 12         35 };
57              
58             # Start the app future but don't block on it
59 12         60 $self->{app_future} = $self->{app}->($self->{scope}, $receive, $send);
60              
61             # Wait for sse.start (this should complete immediately)
62             # We need to let the app run until it starts
63 12         838 $self->_pump_app;
64              
65 12 50       26 croak "SSE connection not started" unless $self->{started};
66              
67 12         98 return $self;
68             }
69              
70             sub _pump_app {
71 24     24   34 my ($self) = @_;
72              
73             # If closed and there are pending receives, resolve them with disconnect
74 24 100 100     85 if ($self->{closed} && $self->{_pending_receives}) {
75 2         2 while (my $future = shift @{$self->{_pending_receives}}) {
  4         489  
76 2 50       5 $future->done({ type => 'sse.disconnect' }) unless $future->is_ready;
77             }
78             }
79             }
80              
81             sub receive_event {
82 14     14 1 1375 my ($self, %opts) = @_;
83 14   50     52 my $timeout = $opts{timeout} // 5;
84              
85             # Check if we have an event already waiting
86 14 50       18 if (@{$self->{recv_queue}}) {
  14         34  
87 14         16 my $event = shift @{$self->{recv_queue}};
  14         25  
88              
89             # Extract SSE event fields
90             return {
91             event => $event->{event},
92             data => $event->{data},
93             id => $event->{id},
94             retry => $event->{retry},
95 14         85 };
96             }
97              
98             # Check if connection closed
99 0 0       0 return undef if $self->{closed};
100              
101             # No event available yet
102 0         0 croak "Timeout waiting for SSE event";
103             }
104              
105             sub receive_json {
106 1     1 1 4 my ($self, %opts) = @_;
107              
108 1         3 my $event = $self->receive_event(%opts);
109 1 50       2 return undef unless defined $event;
110              
111 1         432 require JSON::MaybeXS;
112 1         7941 return JSON::MaybeXS::decode_json($event->{data});
113             }
114              
115             sub close {
116 12     12 1 346 my ($self) = @_;
117              
118 12 50       28 return if $self->{closed};
119              
120 12         17 $self->{closed} = 1;
121              
122             # Pump the app to let it process the disconnect
123 12         28 $self->_pump_app;
124              
125 12         30 return $self;
126             }
127              
128             sub is_closed {
129 11     11 1 42 my ($self) = @_;
130 11         86 return $self->{closed};
131             }
132              
133             1;
134              
135             __END__