File Coverage

lib/PAGI/Test/SSE.pm
Criterion Covered Total %
statement 65 68 95.5
branch 14 24 58.3
condition 8 11 72.7
subroutine 14 14 100.0
pod 5 5 100.0
total 106 122 86.8


line stmt bran cond sub pod time code
1             package PAGI::Test::SSE;
2              
3 5     5   27 use strict;
  5         8  
  5         182  
4 5     5   13 use warnings;
  5         6  
  5         263  
5 5     5   19 use Future::AsyncAwait;
  5         7  
  5         39  
6 5     5   202 use Future;
  5         11  
  5         119  
7 5     5   16 use Carp qw(croak);
  5         6  
  5         5245  
8              
9              
10             sub new {
11 12     12 1 71 my ($class, %args) = @_;
12              
13 12 50       31 croak "app is required" unless $args{app};
14 12 50       21 croak "scope is required" unless $args{scope};
15              
16             return bless {
17             app => $args{app},
18             scope => $args{scope},
19 12         123 recv_queue => [], # Events from app -> test
20             closed => 0,
21             started => 0,
22             }, $class;
23             }
24              
25             sub _start {
26 12     12   18 my ($self) = @_;
27              
28             # Create receive coderef for the app (always returns disconnect when closed)
29 2     2   48 my $receive = async sub {
30 2 50       3 if ($self->{closed}) {
31 0         0 return { type => 'sse.disconnect' };
32             }
33              
34             # SSE only receives disconnects from client, so we wait indefinitely
35             # until the connection is closed
36 2         3 my $future = Future->new;
37             # This future will be resolved when close() is called
38 2   50     19 push @{$self->{_pending_receives} //= []}, $future;
  2         8  
39 2         3 return await $future;
40 12         72 };
41              
42             # Create send coderef for the app
43 31     31   881 my $send = async sub {
44 31         41 my ($event) = @_;
45              
46 31 100       77 if ($event->{type} eq 'sse.start') {
    100          
47 12         22 $self->{started} = 1;
48 12   50     30 $self->{status} = $event->{status} // 200;
49 12   100     51 $self->{headers} = $event->{headers} // [];
50             }
51             elsif ($event->{type} eq 'sse.send') {
52 18         15 push @{$self->{recv_queue}}, $event;
  18         36  
53             }
54              
55 31         157 return;
56 12         34 };
57              
58             # Start the app future but don't block on it
59 12         54 $self->{app_future} = $self->{app}->($self->{scope}, $receive, $send);
60              
61             # Wait for sse.start (this should complete immediately)
62             # We need to let the app run until it starts
63 12         1041 $self->_pump_app;
64              
65 12 50       29 croak "SSE connection not started" unless $self->{started};
66              
67 12         104 return $self;
68             }
69              
70             sub _pump_app {
71 24     24   31 my ($self) = @_;
72              
73             # If closed and there are pending receives, resolve them with disconnect
74 24 100 100     89 if ($self->{closed} && $self->{_pending_receives}) {
75 2         2 while (my $future = shift @{$self->{_pending_receives}}) {
  4         350  
76 2 50       5 $future->done({ type => 'sse.disconnect' }) unless $future->is_ready;
77             }
78             }
79             }
80              
81             sub receive_event {
82 14     14 1 1368 my ($self, %opts) = @_;
83 14   50     54 my $timeout = $opts{timeout} // 5;
84              
85             # Check if we have an event already waiting
86 14 50       13 if (@{$self->{recv_queue}}) {
  14         35  
87 14         16 my $event = shift @{$self->{recv_queue}};
  14         21  
88              
89             # Extract SSE event fields
90             return {
91             event => $event->{event},
92             data => $event->{data},
93             id => $event->{id},
94             retry => $event->{retry},
95 14         89 };
96             }
97              
98             # Check if connection closed
99 0 0       0 return undef if $self->{closed};
100              
101             # No event available yet
102 0         0 croak "Timeout waiting for SSE event";
103             }
104              
105             sub receive_json {
106 1     1 1 4 my ($self, %opts) = @_;
107              
108 1         4 my $event = $self->receive_event(%opts);
109 1 50       3 return undef unless defined $event;
110              
111 1         401 require JSON::MaybeXS;
112 1         7980 return JSON::MaybeXS::decode_json($event->{data});
113             }
114              
115             sub close {
116 12     12 1 322 my ($self) = @_;
117              
118 12 50       30 return if $self->{closed};
119              
120 12         19 $self->{closed} = 1;
121              
122             # Pump the app to let it process the disconnect
123 12         28 $self->_pump_app;
124              
125 12         29 return $self;
126             }
127              
128             sub is_closed {
129 11     11 1 18 my ($self) = @_;
130 11         52 return $self->{closed};
131             }
132              
133             1;
134              
135             __END__