File Coverage

lib/PAGI/Test/WebSocket.pm
Criterion Covered Total %
statement 116 124 93.5
branch 22 36 61.1
condition 16 28 57.1
subroutine 20 20 100.0
pod 11 11 100.0
total 185 219 84.4


line stmt bran cond sub pod time code
1             package PAGI::Test::WebSocket;
2              
3 5     5   25 use strict;
  5         8  
  5         185  
4 5     5   15 use warnings;
  5         7  
  5         219  
5 5     5   16 use Future::AsyncAwait;
  5         6  
  5         35  
6 5     5   212 use Future;
  5         10  
  5         131  
7 5     5   35 use Carp qw(croak);
  5         16  
  5         8948  
8              
9              
10             sub new {
11 14     14 1 35 my ($class, %args) = @_;
12              
13 14 50       29 croak "app is required" unless $args{app};
14 14 50       22 croak "scope is required" unless $args{scope};
15              
16             return bless {
17             app => $args{app},
18             scope => $args{scope},
19 14         79 send_queue => [], # Messages from test -> app
20             recv_queue => [], # Messages from app -> test
21             closed => 0,
22             accepted => 0,
23             close_code => undef,
24             close_reason => '',
25             _pending_receives => [], # Pending receive futures
26             }, $class;
27             }
28              
29             sub _start {
30 14     14   15 my ($self) = @_;
31              
32             # Create receive coderef for the app
33 31     31   532 my $receive = async sub {
34             # First call returns websocket.connect
35 31 100       52 if (!$self->{_connect_sent}) {
36 12         15 $self->{_connect_sent} = 1;
37 12         67 return { type => 'websocket.connect' };
38             }
39              
40             # Return queued message if available
41 19 50       14 if (@{$self->{send_queue}}) {
  19         52  
42 0         0 return shift @{$self->{send_queue}};
  0         0  
43             }
44              
45             # Return disconnect if closed
46 19 50       30 if ($self->{closed}) {
47 0   0     0 return { type => 'websocket.disconnect', code => $self->{close_code} // 1000 };
48             }
49              
50             # Create a future that will be resolved when data arrives
51 19         29 my $future = Future->new;
52 19         92 push @{$self->{_pending_receives}}, $future;
  19         24  
53 19         34 return await $future;
54 14         49 };
55              
56             # Create send coderef for the app
57 28     28   1606 my $send = async sub {
58 28         58 my ($event) = @_;
59              
60 28 100       71 if ($event->{type} eq 'websocket.accept') {
    100          
    100          
61 14         23 $self->{accepted} = 1;
62             }
63             elsif ($event->{type} eq 'websocket.send') {
64 11         9 push @{$self->{recv_queue}}, $event;
  11         19  
65             }
66             elsif ($event->{type} eq 'websocket.close') {
67 2         4 $self->{closed} = 1;
68 2   50     5 $self->{close_code} = $event->{code} // 1000;
69 2   50     9 $self->{close_reason} = $event->{reason} // '';
70             }
71              
72 28         91 return;
73 14         34 };
74              
75             # Start the app future but don't block on it
76 14         75 $self->{app_future} = $self->{app}->($self->{scope}, $receive, $send);
77              
78             # Wait for acceptance (the first two awaits in the app should complete immediately)
79             # This is a bit hacky but works: we need to let the app run until it accepts
80 14         1158 $self->_pump_app;
81              
82 14 50       19 croak "WebSocket connection not accepted" unless $self->{accepted};
83              
84 14         36 return $self;
85             }
86              
87             sub _pump_app {
88 34     34   36 my ($self) = @_;
89              
90             # This pumps the app future by checking if it's waiting on a receive
91             # If there are pending receives and we have data, resolve them
92 34   100     30 while (@{$self->{_pending_receives}} && @{$self->{send_queue}}) {
  53         2616  
  38         76  
93 19         19 my $future = shift @{$self->{_pending_receives}};
  19         25  
94 19         17 my $event = shift @{$self->{send_queue}};
  19         19  
95 19         45 $future->done($event);
96             }
97              
98             # If closed and there are pending receives, resolve them with disconnect
99 34 50 66     109 if ($self->{closed} && @{$self->{_pending_receives}}) {
  14         41  
100 0         0 while (my $future = shift @{$self->{_pending_receives}}) {
  0         0  
101 0   0     0 $future->done({ type => 'websocket.disconnect', code => $self->{close_code} // 1000 });
102             }
103             }
104             }
105              
106             sub send_text {
107 8     8 1 36 my ($self, $text) = @_;
108              
109 8 100       170 croak "Cannot send on closed WebSocket" if $self->{closed};
110              
111 7         6 push @{$self->{send_queue}}, {
  7         17  
112             type => 'websocket.receive',
113             text => $text,
114             };
115              
116             # Pump the app to process this message
117 7         12 $self->_pump_app;
118              
119 7         11 return $self;
120             }
121              
122             sub send_bytes {
123 1     1 1 6 my ($self, $bytes) = @_;
124              
125 1 50       2 croak "Cannot send on closed WebSocket" if $self->{closed};
126              
127 1         1 push @{$self->{send_queue}}, {
  1         2  
128             type => 'websocket.receive',
129             bytes => $bytes,
130             };
131              
132             # Pump the app to process this message
133 1         2 $self->_pump_app;
134              
135 1         2 return $self;
136             }
137              
138             sub send_json {
139 1     1 1 6 my ($self, $data) = @_;
140              
141 1         411 require JSON::MaybeXS;
142 1         7823 my $text = JSON::MaybeXS::encode_json($data);
143              
144 1         3 return $self->send_text($text);
145             }
146              
147             sub receive_text {
148 11     11 1 40 my ($self, $timeout) = @_;
149 11   100     35 $timeout //= 5;
150              
151             # Check if we have a text message already waiting
152 11         12 for my $i (0 .. $#{$self->{recv_queue}}) {
  11         28  
153 10         14 my $event = $self->{recv_queue}[$i];
154 10 50 33     34 if ($event->{type} eq 'websocket.send' && exists $event->{text}) {
155 10         10 splice @{$self->{recv_queue}}, $i, 1;
  10         20  
156 10         45 return $event->{text};
157             }
158             }
159              
160             # Check if connection closed
161 1 50       3 return undef if $self->{closed};
162              
163             # No message available yet
164 1         96 croak "Timeout waiting for WebSocket text message";
165             }
166              
167             sub receive_bytes {
168 1     1 1 3 my ($self, $timeout) = @_;
169 1   50     5 $timeout //= 5;
170              
171             # Check if we have a bytes message waiting
172 1         1 for my $i (0 .. $#{$self->{recv_queue}}) {
  1         2  
173 1         2 my $event = $self->{recv_queue}[$i];
174 1 50 33     4 if ($event->{type} eq 'websocket.send' && exists $event->{bytes}) {
175 1         2 splice @{$self->{recv_queue}}, $i, 1;
  1         2  
176 1         26 return $event->{bytes};
177             }
178             }
179              
180             # Check if connection closed
181 0 0       0 return undef if $self->{closed};
182              
183             # No message available yet
184 0         0 croak "Timeout waiting for WebSocket bytes message";
185             }
186              
187             sub receive_json {
188 2     2 1 11 my ($self, $timeout) = @_;
189              
190 2         7 my $text = $self->receive_text($timeout);
191 2 50       5 return undef unless defined $text;
192              
193 2         14 require JSON::MaybeXS;
194 2         34 return JSON::MaybeXS::decode_json($text);
195             }
196              
197             sub close {
198 12     12 1 357 my ($self, $code, $reason) = @_;
199              
200 12 50       22 return if $self->{closed};
201              
202 12   100     40 $code //= 1000;
203 12   100     28 $reason //= '';
204              
205 12         11 $self->{closed} = 1;
206 12         15 $self->{close_code} = $code;
207 12         18 $self->{close_reason} = $reason;
208              
209             # Push disconnect event
210 12         11 push @{$self->{send_queue}}, {
  12         34  
211             type => 'websocket.disconnect',
212             code => $code,
213             reason => $reason,
214             };
215              
216             # Pump the app to let it process the disconnect
217 12         27 $self->_pump_app;
218              
219 12         29 return $self;
220             }
221              
222             sub close_code {
223 2     2 1 3 my ($self) = @_;
224 2         6 return $self->{close_code};
225             }
226              
227             sub close_reason {
228 1     1 1 2 my ($self) = @_;
229 1         3 return $self->{close_reason};
230             }
231              
232             sub is_closed {
233 14     14 1 20 my ($self) = @_;
234 14         76 return $self->{closed};
235             }
236              
237             1;
238              
239             __END__