File Coverage

lib/PAGI/Test/SSE.pm
Criterion Covered Total %
statement 65 68 95.5
branch 14 24 58.3
condition 8 11 72.7
subroutine 14 14 100.0
pod 5 5 100.0
total 106 122 86.8


line stmt bran cond sub pod time code
1             package PAGI::Test::SSE;
2              
3 5     5   26 use strict;
  5         8  
  5         172  
4 5     5   16 use warnings;
  5         26  
  5         216  
5 5     5   17 use Future::AsyncAwait;
  5         5  
  5         34  
6 5     5   185 use Future;
  5         11  
  5         113  
7 5     5   17 use Carp qw(croak);
  5         4  
  5         5136  
8              
9              
10             sub new {
11 12     12 1 26 my ($class, %args) = @_;
12              
13 12 50       22 croak "app is required" unless $args{app};
14 12 50       19 croak "scope is required" unless $args{scope};
15              
16             return bless {
17             app => $args{app},
18             scope => $args{scope},
19 12         48 recv_queue => [], # Events from app -> test
20             closed => 0,
21             started => 0,
22             }, $class;
23             }
24              
25             sub _start {
26 12     12   43 my ($self) = @_;
27              
28             # Create receive coderef for the app (always returns disconnect when closed)
29 2     2   50 my $receive = async sub {
30 2 50       4 if ($self->{closed}) {
31 0         0 return { type => 'sse.disconnect' };
32             }
33              
34             # SSE only receives disconnects from client, so we wait indefinitely
35             # until the connection is closed
36 2         2 my $future = Future->new;
37             # This future will be resolved when close() is called
38 2   50     8 push @{$self->{_pending_receives} //= []}, $future;
  2         7  
39 2         3 return await $future;
40 12         39 };
41              
42             # Create send coderef for the app
43 31     31   710 my $send = async sub {
44 31         31 my ($event) = @_;
45              
46 31 100       59 if ($event->{type} eq 'sse.start') {
    100          
47 12         15 $self->{started} = 1;
48 12   50     23 $self->{status} = $event->{status} // 200;
49 12   100     36 $self->{headers} = $event->{headers} // [];
50             }
51             elsif ($event->{type} eq 'sse.send') {
52 18         14 push @{$self->{recv_queue}}, $event;
  18         27  
53             }
54              
55 31         162 return;
56 12         23 };
57              
58             # Start the app future but don't block on it
59 12         46 $self->{app_future} = $self->{app}->($self->{scope}, $receive, $send);
60              
61             # Wait for sse.start (this should complete immediately)
62             # We need to let the app run until it starts
63 12         820 $self->_pump_app;
64              
65 12 50       18 croak "SSE connection not started" unless $self->{started};
66              
67 12         79 return $self;
68             }
69              
70             sub _pump_app {
71 24     24   31 my ($self) = @_;
72              
73             # If closed and there are pending receives, resolve them with disconnect
74 24 100 100     75 if ($self->{closed} && $self->{_pending_receives}) {
75 2         3 while (my $future = shift @{$self->{_pending_receives}}) {
  4         323  
76 2 50       5 $future->done({ type => 'sse.disconnect' }) unless $future->is_ready;
77             }
78             }
79             }
80              
81             sub receive_event {
82 14     14 1 1286 my ($self, %opts) = @_;
83 14   50     59 my $timeout = $opts{timeout} // 5;
84              
85             # Check if we have an event already waiting
86 14 50       12 if (@{$self->{recv_queue}}) {
  14         24  
87 14         11 my $event = shift @{$self->{recv_queue}};
  14         20  
88              
89             # Extract SSE event fields
90             return {
91             event => $event->{event},
92             data => $event->{data},
93             id => $event->{id},
94             retry => $event->{retry},
95 14         67 };
96             }
97              
98             # Check if connection closed
99 0 0       0 return undef if $self->{closed};
100              
101             # No event available yet
102 0         0 croak "Timeout waiting for SSE event";
103             }
104              
105             sub receive_json {
106 1     1 1 4 my ($self, %opts) = @_;
107              
108 1         2 my $event = $self->receive_event(%opts);
109 1 50       3 return undef unless defined $event;
110              
111 1         380 require JSON::MaybeXS;
112 1         7727 return JSON::MaybeXS::decode_json($event->{data});
113             }
114              
115             sub close {
116 12     12 1 348 my ($self) = @_;
117              
118 12 50       20 return if $self->{closed};
119              
120 12         14 $self->{closed} = 1;
121              
122             # Pump the app to let it process the disconnect
123 12         21 $self->_pump_app;
124              
125 12         22 return $self;
126             }
127              
128             sub is_closed {
129 11     11 1 12 my ($self) = @_;
130 11         41 return $self->{closed};
131             }
132              
133             1;
134              
135             __END__