File Coverage

lib/PAGI/Test/SSE.pm
Criterion Covered Total %
statement 65 68 95.5
branch 14 24 58.3
condition 8 11 72.7
subroutine 14 14 100.0
pod 5 5 100.0
total 106 122 86.8


line stmt bran cond sub pod time code
1             package PAGI::Test::SSE;
2             $PAGI::Test::SSE::VERSION = '0.002000';
3 5     5   27 use strict;
  5         6  
  5         172  
4 5     5   15 use warnings;
  5         7  
  5         235  
5 5     5   19 use Future::AsyncAwait;
  5         5  
  5         33  
6 5     5   202 use Future;
  5         6  
  5         106  
7 5     5   16 use Carp qw(croak);
  5         5  
  5         5366  
8              
9              
10             sub new {
11 12     12 1 27 my ($class, %args) = @_;
12              
13 12 50       23 croak "app is required" unless $args{app};
14 12 50       21 croak "scope is required" unless $args{scope};
15              
16             return bless {
17             app => $args{app},
18             scope => $args{scope},
19 12         52 recv_queue => [], # Events from app -> test
20             closed => 0,
21             started => 0,
22             }, $class;
23             }
24              
25             sub _start {
26 12     12   16 my ($self) = @_;
27              
28             # Create receive coderef for the app (always returns disconnect when closed)
29 2     2   47 my $receive = async sub {
30 2 50       3 if ($self->{closed}) {
31 0         0 return { type => 'sse.disconnect' };
32             }
33              
34             # SSE only receives disconnects from client, so we wait indefinitely
35             # until the connection is closed
36 2         4 my $future = Future->new;
37             # This future will be resolved when close() is called
38 2   50     7 push @{$self->{_pending_receives} //= []}, $future;
  2         6  
39 2         3 return await $future;
40 12         43 };
41              
42             # Create send coderef for the app
43 31     31   803 my $send = async sub {
44 31         50 my ($event) = @_;
45              
46 31 100       66 if ($event->{type} eq 'sse.start') {
    100          
47 12         19 $self->{started} = 1;
48 12   50     30 $self->{status} = $event->{status} // 200;
49 12   100     31 $self->{headers} = $event->{headers} // [];
50             }
51             elsif ($event->{type} eq 'sse.send') {
52 18         16 push @{$self->{recv_queue}}, $event;
  18         26  
53             }
54              
55 31         488 return;
56 12         30 };
57              
58             # Start the app future but don't block on it
59 12         47 $self->{app_future} = $self->{app}->($self->{scope}, $receive, $send);
60              
61             # Wait for sse.start (this should complete immediately)
62             # We need to let the app run until it starts
63 12         691 $self->_pump_app;
64              
65 12 50       18 croak "SSE connection not started" unless $self->{started};
66              
67 12         116 return $self;
68             }
69              
70             sub _pump_app {
71 24     24   27 my ($self) = @_;
72              
73             # If closed and there are pending receives, resolve them with disconnect
74 24 100 100     69 if ($self->{closed} && $self->{_pending_receives}) {
75 2         2 while (my $future = shift @{$self->{_pending_receives}}) {
  4         361  
76 2 50       5 $future->done({ type => 'sse.disconnect' }) unless $future->is_ready;
77             }
78             }
79             }
80              
81             sub receive_event {
82 14     14 1 1343 my ($self, %opts) = @_;
83 14   50     41 my $timeout = $opts{timeout} // 5;
84              
85             # Check if we have an event already waiting
86 14 50       12 if (@{$self->{recv_queue}}) {
  14         26  
87 14         15 my $event = shift @{$self->{recv_queue}};
  14         18  
88              
89             # Extract SSE event fields
90             return {
91             event => $event->{event},
92             data => $event->{data},
93             id => $event->{id},
94             retry => $event->{retry},
95 14         68 };
96             }
97              
98             # Check if connection closed
99 0 0       0 return undef if $self->{closed};
100              
101             # No event available yet
102 0         0 croak "Timeout waiting for SSE event";
103             }
104              
105             sub receive_json {
106 1     1 1 7 my ($self, %opts) = @_;
107              
108 1         2 my $event = $self->receive_event(%opts);
109 1 50       3 return undef unless defined $event;
110              
111 1         433 require JSON::MaybeXS;
112 1         8292 return JSON::MaybeXS::decode_json($event->{data});
113             }
114              
115             sub close {
116 12     12 1 325 my ($self) = @_;
117              
118 12 50       21 return if $self->{closed};
119              
120 12         22 $self->{closed} = 1;
121              
122             # Pump the app to let it process the disconnect
123 12         22 $self->_pump_app;
124              
125 12         20 return $self;
126             }
127              
128             sub is_closed {
129 11     11 1 15 my ($self) = @_;
130 11         40 return $self->{closed};
131             }
132              
133             1;
134              
135             __END__