File Coverage

blib/lib/PAGI/Server/Connection.pm
Criterion Covered Total %
statement 1434 1784 80.3
branch 614 1066 57.6
condition 259 474 54.6
subroutine 121 131 92.3
pod 0 2 0.0
total 2428 3457 70.2


line stmt bran cond sub pod time code
1             package PAGI::Server::Connection;
2 87     87   3257527 use strict;
  87         166  
  87         2956  
3 87     87   314 use warnings;
  87         125  
  87         3599  
4 87     87   748 use Future;
  87         9207  
  87         1335  
5 87     87   1503 use Future::AsyncAwait;
  87         9346  
  87         412  
6 87     87   3999 use Scalar::Util qw(weaken refaddr);
  87         127  
  87         3890  
7 87     87   34270 use Protocol::WebSocket::Handshake::Server;
  87         1933153  
  87         3757  
8 87     87   610 use Protocol::WebSocket::Frame;
  87         135  
  87         1752  
9 87     87   336 use Digest::SHA qw(sha1_base64);
  87         154  
  87         6774  
10 87     87   357 use Encode;
  87         136  
  87         5689  
11 87     87   18604 use URI::Escape qw(uri_unescape);
  87         78770  
  87         4624  
12 87     87   19648 use IO::Async::Timer::Countdown;
  87         58099  
  87         2912  
13 87     87   5608 use IO::Async::Timer::Periodic;
  87         15752  
  87         2437  
14 87     87   339 use Time::HiRes qw(gettimeofday tv_interval);
  87         107  
  87         1061  
15 87     87   41983 use PAGI::Server::AsyncFile;
  87         273  
  87         5529  
16 87     87   38436 use PAGI::Server::ConnectionState;
  87         243  
  87         4256  
17              
18              
19 87     87   557 use constant FILE_CHUNK_SIZE => 65536; # 64KB chunks for file streaming
  87         124  
  87         85485  
20              
21             # Per-second cache for CLF timestamp in access log (same pattern as HTTP1::format_date)
22             my $_cached_log_timestamp;
23             my $_cached_log_time = 0;
24              
25             # =============================================================================
26             # Unrecognized Event Type Handler (PAGI spec compliance)
27             # =============================================================================
28             # Per main.mkdn: "Servers must raise exceptions if... The type field is unrecognized"
29              
30             sub _unrecognized_event_type {
31 0     0   0 my ($type, $protocol) = @_;
32 0         0 die "Unrecognized event type '$type' for $protocol protocol\n";
33             }
34              
35             # =============================================================================
36             # Header Validation (CRLF Injection Prevention)
37             # =============================================================================
38             # RFC 7230 Section 3.2.6: Field values MUST NOT contain CR or LF
39              
40             sub _validate_header_value {
41 37     37   79 my ($value) = @_;
42              
43 37 100       152 if ($value =~ /[\r\n\0]/) {
44 1         10 die "Invalid header value: contains CR, LF, or null byte\n";
45             }
46 36         273 return $value;
47             }
48              
49             sub _validate_header_name {
50 37     37   136 my ($name) = @_;
51              
52 37 50       202 if ($name =~ /[\r\n\0]/) {
53 0         0 die "Invalid header name: contains CR, LF, or null byte\n";
54             }
55 37 50       157 if ($name =~ /[[:cntrl:]]/) {
56 0         0 die "Invalid header name: contains control characters\n";
57             }
58 37         157 return $name;
59             }
60              
61             # RFC 6455 Section 11.3.4: Subprotocol must be a token (no whitespace, separators)
62             sub _validate_subprotocol {
63 1     1   2 my ($value) = @_;
64              
65 1 50       5 if ($value =~ /[\r\n\0\s]/) {
66 1         12 die "Invalid subprotocol: contains CR, LF, null, or whitespace\n";
67             }
68             # Token characters only (roughly)
69 0 0       0 if ($value !~ /^[\w\-\.]+$/) {
70 0         0 die "Invalid subprotocol: must be alphanumeric, dash, underscore, or dot\n";
71             }
72 0         0 return $value;
73             }
74              
75             =head1 NAME
76              
77             PAGI::Server::Connection - Per-connection state machine
78              
79             =head1 SYNOPSIS
80              
81             # Internal use by PAGI::Server
82             my $conn = PAGI::Server::Connection->new(
83             stream => $stream,
84             app => $app,
85             protocol => $protocol,
86             server => $server,
87             extensions => {},
88             );
89             $conn->start;
90              
91             =head1 DESCRIPTION
92              
93             PAGI::Server::Connection manages the state machine for a single client
94             connection. It handles:
95              
96             =over 4
97              
98             =item * Request parsing via Protocol::HTTP1
99              
100             =item * Scope creation for the application
101              
102             =item * Event queue management for $receive and $send
103              
104             =item * Protocol upgrades (WebSocket, SSE)
105              
106             =item * SSE over HTTP/1.1 and HTTP/2
107              
108             =item * Connection lifecycle and cleanup
109              
110             =back
111              
112             =cut
113              
114             sub new {
115 324     324 0 25869 my ($class, %args) = @_;
116              
117             my $self = bless {
118             stream => $args{stream},
119             app => $args{app},
120             protocol => $args{protocol},
121             server => $args{server},
122             extensions => $args{extensions} // {},
123             state => $args{state} // {},
124             tls_enabled => $args{tls_enabled} // 0,
125             timeout => $args{timeout} // 60, # Idle timeout in seconds
126             request_timeout => $args{request_timeout} // 0, # Request stall timeout in seconds (0 = disabled, default for performance)
127             ws_idle_timeout => $args{ws_idle_timeout} // 0, # WebSocket idle timeout (0 = disabled)
128             sse_idle_timeout => $args{sse_idle_timeout} // 0, # SSE idle timeout (0 = disabled)
129             max_body_size => $args{max_body_size}, # 0 = unlimited
130             access_log => $args{access_log}, # Filehandle for access logging
131             _access_log_formatter => $args{_access_log_formatter}, # Pre-compiled format closure
132             max_receive_queue => $args{max_receive_queue} // 1000, # Max WebSocket receive queue size
133             max_ws_frame_size => $args{max_ws_frame_size} // 65536, # Max WebSocket frame size in bytes
134             sync_file_threshold => $args{sync_file_threshold} // 65536, # Threshold for sync file reads (default 64KB)
135             validate_events => $args{validate_events} // 0, # Dev-mode event validation (0 = disabled)
136             # Send-side backpressure (watermarks in bytes)
137             # Defaults match Python asyncio: 64KB high, 16KB low (high/4)
138             write_high_watermark => $args{write_high_watermark} // 65536, # 64KB - pause sending above this
139             write_low_watermark => $args{write_low_watermark} // 16384, # 16KB - resume sending below this
140             _drain_waiters => [], # Pending Futures waiting for buffer drain
141             _drain_check_active => 0, # Flag to prevent redundant on_outgoing_empty setup
142             tls_info => undef, # Populated on first request if TLS
143             buffer => '',
144             closed => 0,
145             response_started => 0,
146             response_status => undef, # Track response status for logging
147             _response_size => 0, # Track response body bytes for logging
148             request_start => undef, # Track request start time for logging
149             idle_timer => undef, # IO::Async::Timer for idle timeout
150             stall_timer => undef, # IO::Async::Timer for request stall timeout
151             ws_idle_timer => undef, # IO::Async::Timer for WebSocket idle timeout
152             sse_idle_timer => undef, # IO::Async::Timer for SSE idle timeout
153             # Event queue for $receive
154             receive_queue => [],
155             receive_pending => undef,
156             # Track all pending receive Futures to cancel on close
157             receive_futures => [],
158             # Track request handling Future to prevent "lost future" warning
159             request_future => undef,
160             # Idempotency guard for disconnect handling
161             _disconnect_handled => 0,
162             # WebSocket state
163             websocket_mode => 0,
164             websocket_frame => undef, # Protocol::WebSocket::Frame for parsing
165             websocket_accepted => 0,
166             # SSE state
167             sse_mode => 0,
168             sse_started => 0,
169             sse_disconnect_reason => undef, # Reason for SSE disconnect (client_closed, write_error, etc.)
170             # Keepalive state (protocol-level ping/pong for WebSocket, comments for SSE)
171             ws_keepalive_timer => undef, # Periodic timer for sending WebSocket pings
172             ws_pong_timeout => undef, # Timeout timer for pong response
173             ws_waiting_pong => 0, # Flag: are we waiting for a pong?
174             ws_keepalive_interval => 0, # Current keepalive interval (0 = disabled)
175             ws_keepalive_timeout => 0, # Current pong timeout (0 = no timeout check)
176             sse_keepalive_timer => undef, # Periodic timer for sending SSE keepalive comments
177             sse_keepalive_comment => '', # Comment text to send
178             # HTTP/2 state
179             alpn_protocol => $args{alpn_protocol}, # ALPN-negotiated protocol (e.g. 'h2', 'http/1.1')
180             h2_protocol => $args{h2_protocol}, # PAGI::Server::Protocol::HTTP2 instance
181             h2c_enabled => $args{h2c_enabled} // 0, # Allow h2c preface detection on cleartext
182             is_h2 => 0, # Set during start() if HTTP/2 detected
183             h2_session => undef, # PAGI::Server::Protocol::HTTP2::Session
184             h2_streams => {}, # Per-stream state for HTTP/2
185             # Transport info (tcp or unix)
186             transport_type => $args{transport_type} // 'tcp',
187             transport_path => $args{transport_path}, # socket path for unix
188             # Cached connection info (populated in start(), used by _create_scope)
189 324   100     27141 client_host => '127.0.0.1',
      100        
      100        
      100        
      100        
      100        
      100        
      100        
      100        
      100        
      100        
      100        
      100        
      100        
      100        
190             client_port => 0,
191             server_host => '127.0.0.1',
192             server_port => 5000,
193             }, $class;
194              
195             # Extract TLS info if this is a TLS connection
196 324 100       1439 if ($self->{tls_enabled}) {
197 44         325 $self->_extract_tls_info;
198             }
199              
200 324         7660 return $self;
201             }
202              
203 87     87   654 use Socket qw(IPPROTO_TCP TCP_NODELAY);
  87         183  
  87         2080524  
204              
205             sub start {
206 322     322 0 14676 my ($self) = @_;
207              
208 322         663 my $stream = $self->{stream};
209 322         682 weaken(my $weak_self = $self);
210              
211             # Enable TCP_NODELAY to reduce latency for small responses (TCP only)
212 322   33     1168 my $handle = $stream->write_handle // $stream->read_handle;
213 322 100 66     4976 if ($self->{transport_type} eq 'tcp' && $handle && $handle->can('setsockopt')) {
      100        
214 256         454 eval {
215 256         1170 $handle->setsockopt(IPPROTO_TCP, TCP_NODELAY, 1);
216             };
217             # Ignore errors - not all sockets support this
218             }
219              
220             # Cache connection info once (avoids per-request socket method calls)
221 322 100 66     5935 if ($self->{transport_type} eq 'unix') {
    100          
222             # Unix socket: no peer IP/port, server is identified by path
223 2         38 $self->{client_host} = undef;
224 2         26 $self->{client_port} = undef;
225 2         11 $self->{server_host} = $self->{transport_path};
226 2         18 $self->{server_port} = undef;
227             } elsif ($handle && $handle->can('peerhost')) {
228 255         388 eval {
229 255   50     989 $self->{client_host} = $handle->peerhost // '127.0.0.1';
230 255   50     13712 $self->{client_port} = $handle->peerport // 0;
231 255   50     7469 $self->{server_host} = $handle->sockhost // '127.0.0.1';
232 255   50     7613 $self->{server_port} = $handle->sockport // 5000;
233             };
234             # Ignore errors - keep defaults if extraction fails
235             }
236              
237             # Detect HTTP/2 via ALPN negotiation
238 322 50 100     6985 if ($self->{alpn_protocol} && $self->{alpn_protocol} eq 'h2' && $self->{h2_protocol}) {
      66        
239 26         125 $self->_init_h2_session;
240             }
241              
242             # Set up idle timeout timer
243 322 50 33     2693 if ($self->{timeout} && $self->{timeout} > 0 && $self->{server}) {
      33        
244             my $timer = IO::Async::Timer::Countdown->new(
245             delay => $self->{timeout},
246             on_expire => sub {
247 0 0   0   0 return unless $weak_self;
248 0 0       0 return if $weak_self->{closed};
249             # Close idle connection
250 0         0 $weak_self->_handle_disconnect_and_close('idle_timeout');
251             },
252 322         5294 );
253 322         24058 $self->{idle_timer} = $timer;
254 322         1424 $self->{server}->add_child($timer);
255 322         25032 $timer->start;
256             }
257              
258             # Set up read handler
259             $stream->configure(
260             on_read => sub {
261 551     551   17180110 my ($s, $buffref, $eof) = @_;
262 551 100       2647 return 0 unless $weak_self;
263              
264             # Reset idle timer on any read activity
265 549         3186 $weak_self->_reset_idle_timer;
266              
267             # Reset stall timer on read activity (if handling a request)
268 549 100       3657 $weak_self->_reset_stall_timer if $weak_self->{handling_request};
269              
270 549         1499 $weak_self->{buffer} .= $$buffref;
271 549         1123 $$buffref = '';
272              
273 549 100       1294 if ($eof) {
274             # EOF means client closed - handle disconnect and cleanup
275 13         123 $weak_self->_handle_disconnect_and_close('client_closed');
276 13         9789 return 0;
277             }
278              
279             # h2c detection: check if cleartext connection starts with HTTP/2 preface
280 536 100 66     1926 if ($weak_self->{h2c_enabled} && !$weak_self->{is_h2}) {
281 37 50       102 if (length($weak_self->{buffer}) >= 24) { # HTTP/2 preface is 24 bytes
282 37 100 66     310 if ($weak_self->{h2_protocol} && PAGI::Server::Protocol::HTTP2->detect_preface($weak_self->{buffer})) {
283 36         134 $weak_self->_init_h2_session;
284 36         91 $weak_self->{h2c_enabled} = 0; # Detection done
285             } else {
286 1         3 $weak_self->{h2c_enabled} = 0; # Not h2c, stop checking
287             }
288             } else {
289             # Not enough data yet to determine protocol, wait for more
290 0         0 return 0;
291             }
292             }
293              
294             # Wrap processing in eval to prevent exceptions from crashing the event loop
295             # This is critical - Protocol::WebSocket::Frame can throw exceptions for
296             # oversized payloads, and other parsing code may throw as well
297 536         853 eval {
298             # HTTP/2: feed data to session for frame processing
299 536 100       1459 if ($weak_self->{is_h2}) {
300 204         770 $weak_self->_h2_process_data;
301 204         460 return;
302             }
303              
304             # If in WebSocket mode, process WebSocket frames
305 332 100       937 if ($weak_self->{websocket_mode}) {
306 40         122 $weak_self->_process_websocket_frames;
307 39         781 return;
308             }
309              
310             # If we're waiting for body data, notify the receive handler
311 292 100 66     1214 if ($weak_self->{receive_pending} && !$weak_self->{receive_pending}->is_ready) {
312 12         54 my $f = $weak_self->{receive_pending};
313 12         18 $weak_self->{receive_pending} = undef;
314 12         25 $f->done;
315             }
316              
317 292         2038 $weak_self->_try_handle_request;
318             };
319 536 100       18973 if (my $error = $@) {
320             # Log the error and close the connection gracefully
321 1         38 warn "PAGI connection error: $error";
322 1 50       4 return 0 unless $weak_self;
323 1         5 $weak_self->_handle_disconnect_and_close('server_error');
324             }
325 536         2115 return 0;
326             },
327             on_closed => sub {
328 320 100   320   67276208 return unless $weak_self;
329             # Stream closed - handle disconnect and remove from connections hash
330 265         1278 $weak_self->_handle_disconnect_and_close('client_closed');
331             },
332 322         143707 );
333             }
334              
335             sub _reset_idle_timer {
336 549     549   1084 my ($self) = @_;
337              
338 549 100       1906 return unless $self->{idle_timer};
339 506         2618 $self->{idle_timer}->reset;
340 506 50       56846 $self->{idle_timer}->start unless $self->{idle_timer}->is_running;
341             }
342              
343             sub _stop_idle_timer {
344 357     357   772 my ($self) = @_;
345              
346 357 100       1196 return unless $self->{idle_timer};
347 322 50       1709 $self->{idle_timer}->stop if $self->{idle_timer}->is_running;
348             # Remove timer completely so _reset_idle_timer won't restart it
349             # This is important for long-lived connections (WebSocket, SSE)
350 322 50       14490 if ($self->{server}) {
351 322         1692 $self->{server}->remove_child($self->{idle_timer});
352             }
353 322         31093 $self->{idle_timer} = undef;
354             }
355              
356             # =============================================================================
357             # HTTP/2 Session Initialization
358             # =============================================================================
359              
360             sub _init_h2_session {
361 62     62   124 my ($self) = @_;
362              
363 62         138 $self->{is_h2} = 1;
364              
365 62         134 weaken(my $weak_self = $self);
366              
367             $self->{h2_session} = $self->{h2_protocol}->create_session(
368             on_request => sub {
369 66     66   184 my ($stream_id, $pseudo, $headers, $has_body) = @_;
370 66 50       174 return unless $weak_self;
371 66         338 $weak_self->_h2_on_request($stream_id, $pseudo, $headers, $has_body);
372             },
373             on_body => sub {
374 28     28   55 my ($stream_id, $data, $eof) = @_;
375 28 50       223 return unless $weak_self;
376 28         95 $weak_self->_h2_on_body($stream_id, $data, $eof);
377             },
378             on_close => sub {
379 43     43   88 my ($stream_id, $error_code) = @_;
380 43 50       95 return unless $weak_self;
381 43         169 $weak_self->_h2_on_close($stream_id, $error_code);
382             },
383 62         770 );
384              
385             # Send initial SETTINGS to client
386 62         284 $self->_h2_write_pending;
387             }
388              
389             sub _h2_process_data {
390 205     205   355 my ($self) = @_;
391 205 50       519 return unless $self->{h2_session};
392              
393 205 100       629 if (length($self->{buffer}) > 0) {
394 204         1115 $self->{h2_session}->feed($self->{buffer});
395 204         414 $self->{buffer} = '';
396             }
397              
398 205         544 $self->_h2_write_pending;
399              
400             # Close connection when session is done (GOAWAY received or sent)
401 205 100 66     1000 if ($self->{h2_session} && !$self->{h2_session}->want_read) {
402 2         5 $self->_h2_write_pending; # Flush any remaining output
403 2         10 $self->_handle_disconnect_and_close;
404             }
405             }
406              
407             sub _h2_write_pending {
408 438     438   799 my ($self) = @_;
409 438 50       980 return unless $self->{h2_session};
410 438         570 while (1) {
411 707         49534 my $data = $self->{h2_session}->extract;
412 707 100 66     3432 last unless defined $data && length($data) > 0;
413 269         1242 $self->{stream}->write($data);
414             }
415             }
416              
417             # =============================================================================
418             # HTTP/2 Stream Callbacks
419             # =============================================================================
420              
421             sub _h2_on_request {
422 67     67   905560 my ($self, $stream_id, $pseudo, $headers, $has_body) = @_;
423              
424             # Detect CONNECT method
425 67         122 my $is_websocket = 0;
426 67 100 50     323 if (($pseudo->{':method'} // '') eq 'CONNECT') {
427 14 100 100     73 if (($pseudo->{':protocol'} // '') eq 'websocket') {
428             # Extended CONNECT for WebSocket (RFC 8441)
429 13         37 $is_websocket = 1;
430             } else {
431             # Plain CONNECT not supported — defer response to avoid
432             # re-entrant nghttp2 calls (we're inside feed/mem_recv)
433 1         3 weaken(my $ws = $self);
434             $self->{server}->loop->later(sub {
435 1 50   1   105 return unless $ws;
436 1 50       5 return if $ws->{closed};
437 1         13 $ws->{h2_session}->submit_response($stream_id,
438             status => 501,
439             headers => [['content-type', 'text/plain']],
440             body => "CONNECT method not supported\n",
441             );
442 1         44 $ws->_h2_write_pending;
443 1         10 });
444 1         43 return;
445             }
446             }
447              
448             # Detect SSE (Accept: text/event-stream)
449 66         113 my $is_sse = 0;
450 66 100       192 if (!$is_websocket) {
451 53         117 for my $h (@$headers) {
452 61 100 100     326 if ($h->[0] eq 'accept' && $h->[1] =~ m{text/event-stream}) {
453 9         24 $is_sse = 1;
454 9         24 last;
455             }
456             }
457             }
458              
459             # Initialize per-stream state
460 66         1084 $self->{h2_streams}{$stream_id} = {
461             pseudo => $pseudo,
462             headers => $headers,
463             has_body => $has_body,
464             body => '',
465             body_complete => !$has_body,
466             body_pending => undef, # Future for body availability
467             receive_queue => [],
468             response_started => 0,
469             is_websocket => $is_websocket,
470             is_sse => $is_sse,
471             ws_accepted => 0,
472             ws_frame => undef, # Protocol::WebSocket::Frame for parsing
473             ws_connect_sent => 0,
474             };
475              
476             # Check Content-Length against max_body_size limit before dispatching
477             # (after stream init so _h2_on_body/_h2_on_close can find the stream)
478 66 100 100     362 if ($self->{max_body_size} && $has_body) {
479 6         14 for my $h (@$headers) {
480 12 100       34 if ($h->[0] eq 'content-length') {
481 1 50       5 if ($h->[1] > $self->{max_body_size}) {
482             # Defer response to avoid re-entrant nghttp2 calls
483 1         26 weaken(my $ws = $self);
484             $self->{server}->loop->later(sub {
485 1 50   1   53 return unless $ws;
486 1 50       4 return if $ws->{closed};
487 1         8 $ws->{h2_session}->submit_response($stream_id,
488             status => 413,
489             headers => [['content-type', 'text/plain']],
490             body => "Payload Too Large\n",
491             );
492 1         38 $ws->_h2_write_pending;
493 1         7 });
494 1         58 return;
495             }
496 0         0 last;
497             }
498             }
499             }
500              
501             # Defer dispatch to next event loop tick to prevent re-entrant nghttp2 calls
502 65         191 weaken(my $weak_self = $self);
503             $self->{server}->loop->later(sub {
504 65 50   65   3661 return unless $weak_self;
505 65 50       240 return if $weak_self->{closed};
506 65         343 $weak_self->_h2_dispatch_stream($stream_id);
507 65         319 });
508             }
509              
510             sub _h2_on_body {
511 28     28   61 my ($self, $stream_id, $data, $eof) = @_;
512              
513 28         55 my $stream = $self->{h2_streams}{$stream_id};
514 28 100       58 return unless $stream;
515              
516 27 50 66     95 if ($stream->{is_websocket} && $stream->{ws_accepted}) {
517             # WebSocket: DATA frames contain raw WebSocket frames
518 16 50       98 $self->_h2_process_ws_frames($stream_id, $stream, $data) if length($data);
519              
520 16 50       642 if ($eof) {
521             # END_STREAM = client closing the WebSocket stream
522 0         0 push @{$stream->{receive_queue}}, {
  0         0  
523             type => 'websocket.disconnect',
524             code => 1005,
525             reason => '',
526             };
527 0         0 $self->_h2_wake_pending($stream);
528             }
529 16         46 return;
530             }
531              
532 11 100       27 if (length($data) > 0) {
533 6         19 $stream->{body} .= $data;
534              
535             # Enforce max_body_size (0 = unlimited)
536 6 100 66     41 if ($self->{max_body_size} && length($stream->{body}) > $self->{max_body_size}) {
537 1         9 $self->{h2_session}->submit_response($stream_id,
538             status => 413,
539             headers => [['content-type', 'text/plain']],
540             body => 'Payload Too Large',
541             );
542             # No _h2_write_pending here — we're inside feed(); _h2_process_data
543             # flushes after feed() returns
544 1         37 delete $self->{h2_streams}{$stream_id};
545 1         5 return;
546             }
547             }
548              
549 10 100       17 if ($eof) {
550 5         9 $stream->{body_complete} = 1;
551             }
552              
553 10         18 $self->_h2_wake_pending($stream);
554             }
555              
556             sub _h2_wake_pending {
557 66     66   151 my ($self, $stream) = @_;
558 66 100 66     260 if ($stream->{body_pending} && !$stream->{body_pending}->is_ready) {
559 15         82 my $f = $stream->{body_pending};
560 15         46 $stream->{body_pending} = undef;
561 15         48 $f->done;
562             }
563             }
564              
565             sub _h2_on_close {
566 43     43   100 my ($self, $stream_id, $error_code) = @_;
567              
568 43         119 my $stream = $self->{h2_streams}{$stream_id};
569 43 100       159 return unless $stream;
570              
571             # Mark body complete to unblock any pending receive
572 41         95 $stream->{body_complete} = 1;
573              
574             # Enqueue disconnect event
575 41 50       121 if ($stream->{is_websocket}) {
    50          
576 0         0 push @{$stream->{receive_queue}}, {
  0         0  
577             type => 'websocket.disconnect',
578             code => 1006,
579             reason => '',
580             };
581             } elsif ($stream->{is_sse}) {
582 0         0 push @{$stream->{receive_queue}}, {
  0         0  
583             type => 'sse.disconnect',
584             reason => 'client_closed',
585             };
586             } else {
587 41         61 push @{$stream->{receive_queue}}, { type => 'http.disconnect' };
  41         162  
588             }
589              
590 41         149 $self->_h2_wake_pending($stream);
591              
592             # Clean up after a delay (let any pending futures resolve)
593 41         104 weaken(my $weak_self = $self);
594             $self->{server}->loop->later(sub {
595 41 50   41   20179 return unless $weak_self;
596 41         306 delete $weak_self->{h2_streams}{$stream_id};
597 41         183 });
598             }
599              
600             # =============================================================================
601             # HTTP/2 Stream Dispatch (scope/receive/send creation)
602             # =============================================================================
603              
604             sub _h2_dispatch_stream {
605 65     65   171 my ($self, $stream_id) = @_;
606              
607 65         147 my $stream_state = $self->{h2_streams}{$stream_id};
608 65 100       146 return unless $stream_state;
609              
610 64         130 my ($scope, $receive, $send);
611              
612 64 100       276 if ($stream_state->{is_websocket}) {
    100          
613 13         56 $scope = $self->_h2_create_websocket_scope($stream_id, $stream_state);
614 13         49 $receive = $self->_h2_create_websocket_receive($stream_id, $stream_state);
615 13         52 $send = $self->_h2_create_websocket_send($stream_id, $stream_state);
616             } elsif ($stream_state->{is_sse}) {
617 9         50 $scope = $self->_h2_create_sse_scope($stream_id, $stream_state);
618 9         40 $receive = $self->_h2_create_sse_receive($stream_id, $stream_state);
619 9         55 $send = $self->_h2_create_sse_send($stream_id, $stream_state);
620             } else {
621 42         177 $scope = $self->_h2_create_scope($stream_id, $stream_state);
622 42         191 $receive = $self->_h2_create_receive($stream_id, $stream_state);
623 42         164 $send = $self->_h2_create_send($stream_id, $stream_state);
624             }
625              
626 64         157 weaken(my $weak_self = $self);
627              
628 64     64   112 my $future = (async sub {
629 64         131 eval {
630 64         330 await $weak_self->{app}->($scope, $receive, $send);
631             };
632 64 100       5322 if (my $error = $@) {
633 2         69 warn "PAGI application error (HTTP/2 stream $stream_id): $error\n";
634 2 50 33     19 if ($weak_self && !$stream_state->{response_started}) {
635 2         4 eval {
636 2         12 $weak_self->{h2_session}->submit_response($stream_id,
637             status => 500,
638             headers => [['content-type', 'text/plain']],
639             body => "Internal Server Error\n",
640             );
641 2         65 $weak_self->_h2_write_pending;
642             };
643             }
644             }
645              
646             # Notify server that request completed (for max_requests tracking)
647 64 50 33     663 $weak_self->{server}->_on_request_complete if $weak_self && $weak_self->{server};
648 64         316 })->();
649              
650 64         4354 $self->{server}->adopt_future($future);
651             }
652              
653             sub _h2_create_scope {
654 42     42   96 my ($self, $stream_id, $stream_state) = @_;
655              
656 42         79 my $pseudo = $stream_state->{pseudo};
657 42         78 my $headers = $stream_state->{headers};
658              
659             # Parse path and query string from :path pseudo-header
660 42   50     137 my $full_path = $pseudo->{':path'} // '/';
661 42         188 my ($path, $query_string) = split(/\?/, $full_path, 2);
662 42   100     227 $query_string //= '';
663              
664             # Decode percent-encoded path for scope (keep raw_path as-is)
665             # Match HTTP/1.1 pipeline: URI::Escape + UTF-8 decode with fallback
666 42         241 my $unescaped = uri_unescape($path);
667 42   33     452 my $decoded_path = eval { decode('UTF-8', $unescaped, Encode::FB_CROAK) }
  42         635  
668             // $unescaped;
669              
670 42         3095 my $connection_state = PAGI::Server::ConnectionState->new(
671             connection => $self,
672             );
673              
674             return {
675             type => 'http',
676             pagi => {
677             version => '0.2',
678             spec_version => '0.2',
679             features => {},
680             },
681             http_version => '2',
682             method => $pseudo->{':method'} // 'GET',
683             scheme => $pseudo->{':scheme'} // $self->_get_scheme,
684             path => $decoded_path,
685             raw_path => $path,
686             query_string => $query_string,
687             root_path => '',
688             headers => $headers,
689             (defined $self->{client_host}
690             ? (client => [$self->{client_host}, $self->{client_port}])
691             : ()
692             ),
693             server => [$self->{server_host}, $self->{server_port}],
694 42 50 50     584 state => keys %{$self->{state}} ? { %{$self->{state}} } : {},
  42 50 33     308  
  0         0  
695             extensions => $self->_get_extensions_for_scope,
696             'pagi.connection' => $connection_state,
697             };
698             }
699              
700             sub _h2_create_receive {
701 42     42   105 my ($self, $stream_id, $stream_state) = @_;
702              
703 42         85 weaken(my $weak_self = $self);
704              
705             return sub {
706 40 50   40   350 return Future->done({ type => 'http.disconnect' }) unless $weak_self;
707 40 50       138 return Future->done({ type => 'http.disconnect' }) if $weak_self->{closed};
708              
709 40         84 my $ss = $weak_self->{h2_streams}{$stream_id};
710 40 50       110 return Future->done({ type => 'http.disconnect' }) unless $ss;
711              
712 40         65 my $future = (async sub {
713 40 50       142 return { type => 'http.disconnect' } unless $weak_self;
714              
715 40         69 my $ss = $weak_self->{h2_streams}{$stream_id};
716 40 50       84 return { type => 'http.disconnect' } unless $ss;
717              
718             # Check queue first
719 40 50       59 if (@{$ss->{receive_queue}}) {
  40         141  
720 0         0 return shift @{$ss->{receive_queue}};
  0         0  
721             }
722              
723             # If body is already complete, return final body event
724 40 50       113 if ($ss->{body_complete}) {
725 40         88 my $body = $ss->{body};
726 40         95 $ss->{body} = '';
727             return {
728 40         638 type => 'http.request',
729             body => $body,
730             more => 0,
731             };
732             }
733              
734             # Wait for body data
735 0 0       0 if (!$ss->{body_pending}) {
736 0         0 $ss->{body_pending} = Future->new;
737             }
738 0         0 await $ss->{body_pending};
739              
740             # Re-fetch stream state (may have changed)
741 0         0 $ss = $weak_self->{h2_streams}{$stream_id};
742 0 0       0 return { type => 'http.disconnect' } unless $ss;
743              
744             # Check queue after waking
745 0 0       0 if (@{$ss->{receive_queue}}) {
  0         0  
746 0         0 return shift @{$ss->{receive_queue}};
  0         0  
747             }
748              
749 0         0 my $body = $ss->{body};
750 0         0 $ss->{body} = '';
751             return {
752             type => 'http.request',
753             body => $body,
754 0 0       0 more => $ss->{body_complete} ? 0 : 1,
755             };
756 40         241 })->();
757              
758 40         1834 return $future;
759 42         315 };
760             }
761              
762             sub _h2_create_send {
763 42     42   101 my ($self, $stream_id, $stream_state) = @_;
764              
765 42         82 weaken(my $weak_self = $self);
766              
767 42         128 my $status;
768             my @response_headers;
769              
770             # Streaming state for deferred data provider pattern
771 42         0 my @data_queue;
772 42         58 my $eof_pending = 0;
773 42         54 my $streaming_started = 0;
774              
775             # Data callback for nghttp2's streaming response.
776             # Returns ($data, $eof) when data is available, or undef to defer.
777             my $data_callback = sub {
778 123     123   177 my ($cb_stream_id, $max_len) = @_;
779              
780 123 100       238 if (@data_queue) {
781 88         120 my $chunk = shift @data_queue;
782             # Respect max_len — XS truncates without preserving remainder
783 88 100       159 if (length($chunk) > $max_len) {
784 11         27 unshift @data_queue, substr($chunk, $max_len);
785 11         33 $chunk = substr($chunk, 0, $max_len);
786             }
787 88 100 100     213 my $eof = (!@data_queue && $eof_pending) ? 1 : 0;
788 88         554 return ($chunk, $eof);
789             }
790              
791             # Queue empty but EOF pending — signal end of stream
792 35 100       70 if ($eof_pending) {
793 2         14 return ('', 1);
794             }
795              
796             # Queue empty, more data expected — defer (NGHTTP2_ERR_DEFERRED in C layer)
797 33         139 return undef;
798 42         244 };
799              
800 149     149   5133 return async sub {
801 149         220 my ($event) = @_;
802 149 50       270 return unless $weak_self;
803 149 50       309 return if $weak_self->{closed};
804              
805 149   50     318 my $type = $event->{type} // '';
806              
807 149 100       368 if ($type eq 'http.response.start') {
    50          
808 39 50       128 my $ss = $weak_self->{h2_streams}{$stream_id} or return;
809 39 50       108 return if $ss->{response_started};
810 39         76 $ss->{response_started} = 1;
811              
812 39   50     94 $status = $event->{status} // 200;
813             @response_headers = map {
814 33         186 [_validate_header_name($_->[0]), _validate_header_value($_->[1])]
815 39   50     63 } @{$event->{headers} // []};
  39         154  
816             }
817             elsif ($type eq 'http.response.body') {
818 110 50       301 my $ss = $weak_self->{h2_streams}{$stream_id} or return;
819 110 50       243 return unless $ss->{response_started};
820              
821 110   50     228 my $body = $event->{body} // '';
822 110   100     289 my $more = $event->{more} // 0;
823              
824 110 100       205 if ($more) {
825 71 100       109 if (!$streaming_started) {
826             # First streaming chunk — submit with data callback
827 8         14 $streaming_started = 1;
828 8 50       38 push @data_queue, $body if length($body);
829             $weak_self->{h2_session}->submit_response_streaming(
830 8         63 $stream_id,
831             status => $status,
832             headers => \@response_headers,
833             data_callback => $data_callback,
834             );
835 8         336 $weak_self->_h2_write_pending;
836             } else {
837             # Subsequent chunk — backpressure check then push and resume
838 63 100       126 if ($weak_self->_get_write_buffer_size >= $weak_self->{write_high_watermark}) {
839 1         6 await $weak_self->_wait_for_drain;
840 1 50       180 return unless $weak_self;
841 1 50       6 return if $weak_self->{closed};
842 1 50       6 return unless $weak_self->{h2_streams}{$stream_id};
843             }
844 63 50       146 push @data_queue, $body if length($body);
845 63         208 $weak_self->{h2_session}->resume_stream($stream_id);
846 63         464 $weak_self->_h2_write_pending;
847             }
848             } else {
849 39 100       87 if ($streaming_started) {
850             # Final chunk on an already-streaming response
851 8 50       25 if ($weak_self->_get_write_buffer_size >= $weak_self->{write_high_watermark}) {
852 0         0 await $weak_self->_wait_for_drain;
853 0 0       0 return unless $weak_self;
854 0 0       0 return if $weak_self->{closed};
855 0 0       0 return unless $weak_self->{h2_streams}{$stream_id};
856             }
857 8         41 $eof_pending = 1;
858 8 100       25 push @data_queue, $body if length($body);
859 8         37 $weak_self->{h2_session}->resume_stream($stream_id);
860 8         202 $weak_self->_h2_write_pending;
861             } else {
862             # Non-streaming: single response (unchanged one-shot path)
863 31         239 $weak_self->{h2_session}->submit_response($stream_id,
864             status => $status,
865             headers => \@response_headers,
866             body => $body,
867             );
868 31         1261 $weak_self->_h2_write_pending;
869             }
870             }
871             }
872             else {
873 0         0 _unrecognized_event_type($type, 'http');
874             }
875 42         441 };
876             }
877              
878             # =============================================================================
879             # HTTP/2 WebSocket over HTTP/2 (RFC 8441)
880             # =============================================================================
881              
882             sub _h2_create_websocket_scope {
883 13     13   34 my ($self, $stream_id, $stream_state) = @_;
884              
885 13         46 my $pseudo = $stream_state->{pseudo};
886 13         29 my $headers = $stream_state->{headers};
887              
888 13   50     47 my $full_path = $pseudo->{':path'} // '/';
889 13         64 my ($path, $query_string) = split(/\?/, $full_path, 2);
890 13   50     84 $query_string //= '';
891              
892             # Match HTTP/1.1 pipeline: URI::Escape + UTF-8 decode with fallback
893 13         92 my $unescaped = uri_unescape($path);
894 13   33     140 my $decoded_path = eval { decode('UTF-8', $unescaped, Encode::FB_CROAK) }
  13         222  
895             // $unescaped;
896              
897             # Extract subprotocols from headers
898 13         821 my @subprotocols;
899 13         32 for my $header (@$headers) {
900 28         60 my ($name, $value) = @$header;
901 28 100       79 if ($name eq 'sec-websocket-protocol') {
902 1         9 push @subprotocols, map { s/^\s+|\s+$//gr } split /,/, $value;
  2         12  
903             }
904             }
905              
906 13         146 my $connection_state = PAGI::Server::ConnectionState->new(
907             connection => $self,
908             );
909              
910             return {
911             type => 'websocket',
912             pagi => {
913             version => '0.2',
914             spec_version => '0.2',
915             features => {},
916             },
917             http_version => '2',
918             scheme => $self->_get_ws_scheme,
919             path => $decoded_path,
920             raw_path => $path,
921             query_string => $query_string,
922             root_path => '',
923             headers => $headers,
924             (defined $self->{client_host}
925             ? (client => [$self->{client_host}, $self->{client_port}])
926             : ()
927             ),
928             server => [$self->{server_host}, $self->{server_port}],
929             subprotocols => \@subprotocols,
930 13 50       109 state => keys %{$self->{state}} ? { %{$self->{state}} } : {},
  13 50       76  
  0         0  
931             extensions => $self->_get_extensions_for_scope,
932             'pagi.connection' => $connection_state,
933             };
934             }
935              
936             sub _h2_create_websocket_receive {
937 13     13   38 my ($self, $stream_id, $stream_state) = @_;
938              
939 13         31 weaken(my $weak_self = $self);
940              
941             return sub {
942 28 50   28   1239 return Future->done({ type => 'websocket.disconnect', code => 1006, reason => '' })
943             unless $weak_self;
944             return Future->done({ type => 'websocket.disconnect', code => 1006, reason => '' })
945 28 50       72 if $weak_self->{closed};
946              
947 28         56 my $ss = $weak_self->{h2_streams}{$stream_id};
948 28 50       45 return Future->done({ type => 'websocket.disconnect', code => 1006, reason => '' })
949             unless $ss;
950              
951 28         39 my $future = (async sub {
952 28 50       56 return { type => 'websocket.disconnect', code => 1006, reason => '' }
953             unless $weak_self;
954              
955 28         49 my $ss = $weak_self->{h2_streams}{$stream_id};
956 28 50       74 return { type => 'websocket.disconnect', code => 1006, reason => '' }
957             unless $ss;
958              
959             # Check queue first
960 28 50       106 if (@{$ss->{receive_queue}}) {
  28         71  
961 0         0 return shift @{$ss->{receive_queue}};
  0         0  
962             }
963              
964             # First call returns websocket.connect
965 28 100       76 if (!$ss->{ws_connect_sent}) {
966 12         29 $ss->{ws_connect_sent} = 1;
967 12         77 return { type => 'websocket.connect' };
968             }
969              
970             # Wait for events
971 16         41 while (1) {
972 38 100       55 if (@{$ss->{receive_queue}}) {
  38         96  
973 9         17 return shift @{$ss->{receive_queue}};
  9         48  
974             }
975              
976             return { type => 'websocket.disconnect', code => 1006, reason => '' }
977 29 100       140 if $weak_self->{closed};
978              
979 22 50       72 if (!$ss->{body_pending}) {
980 22         49 $ss->{body_pending} = Future->new;
981             }
982 22         156 await $ss->{body_pending};
983              
984 22         2250 $ss = $weak_self->{h2_streams}{$stream_id};
985 22 50       58 return { type => 'websocket.disconnect', code => 1006, reason => '' }
986             unless $ss;
987             }
988 28         164 })->();
989              
990 28         1333 return $future;
991 13         71 };
992             }
993              
994             sub _h2_create_websocket_send {
995 13     13   32 my ($self, $stream_id, $stream_state) = @_;
996              
997 13         28 weaken(my $weak_self = $self);
998              
999 19     19   504 return async sub {
1000 19         41 my ($event) = @_;
1001 19 50       74 return unless $weak_self;
1002 19 50       55 return if $weak_self->{closed};
1003              
1004 19         94 my $ss = $weak_self->{h2_streams}{$stream_id};
1005 19 50       38 return unless $ss;
1006              
1007 19   50     57 my $type = $event->{type} // '';
1008              
1009 19 100       73 if ($type eq 'websocket.accept') {
    100          
    50          
1010 12 50       49 return if $ss->{ws_accepted};
1011              
1012             # HTTP/2 WebSocket: respond with 200 (not 101)
1013 12         17 my @headers;
1014 12 50       34 if (my $subprotocol = $event->{subprotocol}) {
1015 0         0 $subprotocol = _validate_subprotocol($subprotocol);
1016 0         0 push @headers, ['sec-websocket-protocol', $subprotocol];
1017             }
1018 12 50       35 if (my $extra = $event->{headers}) {
1019             push @headers, map {
1020 0         0 [_validate_header_name($_->[0]), _validate_header_value($_->[1])]
  0         0  
1021             } @$extra;
1022             }
1023              
1024 12         25 $ss->{ws_accepted} = 1;
1025 12         27 $ss->{response_started} = 1;
1026             $ss->{ws_frame} = Protocol::WebSocket::Frame->new(
1027             max_payload_size => $weak_self->{max_ws_frame_size},
1028 12         150 );
1029              
1030             # Submit 200 response with streaming body that defers
1031             $weak_self->{h2_session}->submit_response($stream_id,
1032             status => 200,
1033             headers => \@headers,
1034 12         57 body => sub { return undef }, # defer until submit_data
1035 12         735 );
1036 12         410 $weak_self->_h2_write_pending;
1037              
1038             # Process any data that arrived before accept
1039 12 50       46 if (length($ss->{body}) > 0) {
1040 0         0 my $buffered = $ss->{body};
1041 0         0 $ss->{body} = '';
1042 0         0 $weak_self->_h2_process_ws_frames($stream_id, $ss, $buffered);
1043             }
1044             }
1045             elsif ($type eq 'websocket.send') {
1046 5 50       11 return unless $ss->{ws_accepted};
1047              
1048 5         120 my $frame;
1049 5 100       18 if (defined $event->{text}) {
    50          
1050             $frame = Protocol::WebSocket::Frame->new(
1051             buffer => $event->{text},
1052 4         15 type => 'text',
1053             );
1054             }
1055             elsif (defined $event->{bytes}) {
1056             $frame = Protocol::WebSocket::Frame->new(
1057             buffer => $event->{bytes},
1058 1         6 type => 'binary',
1059             );
1060             }
1061             else {
1062 0         0 return;
1063             }
1064              
1065 5         368 my $bytes = $frame->to_bytes;
1066 5         182 $weak_self->{h2_session}->submit_data($stream_id, $bytes, 0);
1067 5         16 $weak_self->_h2_write_pending;
1068             }
1069             elsif ($type eq 'websocket.close') {
1070 2 100       5 if (!$ss->{ws_accepted}) {
1071             # Reject: send 403
1072 1         8 $weak_self->{h2_session}->submit_response($stream_id,
1073             status => 403,
1074             headers => [['content-type', 'text/plain']],
1075             body => 'Forbidden',
1076             );
1077 1         38 $weak_self->_h2_write_pending;
1078 1         11 return;
1079             }
1080              
1081 1   50     3 my $code = $event->{code} // 1000;
1082 1   50     3 my $reason = $event->{reason} // '';
1083              
1084 1         8 my $frame = Protocol::WebSocket::Frame->new(
1085             type => 'close',
1086             buffer => pack('n', $code) . $reason,
1087             );
1088              
1089             # Send close frame + END_STREAM
1090 1         36 $weak_self->{h2_session}->submit_data($stream_id, $frame->to_bytes, 1);
1091 1         4 $weak_self->_h2_write_pending;
1092             }
1093              
1094 18         248 return;
1095 13         109 };
1096             }
1097              
1098             # =============================================================================
1099             # HTTP/2 SSE (Server-Sent Events over HTTP/2)
1100             # =============================================================================
1101              
1102             sub _h2_create_sse_scope {
1103 9     9   28 my ($self, $stream_id, $stream_state) = @_;
1104              
1105 9         24 my $pseudo = $stream_state->{pseudo};
1106 9         22 my $headers = $stream_state->{headers};
1107              
1108 9   50     38 my $full_path = $pseudo->{':path'} // '/';
1109 9         51 my ($path, $query_string) = split(/\?/, $full_path, 2);
1110 9   50     55 $query_string //= '';
1111              
1112             # Match HTTP/1.1 pipeline: URI::Escape + UTF-8 decode with fallback
1113 9         64 my $unescaped = uri_unescape($path);
1114 9   33     96 my $decoded_path = eval { decode('UTF-8', $unescaped, Encode::FB_CROAK) }
  9         179  
1115             // $unescaped;
1116              
1117 9         748 my $connection_state = PAGI::Server::ConnectionState->new(
1118             connection => $self,
1119             );
1120              
1121             return {
1122             type => 'sse',
1123             pagi => {
1124             version => '0.2',
1125             spec_version => '0.2',
1126             features => {},
1127             },
1128             http_version => '2',
1129             method => $pseudo->{':method'} // 'GET',
1130             scheme => $pseudo->{':scheme'} // $self->_get_scheme,
1131             path => $decoded_path,
1132             raw_path => $path,
1133             query_string => $query_string,
1134             root_path => '',
1135             headers => $headers,
1136             (defined $self->{client_host}
1137             ? (client => [$self->{client_host}, $self->{client_port}])
1138             : ()
1139             ),
1140             server => [$self->{server_host}, $self->{server_port}],
1141 9 50 50     168 state => keys %{$self->{state}} ? { %{$self->{state}} } : {},
  9 50 33     133  
  0         0  
1142             extensions => $self->_get_extensions_for_scope,
1143             'pagi.connection' => $connection_state,
1144             };
1145             }
1146              
1147             sub _h2_create_sse_receive {
1148 9     9   25 my ($self, $stream_id, $stream_state) = @_;
1149              
1150 9         23 weaken(my $weak_self = $self);
1151              
1152             my $sse_disconnect = sub {
1153             return {
1154 2     2   18 type => 'sse.disconnect',
1155             reason => 'client_closed',
1156             };
1157 9         44 };
1158              
1159             return sub {
1160 10 50   10   186 return Future->done($sse_disconnect->()) unless $weak_self;
1161 10 50       51 return Future->done($sse_disconnect->()) if $weak_self->{closed};
1162              
1163 10         24 my $ss = $weak_self->{h2_streams}{$stream_id};
1164 10 50       42 return Future->done($sse_disconnect->()) unless $ss;
1165              
1166 10         42 my $future = (async sub {
1167 10 50       25 return $sse_disconnect->() unless $weak_self;
1168              
1169 10         19 my $ss = $weak_self->{h2_streams}{$stream_id};
1170 10 50       32 return $sse_disconnect->() unless $ss;
1171              
1172             # Check queue first
1173 10 50       20 if (@{$ss->{receive_queue}}) {
  10         25  
1174 0         0 return shift @{$ss->{receive_queue}};
  0         0  
1175             }
1176              
1177             # First call returns sse.request with body
1178 10 100       33 if (!$ss->{sse_request_sent}) {
1179 8         19 $ss->{sse_request_sent} = 1;
1180             return {
1181             type => 'sse.request',
1182             body => $ss->{body},
1183 8         171 more => 0,
1184             };
1185             }
1186              
1187             # Wait for disconnect
1188 2         3 while (1) {
1189 4 50       6 if (@{$ss->{receive_queue}}) {
  4         10  
1190 0         0 return shift @{$ss->{receive_queue}};
  0         0  
1191             }
1192              
1193             return $sse_disconnect->()
1194 4 100       45 if $weak_self->{closed};
1195              
1196 2 50       5 if (!$ss->{body_pending}) {
1197 2         4 $ss->{body_pending} = Future->new;
1198             }
1199 2         14 await $ss->{body_pending};
1200              
1201 2         254 $ss = $weak_self->{h2_streams}{$stream_id};
1202 2 50       6 return $sse_disconnect->() unless $ss;
1203             }
1204 10         56 })->();
1205              
1206 10         632 return $future;
1207 9         56 };
1208             }
1209              
1210             sub _h2_create_sse_send {
1211 9     9   25 my ($self, $stream_id, $stream_state) = @_;
1212              
1213 9         27 weaken(my $weak_self = $self);
1214              
1215             # Streaming state for data provider pattern
1216 9         16 my @data_queue;
1217 9         16 my $streaming_started = 0;
1218              
1219             my $data_callback = sub {
1220 59     59   117 my ($cb_stream_id, $max_len) = @_;
1221              
1222 59 100       125 if (@data_queue) {
1223 25         42 my $chunk = shift @data_queue;
1224 25 50       72 if (length($chunk) > $max_len) {
1225 0         0 unshift @data_queue, substr($chunk, $max_len);
1226 0         0 $chunk = substr($chunk, 0, $max_len);
1227             }
1228 25         129 return ($chunk, 0); # SSE streams never EOF via data_callback
1229             }
1230              
1231             # Queue empty — defer
1232 34         118 return undef;
1233 9         50 };
1234              
1235 30     30   102594 return async sub {
1236 30         67 my ($event) = @_;
1237 30 50       112 return unless $weak_self;
1238 30 50       72 return if $weak_self->{closed};
1239              
1240 30         58 my $ss = $weak_self->{h2_streams}{$stream_id};
1241 30 50       55 return unless $ss;
1242              
1243             # Reset SSE idle timer on send activity
1244 30         167 $weak_self->_reset_sse_idle_timer;
1245              
1246 30   50     88 my $type = $event->{type} // '';
1247              
1248             # Dev-mode event validation (PAGI spec compliance)
1249 30 50       68 if ($weak_self->{validate_events}) {
1250 0         0 require PAGI::Server::EventValidator;
1251 0         0 PAGI::Server::EventValidator::validate_sse_send($event);
1252             }
1253              
1254 30 100       99 if ($type eq 'sse.start') {
    100          
    100          
    50          
1255 9 50       27 return if $ss->{response_started};
1256 9         18 $ss->{response_started} = 1;
1257              
1258 9   50     27 my $status = $event->{status} // 200;
1259 9   100     47 my $headers = $event->{headers} // [];
1260              
1261             # Ensure Content-Type is text/event-stream
1262 9         19 my $has_content_type = 0;
1263 9         26 for my $h (@$headers) {
1264 2 100       10 if (lc($h->[0]) eq 'content-type') {
1265 1         3 $has_content_type = 1;
1266 1         4 last;
1267             }
1268             }
1269              
1270 9         17 my @final_headers;
1271 9         19 for my $h (@$headers) {
1272 2         10 push @final_headers, [_validate_header_name($h->[0]), _validate_header_value($h->[1])];
1273             }
1274 9 100       36 if (!$has_content_type) {
1275 8         32 push @final_headers, ['content-type', 'text/event-stream'];
1276             }
1277 9         25 push @final_headers, ['cache-control', 'no-cache'];
1278              
1279 9         17 $streaming_started = 1;
1280             $weak_self->{h2_session}->submit_response_streaming(
1281 9         77 $stream_id,
1282             status => $status,
1283             headers => \@final_headers,
1284             data_callback => $data_callback,
1285             );
1286 9         356 $weak_self->_h2_write_pending;
1287              
1288             # Set protocol-specific keepalive writer (HTTP/2 DATA frames)
1289             $weak_self->{sse_keepalive_writer} = sub {
1290 6         19 my ($text) = @_;
1291 6 50       25 return unless $weak_self;
1292 6 50       21 return if $weak_self->{closed};
1293 6 50       39 return unless $weak_self->{h2_streams}{$stream_id};
1294 6         18 push @data_queue, $text;
1295 6         69 $weak_self->{h2_session}->resume_stream($stream_id);
1296 6         141 $weak_self->_h2_write_pending;
1297 9         59 };
1298              
1299             # Start SSE idle timer if configured
1300 9         42 $weak_self->_start_sse_idle_timer;
1301             }
1302             elsif ($type eq 'sse.send') {
1303 18 50       55 return unless $ss->{response_started};
1304              
1305             # Backpressure check
1306 18 50       54 if ($weak_self->_get_write_buffer_size >= $weak_self->{write_high_watermark}) {
1307 0         0 await $weak_self->_wait_for_drain;
1308 0 0       0 return Future->done unless $weak_self;
1309 0 0       0 return Future->done if $weak_self->{closed};
1310 0 0       0 return unless $weak_self->{h2_streams}{$stream_id};
1311             }
1312              
1313 18         59 my $sse_data = _format_sse_event($event);
1314 18         38 push @data_queue, $sse_data;
1315 18         167 $weak_self->{h2_session}->resume_stream($stream_id);
1316 18         178 $weak_self->_h2_write_pending;
1317             }
1318             elsif ($type eq 'sse.comment') {
1319 1 50       3 return unless $ss->{response_started};
1320              
1321 1         3 my $comment = _format_sse_comment($event);
1322 1         3 push @data_queue, $comment;
1323 1         3 $weak_self->{h2_session}->resume_stream($stream_id);
1324 1         7 $weak_self->_h2_write_pending;
1325             }
1326             elsif ($type eq 'sse.keepalive') {
1327 2   50     8 my $interval = $event->{interval} // 0;
1328 2         20 my $comment = $event->{comment};
1329              
1330 2 50       8 if ($interval > 0) {
1331 2         35 $weak_self->_start_sse_keepalive($interval, $comment);
1332             }
1333             else {
1334 0         0 $weak_self->_stop_sse_keepalive;
1335             }
1336             }
1337             else {
1338 0         0 _unrecognized_event_type($type, 'sse');
1339             }
1340              
1341 30         461 return;
1342 9         98 };
1343             }
1344              
1345             sub _h2_process_ws_frames {
1346 16     16   30 my ($self, $stream_id, $stream, $data) = @_;
1347              
1348 16         38 my $frame = $stream->{ws_frame};
1349 16 50       35 return unless $frame;
1350              
1351 16         65 $frame->append($data);
1352              
1353 16         217 while (defined(my $bytes = $frame->next_bytes)) {
1354 10         1085 my $opcode = $frame->opcode;
1355              
1356 10 100       77 if ($opcode == 1) {
    100          
    50          
    0          
1357             # Text frame
1358 5         10 my $text = eval { Encode::decode('UTF-8', $bytes, Encode::FB_CROAK) };
  5         73  
1359 5 100       323 unless (defined $text) {
1360 1         5 $self->_h2_ws_close($stream_id, 1007, 'Invalid UTF-8');
1361 1         3 return;
1362             }
1363 4         7 push @{$stream->{receive_queue}}, {
  4         24  
1364             type => 'websocket.receive',
1365             text => $text,
1366             };
1367             }
1368             elsif ($opcode == 2) {
1369             # Binary frame
1370 1         1 push @{$stream->{receive_queue}}, {
  1         7  
1371             type => 'websocket.receive',
1372             bytes => $bytes,
1373             };
1374             }
1375             elsif ($opcode == 8) {
1376             # Close frame
1377 4         13 my ($code, $reason) = (1005, '');
1378              
1379             # RFC 6455 Section 5.5.1: Close frame payload is 0 or >=2 bytes
1380 4 100       18 if (length($bytes) == 1) {
1381 1         8 $self->_h2_ws_close($stream_id, 1002, 'Invalid close frame');
1382 1         1 push @{$stream->{receive_queue}}, {
  1         6  
1383             type => 'websocket.disconnect',
1384             code => 1002,
1385             reason => 'Invalid close frame',
1386             };
1387 1         4 $self->_h2_wake_pending($stream);
1388 1         159 return;
1389             }
1390              
1391 3 50       14 if (length($bytes) >= 2) {
1392 3         13 $code = unpack('n', substr($bytes, 0, 2));
1393 3   50     52 $reason = substr($bytes, 2) // '';
1394              
1395             # RFC 6455 Section 7.4.1: Validate close code
1396 3         7 my $valid_code = 0;
1397 3 100 66     52 if ($code == 1000 || $code == 1001 || $code == 1002 || $code == 1003) {
    50 66        
    50 33        
      33        
      33        
1398 2         5 $valid_code = 1;
1399             }
1400             elsif ($code >= 1007 && $code <= 1011) {
1401 0         0 $valid_code = 1;
1402             }
1403             elsif ($code >= 3000 && $code <= 4999) {
1404 0         0 $valid_code = 1;
1405             }
1406 3 100       10 unless ($valid_code) {
1407 1         4 $self->_h2_ws_close($stream_id, 1002, 'Invalid close code');
1408 1         1 push @{$stream->{receive_queue}}, {
  1         5  
1409             type => 'websocket.disconnect',
1410             code => 1002,
1411             reason => 'Invalid close code',
1412             };
1413 1         5 $self->_h2_wake_pending($stream);
1414 1         104 return;
1415             }
1416              
1417             # RFC 6455: Close reason must be valid UTF-8
1418 2 50       10 if (length($reason) > 0) {
1419 2         5 my $reason_copy = $reason;
1420 2         5 my $decoded = eval { Encode::decode('UTF-8', $reason_copy, Encode::FB_CROAK) };
  2         23  
1421 2 100       149 unless (defined $decoded) {
1422 1         32 $self->_h2_ws_close($stream_id, 1007, 'Invalid UTF-8 in close reason');
1423 1         1 push @{$stream->{receive_queue}}, {
  1         7  
1424             type => 'websocket.disconnect',
1425             code => 1007,
1426             reason => 'Invalid UTF-8 in close reason',
1427             };
1428 1         6 $self->_h2_wake_pending($stream);
1429 1         132 return;
1430             }
1431             }
1432             }
1433              
1434             # Send close frame back + END_STREAM
1435 1         8 my $close_frame = Protocol::WebSocket::Frame->new(
1436             type => 'close',
1437             buffer => pack('n', $code) . $reason,
1438             );
1439 1         37 $self->{h2_session}->submit_data($stream_id, $close_frame->to_bytes, 1);
1440             # No _h2_write_pending — inside feed(); flushed by _h2_process_data
1441              
1442 1         2 push @{$stream->{receive_queue}}, {
  1         9  
1443             type => 'websocket.disconnect',
1444             code => $code,
1445             reason => $reason,
1446             };
1447             }
1448             elsif ($opcode == 9) {
1449             # Ping — respond with pong
1450 0         0 my $pong = Protocol::WebSocket::Frame->new(
1451             type => 'pong',
1452             buffer => $bytes,
1453             );
1454 0         0 $self->{h2_session}->submit_data($stream_id, $pong->to_bytes, 0);
1455             # No _h2_write_pending — inside feed(); flushed by _h2_process_data
1456             }
1457             # Opcode 10 (pong) — ignore
1458             }
1459              
1460 12         437 $self->_h2_wake_pending($stream);
1461             }
1462              
1463             sub _h2_ws_close {
1464 4     4   13 my ($self, $stream_id, $code, $reason) = @_;
1465              
1466 4   50     36 my $frame = Protocol::WebSocket::Frame->new(
1467             type => 'close',
1468             buffer => pack('n', $code) . ($reason // ''),
1469             );
1470 4         153 $self->{h2_session}->submit_data($stream_id, $frame->to_bytes, 1);
1471             # No _h2_write_pending — called from inside feed(); flushed by _h2_process_data
1472             }
1473              
1474             # Request stall timeout - closes connection if no I/O activity during request processing
1475             sub _start_stall_timer {
1476 239     239   444 my ($self) = @_;
1477              
1478 239 50 33     887 return unless $self->{request_timeout} && $self->{request_timeout} > 0;
1479 0 0       0 return unless $self->{server};
1480 0 0       0 return if $self->{stall_timer}; # Already running
1481              
1482 0         0 weaken(my $weak_self = $self);
1483              
1484             my $timer = IO::Async::Timer::Countdown->new(
1485             delay => $self->{request_timeout},
1486             on_expire => sub {
1487 0 0   0   0 return unless $weak_self;
1488 0 0       0 return if $weak_self->{closed};
1489             # Log the timeout
1490 0 0 0     0 if ($weak_self->{server} && $weak_self->{server}->can('_log')) {
1491 0         0 $weak_self->{server}->_log(warn =>
1492             "Request stall timeout ($weak_self->{request_timeout}s) - closing connection");
1493             }
1494 0         0 $weak_self->_handle_disconnect_and_close('client_timeout');
1495             },
1496 0         0 );
1497 0         0 $self->{stall_timer} = $timer;
1498 0         0 $self->{server}->add_child($timer);
1499 0         0 $timer->start;
1500             }
1501              
1502             sub _reset_stall_timer {
1503 525     525   773 my ($self) = @_;
1504              
1505 525 50       1279 return unless $self->{stall_timer};
1506 0         0 $self->{stall_timer}->reset;
1507 0 0       0 $self->{stall_timer}->start unless $self->{stall_timer}->is_running;
1508             }
1509              
1510             sub _stop_stall_timer {
1511 545     545   1005 my ($self) = @_;
1512              
1513 545 50       1478 return unless $self->{stall_timer};
1514 0 0       0 $self->{stall_timer}->stop if $self->{stall_timer}->is_running;
1515 0 0       0 if ($self->{server}) {
1516 0         0 $self->{server}->remove_child($self->{stall_timer});
1517             }
1518 0         0 $self->{stall_timer} = undef;
1519             }
1520              
1521             # WebSocket idle timeout - closes connection if no activity
1522             sub _start_ws_idle_timer {
1523 21     21   44 my ($self) = @_;
1524              
1525 21 50 33     72 return unless $self->{ws_idle_timeout} && $self->{ws_idle_timeout} > 0;
1526 0 0       0 return unless $self->{server};
1527 0 0       0 return if $self->{ws_idle_timer};
1528              
1529 0         0 weaken(my $weak_self = $self);
1530              
1531             my $timer = IO::Async::Timer::Countdown->new(
1532             delay => $self->{ws_idle_timeout},
1533             on_expire => sub {
1534 0 0   0   0 return unless $weak_self;
1535 0 0       0 return if $weak_self->{closed};
1536 0 0 0     0 if ($weak_self->{server} && $weak_self->{server}->can('_log')) {
1537 0         0 $weak_self->{server}->_log(warn =>
1538             "WebSocket idle timeout ($weak_self->{ws_idle_timeout}s) - closing connection");
1539             }
1540 0         0 $weak_self->_handle_disconnect_and_close('idle_timeout');
1541             },
1542 0         0 );
1543 0         0 $self->{ws_idle_timer} = $timer;
1544 0         0 $self->{server}->add_child($timer);
1545 0         0 $timer->start;
1546             }
1547              
1548             sub _reset_ws_idle_timer {
1549 82     82   110 my ($self) = @_;
1550              
1551 82 50       162 return unless $self->{ws_idle_timer};
1552 0         0 $self->{ws_idle_timer}->reset;
1553 0 0       0 $self->{ws_idle_timer}->start unless $self->{ws_idle_timer}->is_running;
1554             }
1555              
1556             sub _stop_ws_idle_timer {
1557 322     322   637 my ($self) = @_;
1558              
1559 322 50       936 return unless $self->{ws_idle_timer};
1560 0 0       0 $self->{ws_idle_timer}->stop if $self->{ws_idle_timer}->is_running;
1561 0 0       0 if ($self->{server}) {
1562 0         0 $self->{server}->remove_child($self->{ws_idle_timer});
1563             }
1564 0         0 $self->{ws_idle_timer} = undef;
1565             }
1566              
1567             # SSE idle timeout - closes connection if no activity
1568             sub _start_sse_idle_timer {
1569 23     23   46 my ($self) = @_;
1570              
1571 23 50 33     129 return unless $self->{sse_idle_timeout} && $self->{sse_idle_timeout} > 0;
1572 0 0       0 return unless $self->{server};
1573 0 0       0 return if $self->{sse_idle_timer};
1574              
1575 0         0 weaken(my $weak_self = $self);
1576              
1577             my $timer = IO::Async::Timer::Countdown->new(
1578             delay => $self->{sse_idle_timeout},
1579             on_expire => sub {
1580 0 0   0   0 return unless $weak_self;
1581 0 0       0 return if $weak_self->{closed};
1582 0 0 0     0 if ($weak_self->{server} && $weak_self->{server}->can('_log')) {
1583 0         0 $weak_self->{server}->_log(warn =>
1584             "SSE idle timeout ($weak_self->{sse_idle_timeout}s) - closing connection");
1585             }
1586 0         0 $weak_self->{sse_disconnect_reason} = 'idle_timeout';
1587 0         0 $weak_self->_handle_disconnect_and_close('idle_timeout');
1588             },
1589 0         0 );
1590 0         0 $self->{sse_idle_timer} = $timer;
1591 0         0 $self->{server}->add_child($timer);
1592 0         0 $timer->start;
1593             }
1594              
1595             sub _reset_sse_idle_timer {
1596 62     62   97 my ($self) = @_;
1597              
1598 62 50       160 return unless $self->{sse_idle_timer};
1599 0         0 $self->{sse_idle_timer}->reset;
1600 0 0       0 $self->{sse_idle_timer}->start unless $self->{sse_idle_timer}->is_running;
1601             }
1602              
1603             sub _stop_sse_idle_timer {
1604 322     322   644 my ($self) = @_;
1605              
1606 322 50       817 return unless $self->{sse_idle_timer};
1607 0 0       0 $self->{sse_idle_timer}->stop if $self->{sse_idle_timer}->is_running;
1608 0 0       0 if ($self->{server}) {
1609 0         0 $self->{server}->remove_child($self->{sse_idle_timer});
1610             }
1611 0         0 $self->{sse_idle_timer} = undef;
1612             }
1613              
1614             # ============================================================================
1615             # Send-side backpressure support
1616             # ============================================================================
1617             #
1618             # Prevents unbounded memory growth when apps send faster than slow clients
1619             # can receive. Uses watermark-based flow control:
1620             # - High watermark (default 1MB): pause sending when buffer exceeds this
1621             # - Low watermark (default 256KB): resume sending when buffer drops below this
1622             #
1623             # The $send->() Future will block (await) when high watermark is exceeded,
1624             # and resolve when buffer drains below low watermark.
1625              
1626             sub _get_write_buffer_size {
1627 361     361   589 my ($self) = @_;
1628              
1629 361 50       881 return 0 unless $self->{stream};
1630              
1631             # Access IO::Async::Stream's internal write queue
1632             # IO::Async doesn't expose a public API for buffer size, so we access internals
1633 361   50     900 my $queue = $self->{stream}{writequeue} // [];
1634 361         545 my $total = 0;
1635              
1636 361         858 for my $writer (@$queue) {
1637 586         11412 my $data = $writer->data;
1638 586 50 33     4117 if (defined $data && !ref $data) {
1639 586         1066 $total += length($data);
1640             }
1641             }
1642              
1643 361         1179 return $total;
1644             }
1645              
1646             sub _check_drain_waiters {
1647 2     2   3 my ($self) = @_;
1648              
1649 2 100       2 return unless @{$self->{_drain_waiters}};
  2         6  
1650 1 50       4 return unless $self->{stream};
1651              
1652 1         3 my $buffered = $self->_get_write_buffer_size;
1653              
1654             # Resolve all waiters if we've drained below low watermark
1655 1 50       3 if ($buffered < $self->{write_low_watermark}) {
1656 1         2 my @waiters = splice @{$self->{_drain_waiters}};
  1         3  
1657 1         2 for my $f (@waiters) {
1658 1 50       3 $f->done unless $f->is_ready;
1659             }
1660             # Disable drain checking until next high watermark hit
1661 1         123 $self->{_drain_check_active} = 0;
1662             }
1663             }
1664              
1665             sub _setup_drain_detection {
1666 1     1   3 my ($self) = @_;
1667              
1668             # Avoid redundant setup
1669 1 50       5 return if $self->{_drain_check_active};
1670 1         3 $self->{_drain_check_active} = 1;
1671              
1672 1         4 weaken(my $weak_self = $self);
1673              
1674             # Primary mechanism: check when write queue empties
1675             # This guarantees we notice drain even for fast-draining connections
1676             # Store previous handler to chain if needed
1677 1         3 my $prev_on_empty = $self->{_prev_on_outgoing_empty};
1678              
1679             $self->{stream}->configure(
1680             on_outgoing_empty => sub {
1681 2 50   2   1837 return unless $weak_self;
1682 2         8 $weak_self->_check_drain_waiters;
1683             # Call previous handler if any
1684 2 50       8 $prev_on_empty->(@_) if $prev_on_empty;
1685             },
1686 1         10 );
1687             }
1688              
1689             sub _wait_for_drain {
1690 1     1   3 my ($self) = @_;
1691              
1692             # Fast path: already below low watermark
1693 1         4 my $buffered = $self->_get_write_buffer_size;
1694 1 50       6 if ($buffered < $self->{write_low_watermark}) {
1695 0         0 return Future->done;
1696             }
1697              
1698             # Create Future to be resolved when drained
1699 1         6 my $f = $self->{server}->loop->new_future;
1700 1         2097 push @{$self->{_drain_waiters}}, $f;
  1         5  
1701              
1702             # Ensure drain detection is active
1703 1         6 $self->_setup_drain_detection;
1704              
1705 1         136 return $f;
1706             }
1707              
1708             sub _cancel_drain_waiters {
1709 644     644   1378 my ($self, $reason) = @_;
1710 644   100     1448 $reason //= 'connection closed';
1711              
1712 644         896 my @waiters = splice @{$self->{_drain_waiters}};
  644         1583  
1713 644         1215 for my $f (@waiters) {
1714             # Resolve (not fail) - app should check connection state after await
1715 0 0       0 $f->done unless $f->is_ready;
1716             }
1717 644         1450 $self->{_drain_check_active} = 0;
1718             }
1719              
1720             # ============================================================================
1721              
1722             # WebSocket keepalive - sends protocol-level ping frames (RFC 6455)
1723             sub _start_ws_keepalive {
1724 0     0   0 my ($self, $interval, $timeout) = @_;
1725              
1726             # Stop existing timers first
1727 0         0 $self->_stop_ws_keepalive;
1728              
1729 0 0 0     0 return unless $interval && $interval > 0;
1730 0 0       0 return unless $self->{server};
1731              
1732 0         0 $self->{ws_keepalive_interval} = $interval;
1733 0   0     0 $self->{ws_keepalive_timeout} = $timeout // 0;
1734              
1735 0         0 weaken(my $weak_self = $self);
1736              
1737             my $timer = IO::Async::Timer::Periodic->new(
1738             interval => $interval,
1739             on_tick => sub {
1740 0 0   0   0 return unless $weak_self;
1741 0 0       0 return if $weak_self->{closed};
1742 0 0       0 return unless $weak_self->{websocket_mode};
1743              
1744             # Send ping frame
1745 0         0 my $ping = Protocol::WebSocket::Frame->new(
1746             type => 'ping',
1747             buffer => '',
1748             );
1749 0         0 $weak_self->{stream}->write($ping->to_bytes);
1750              
1751             # Start pong timeout if configured
1752 0 0       0 if ($weak_self->{ws_keepalive_timeout} > 0) {
1753 0         0 $weak_self->{ws_waiting_pong} = 1;
1754 0         0 $weak_self->_start_ws_pong_timeout;
1755             }
1756             },
1757 0         0 );
1758              
1759 0         0 $self->{ws_keepalive_timer} = $timer;
1760 0         0 $self->{server}->add_child($timer);
1761 0         0 $timer->start;
1762             }
1763              
1764             sub _start_ws_pong_timeout {
1765 0     0   0 my ($self) = @_;
1766              
1767             # Don't start another timeout if one is running
1768 0 0       0 return if $self->{ws_pong_timeout};
1769 0 0       0 return unless $self->{ws_keepalive_timeout} > 0;
1770 0 0       0 return unless $self->{server};
1771              
1772 0         0 weaken(my $weak_self = $self);
1773              
1774             my $timer = IO::Async::Timer::Countdown->new(
1775             delay => $self->{ws_keepalive_timeout},
1776             on_expire => sub {
1777 0 0   0   0 return unless $weak_self;
1778 0 0       0 return if $weak_self->{closed};
1779              
1780 0 0       0 if ($weak_self->{ws_waiting_pong}) {
1781             # No pong received within timeout - close connection
1782 0 0 0     0 if ($weak_self->{server} && $weak_self->{server}->can('_log')) {
1783 0         0 $weak_self->{server}->_log(warn =>
1784             "WebSocket keepalive timeout - no pong received within $weak_self->{ws_keepalive_timeout}s");
1785             }
1786 0         0 $weak_self->_handle_disconnect_and_close('keepalive_timeout');
1787             }
1788             },
1789 0         0 );
1790              
1791 0         0 $self->{ws_pong_timeout} = $timer;
1792 0         0 $self->{server}->add_child($timer);
1793 0         0 $timer->start;
1794             }
1795              
1796             sub _cancel_ws_pong_timeout {
1797 322     322   674 my ($self) = @_;
1798              
1799 322         770 $self->{ws_waiting_pong} = 0;
1800              
1801 322 50       954 return unless $self->{ws_pong_timeout};
1802 0 0       0 $self->{ws_pong_timeout}->stop if $self->{ws_pong_timeout}->is_running;
1803 0 0       0 if ($self->{server}) {
1804 0         0 $self->{server}->remove_child($self->{ws_pong_timeout});
1805             }
1806 0         0 $self->{ws_pong_timeout} = undef;
1807             }
1808              
1809             sub _stop_ws_keepalive {
1810 322     322   681 my ($self) = @_;
1811              
1812             # Stop pong timeout first
1813 322         1498 $self->_cancel_ws_pong_timeout;
1814              
1815 322 50       898 return unless $self->{ws_keepalive_timer};
1816 0 0       0 $self->{ws_keepalive_timer}->stop if $self->{ws_keepalive_timer}->is_running;
1817 0 0       0 if ($self->{server}) {
1818 0         0 $self->{server}->remove_child($self->{ws_keepalive_timer});
1819             }
1820 0         0 $self->{ws_keepalive_timer} = undef;
1821 0         0 $self->{ws_keepalive_interval} = 0;
1822 0         0 $self->{ws_keepalive_timeout} = 0;
1823             }
1824              
1825             # SSE keepalive - sends comment lines to prevent proxy timeouts
1826             sub _start_sse_keepalive {
1827 2     2   8 my ($self, $interval, $comment) = @_;
1828              
1829             # Stop existing timer first
1830 2         11 $self->_stop_sse_keepalive;
1831              
1832 2 50 33     12 return unless $interval && $interval > 0;
1833 2 50       8 return unless $self->{server};
1834              
1835 2   50     7 $self->{sse_keepalive_comment} = $comment // '';
1836              
1837 2         5 weaken(my $weak_self = $self);
1838              
1839             my $timer = IO::Async::Timer::Periodic->new(
1840             interval => $interval,
1841             on_tick => sub {
1842 6 50   6   894849 return unless $weak_self;
1843 6 50       45 return if $weak_self->{closed};
1844              
1845 6         30 my $text = $weak_self->{sse_keepalive_comment};
1846 6 50       55 $text = ":$text" unless $text =~ /^:/;
1847 6         20 my $formatted = "$text\n\n";
1848              
1849 6 50       31 if (my $writer = $weak_self->{sse_keepalive_writer}) {
1850 6         26 $writer->($formatted);
1851             }
1852             },
1853 2         50 );
1854              
1855 2         211 $self->{sse_keepalive_timer} = $timer;
1856 2         12 $self->{server}->add_child($timer);
1857 2         239 $timer->start;
1858             }
1859              
1860             sub _stop_sse_keepalive {
1861 324     324   687 my ($self) = @_;
1862              
1863 324 100       809 return unless $self->{sse_keepalive_timer};
1864 2 50       10 $self->{sse_keepalive_timer}->stop if $self->{sse_keepalive_timer}->is_running;
1865 2 50       103 if ($self->{server}) {
1866 2         7 $self->{server}->remove_child($self->{sse_keepalive_timer});
1867             }
1868 2         159 $self->{sse_keepalive_timer} = undef;
1869 2         6 $self->{sse_keepalive_comment} = '';
1870             }
1871              
1872             sub _try_handle_request {
1873 293     293   552 my ($self) = @_;
1874              
1875 293 50       994 return if $self->{closed};
1876 293 100       958 return if $self->{handling_request};
1877              
1878             # Try to parse a request from the buffer
1879 282         2069 my ($request, $consumed) = $self->{protocol}->parse_request($self->{buffer});
1880              
1881 282 100       761 return unless $request;
1882              
1883             # Remove consumed bytes from buffer
1884 280         914 substr($self->{buffer}, 0, $consumed) = '';
1885              
1886             # Handle parse errors (malformed request, header too large)
1887 280 100       760 if ($request->{error}) {
1888             # Mark connection as disconnected with protocol_error reason (PAGI spec compliance)
1889 5         20 $self->_handle_disconnect('protocol_error');
1890 5         111 $self->_send_error_response($request->{error}, $request->{message});
1891 5         23 $self->_close;
1892 5         107 return;
1893             }
1894              
1895             # Check Content-Length against max_body_size limit (0 = unlimited)
1896 275 100 100     1398 if ($self->{max_body_size} && defined $request->{content_length}) {
1897 8 100       27 if ($request->{content_length} > $self->{max_body_size}) {
1898 1         5 $self->_handle_disconnect('body_too_large');
1899 1         26 $self->_send_error_response(413, 'Payload Too Large');
1900 1         5 $self->_close;
1901 1         23 return;
1902             }
1903             }
1904              
1905             # Check if this is a WebSocket upgrade request
1906 274         1048 my $is_websocket = $self->_is_websocket_upgrade($request);
1907              
1908             # Check if this is an SSE request
1909 274   100     1222 my $is_sse = !$is_websocket && $self->_is_sse_request($request);
1910              
1911             # Handle the request - store the Future to prevent "lost future" warning
1912 274         664 $self->{handling_request} = 1;
1913 274         1133 $self->{request_start} = [gettimeofday];
1914 274         646 $self->{current_request} = $request; # Store for access logging
1915              
1916 274 100       691 if ($is_websocket) {
    100          
1917 21         120 $self->{request_future} = $self->_handle_websocket_request($request);
1918             } elsif ($is_sse) {
1919 14         49 $self->{request_future} = $self->_handle_sse_request($request);
1920             } else {
1921             # Start stall timer for HTTP requests (WebSocket/SSE have their own handling)
1922 239         1056 $self->_start_stall_timer;
1923 239         1156 $self->{request_future} = $self->_handle_request($request);
1924             }
1925              
1926             # Use adopt_future for proper error tracking instead of retain
1927             # This ensures errors are propagated to the server's error handling
1928 274         18030 $self->{server}->adopt_future($self->{request_future});
1929             }
1930              
1931             sub _is_websocket_upgrade {
1932 274     274   647 my ($self, $request) = @_;
1933              
1934             # Check for WebSocket upgrade headers
1935 274         414 my $has_upgrade = 0;
1936 274         349 my $has_connection_upgrade = 0;
1937 274         522 my $has_ws_key = 0;
1938              
1939 274         638 for my $header (@{$request->{headers}}) {
  274         1108  
1940 850         1436 my ($name, $value) = @$header;
1941 850 100 66     2795 if ($name eq 'upgrade' && lc($value) eq 'websocket') {
    100          
    100          
1942 21         34 $has_upgrade = 1;
1943             }
1944             elsif ($name eq 'connection') {
1945             # Connection header can have multiple values
1946 258 100       1296 $has_connection_upgrade = 1 if lc($value) =~ /upgrade/;
1947             }
1948             elsif ($name eq 'sec-websocket-key') {
1949 21         39 $has_ws_key = 1;
1950             }
1951             }
1952              
1953 274   66     970 return $has_upgrade && $has_connection_upgrade && $has_ws_key;
1954             }
1955              
1956             sub _is_sse_request {
1957 253     253   499 my ($self, $request) = @_;
1958              
1959             # SSE detection per spec:
1960             # - Accept header includes text/event-stream
1961             # - Request has not been upgraded to WebSocket (already checked)
1962             # Note: SSE works with any HTTP method (GET, POST, etc.) to support
1963             # modern patterns like htmx 4 and datastar using fetch-event-source
1964              
1965 253         362 for my $header (@{$request->{headers}}) {
  253         715  
1966 723         1071 my ($name, $value) = @$header;
1967 723 100       1343 if ($name eq 'accept') {
1968             # Check if Accept header includes text/event-stream
1969 15 100       80 return 1 if $value =~ m{text/event-stream};
1970             }
1971             }
1972              
1973 239         674 return 0;
1974             }
1975              
1976 239     239   435 async sub _handle_request {
1977 239         535 my ($self, $request) = @_;
1978              
1979 239         948 my $scope = $self->_create_scope($request);
1980 239         867 my $receive = $self->_create_receive($request);
1981 239         899 my $send = $self->_create_send($request);
1982              
1983 239         428 eval {
1984 239         1206 await $self->{app}->($scope, $receive, $send);
1985             };
1986              
1987 238 100       17012 if (my $error = $@) {
1988             # Handle application error - always close connection after exception
1989             # If response already started, we can't send error page (3.17)
1990 15 100       55 if ($self->{response_started}) {
1991 2         52 warn "PAGI application error (after response started): $error\n";
1992             } else {
1993 13         65 $self->_send_error_response(500, "Internal Server Error");
1994 13         988 warn "PAGI application error: $error\n";
1995             }
1996             # Write access log before closing
1997 15         96 $self->_write_access_log;
1998             # Notify server that request completed (for max_requests tracking)
1999 15 50       168 $self->{server}->_on_request_complete if $self->{server};
2000             # Always close connection after exception (3.2) - don't try keep-alive
2001 15         59 $self->_handle_disconnect('server_error');
2002 15         60 $self->_close;
2003 15         1131 return;
2004             }
2005              
2006             # Write access log entry
2007 223         1028 $self->_write_access_log;
2008              
2009             # Notify server that request completed (for max_requests tracking)
2010 223 50       2001 $self->{server}->_on_request_complete if $self->{server};
2011              
2012             # Stop stall timer - request completed successfully
2013 223         850 $self->_stop_stall_timer;
2014              
2015             # Determine if we should keep the connection alive
2016 223         709 my $keep_alive = $self->_should_keep_alive($request);
2017              
2018 223 100       457 if ($keep_alive) {
2019             # Reset for next request
2020 208         394 $self->{handling_request} = 0;
2021 208         336 $self->{response_started} = 0;
2022 208         377 $self->{response_status} = undef;
2023 208         346 $self->{_response_size} = 0;
2024 208         451 $self->{request_start} = undef;
2025 208         410 $self->{current_request} = undef;
2026 208         436 $self->{request_future} = undef;
2027 208         1230 $self->{current_connection_state} = undef; # Clear for next request
2028              
2029             # Check if there's more data in the buffer (pipelining)
2030 208 100       10167 if (length($self->{buffer}) > 0) {
2031 1         5 $self->_try_handle_request;
2032             }
2033             } else {
2034             # Not keeping alive - close connection
2035 15         110 $self->_handle_disconnect_and_close('request_complete');
2036             }
2037             }
2038              
2039             sub _should_keep_alive {
2040 223     223   477 my ($self, $request) = @_;
2041              
2042 223   50     585 my $http_version = $request->{http_version} // '1.1';
2043              
2044             # Check for Connection header
2045 223         314 my $connection_header;
2046 223         295 for my $header (@{$request->{headers}}) {
  223         739  
2047 459 100       1062 if ($header->[0] eq 'connection') {
2048 214         564 $connection_header = lc($header->[1]);
2049 214         382 last;
2050             }
2051             }
2052              
2053             # HTTP/1.1: keep-alive by default unless Connection: close
2054 223 100       622 if ($http_version eq '1.1') {
2055 220 100 100     1308 return 0 if $connection_header && $connection_header =~ /close/;
2056 207         421 return 1;
2057             }
2058              
2059             # HTTP/1.0: close by default unless Connection: keep-alive
2060 3 50       9 if ($http_version eq '1.0') {
2061 3 100 66     15 return 1 if $connection_header && $connection_header =~ /keep-alive/;
2062 2         6 return 0;
2063             }
2064              
2065             # Unknown version: close connection
2066 0         0 return 0;
2067             }
2068              
2069             sub _create_scope {
2070 239     239   504 my ($self, $request) = @_;
2071              
2072             # Create connection state object for disconnect tracking (PAGI spec 0.3)
2073             # Uses lazy Future creation - Future only allocated if disconnect_future() is called
2074 239         2698 my $connection_state = PAGI::Server::ConnectionState->new(
2075             connection => $self,
2076             );
2077 239         580 $self->{current_connection_state} = $connection_state;
2078              
2079             my $scope = {
2080             type => 'http',
2081             pagi => {
2082             version => '0.2',
2083             spec_version => '0.2',
2084             features => {},
2085             },
2086             http_version => $request->{http_version},
2087             method => $request->{method},
2088             scheme => $self->_get_scheme,
2089             path => $request->{path},
2090             raw_path => $request->{raw_path},
2091             query_string => $request->{query_string},
2092             root_path => '',
2093             headers => $request->{headers},
2094             (defined $self->{client_host}
2095             ? (client => [$self->{client_host}, $self->{client_port}])
2096             : ()
2097             ),
2098             server => [$self->{server_host}, $self->{server_port}],
2099             # Optimized: avoid hash copy when state is empty (common case)
2100 239 100       2014 state => keys %{$self->{state}} ? { %{$self->{state}} } : {},
  239 100       1350  
  3         15  
2101             extensions => $self->_get_extensions_for_scope,
2102             # Connection state for non-destructive disconnect detection (PAGI spec 0.3)
2103             'pagi.connection' => $connection_state,
2104             };
2105              
2106 239         556 return $scope;
2107             }
2108              
2109             sub _create_receive {
2110 239     239   410 my ($self, $request) = @_;
2111              
2112 239         416 my $content_length = $request->{content_length};
2113 239   50     567 my $is_chunked = $request->{chunked} // 0;
2114 239   50     587 my $expect_continue = $request->{expect_continue} // 0;
2115 239         323 my $continue_sent = 0;
2116 239         302 my $body_complete = 0;
2117 239         330 my $bytes_read = 0;
2118 239         435 my $chunk_size = 65536; # 64KB chunks for large bodies
2119              
2120             # For requests without Content-Length and not chunked, treat as no body
2121 239   100     1118 my $has_body = defined($content_length) && $content_length > 0 || $is_chunked;
2122              
2123 239         446 weaken(my $weak_self = $self);
2124              
2125             # Return a wrapper that tracks the Future from the async receive
2126             return sub {
2127 59 50   59   1256 return Future->done({ type => 'http.disconnect' }) unless $weak_self;
2128 59 50       166 return Future->done({ type => 'http.disconnect' }) if $weak_self->{closed};
2129              
2130             # The actual async implementation
2131 59         196 my $future = (async sub {
2132 59 50       144 return { type => 'http.disconnect' } unless $weak_self;
2133 59 50       120 return { type => 'http.disconnect' } if $weak_self->{closed};
2134              
2135             # Check queue first - events from disconnect handler
2136 59 50       84 if (@{$weak_self->{receive_queue}}) {
  59         168  
2137 0         0 return shift @{$weak_self->{receive_queue}};
  0         0  
2138             }
2139              
2140             # If body is already complete, wait for disconnect
2141 59 100       139 if ($body_complete) {
2142 1 50       3 if (!$weak_self->{receive_pending}) {
2143 1         3 $weak_self->{receive_pending} = Future->new;
2144             }
2145              
2146 1 50       6 if ($weak_self->{closed}) {
2147 0         0 $weak_self->{receive_pending} = undef;
2148 0         0 return { type => 'http.disconnect' };
2149             }
2150              
2151 1         2 my $result = await $weak_self->{receive_pending};
2152             # receive_pending may be completed with a value (disconnect event)
2153             # or just done() as a signal
2154 1 50       91 return $result if ref $result eq 'HASH';
2155             # If no value, check queue
2156 0 0       0 if (@{$weak_self->{receive_queue}}) {
  0         0  
2157 0         0 return shift @{$weak_self->{receive_queue}};
  0         0  
2158             }
2159 0         0 return { type => 'http.disconnect' };
2160             }
2161              
2162             # For requests without body, return empty body immediately
2163 58 100       238 if (!$has_body) {
2164 40         58 $body_complete = 1;
2165             return {
2166 40         684 type => 'http.request',
2167             body => '',
2168             more => 0,
2169             };
2170             }
2171              
2172             # Send 100 Continue if client expects it (before reading body)
2173 18 50 33     40 if ($expect_continue && !$continue_sent) {
2174 0         0 $continue_sent = 1;
2175 0         0 $weak_self->{stream}->write($weak_self->{protocol}->serialize_continue);
2176             }
2177              
2178             # Handle chunked Transfer-Encoding
2179 18 100       28 if ($is_chunked) {
2180             # Wait for data if buffer is empty
2181 1   33     6 while (length($weak_self->{buffer}) == 0 && !$weak_self->{closed}) {
2182 0 0       0 if (!$weak_self->{receive_pending}) {
2183 0         0 $weak_self->{receive_pending} = Future->new;
2184             }
2185 0         0 await $weak_self->{receive_pending};
2186 0         0 $weak_self->{receive_pending} = undef;
2187              
2188             # Check queue after waiting
2189 0 0       0 if (@{$weak_self->{receive_queue}}) {
  0         0  
2190 0         0 return shift @{$weak_self->{receive_queue}};
  0         0  
2191             }
2192             }
2193              
2194             # Try to parse chunked data
2195 1         6 my ($data, $consumed, $complete) = $weak_self->{protocol}->parse_chunked_body($weak_self->{buffer});
2196              
2197             # Check for parse error (invalid chunk size)
2198 1 0 33     4 if (ref($data) eq 'HASH' && $data->{error}) {
2199 0         0 $weak_self->_handle_disconnect('protocol_error');
2200 0   0     0 $weak_self->_send_error_response($data->{error}, $data->{message} // 'Bad Request');
2201 0         0 $weak_self->_close;
2202 0         0 return { type => 'http.disconnect' };
2203             }
2204              
2205 1 50       2 if ($consumed > 0) {
2206 1         2 substr($weak_self->{buffer}, 0, $consumed) = '';
2207              
2208             # Track total bytes read for max_body_size check
2209 1   50     4 $bytes_read += length($data // '');
2210              
2211             # Check max_body_size for chunked requests (0 = unlimited)
2212 1 50 33     5 if ($weak_self->{max_body_size} && $bytes_read > $weak_self->{max_body_size}) {
2213             # Body too large - close connection
2214 0         0 $weak_self->_send_error_response(413, 'Payload Too Large');
2215 0         0 $weak_self->_handle_disconnect('body_too_large');
2216 0         0 $weak_self->_close;
2217 0         0 return { type => 'http.disconnect' };
2218             }
2219              
2220 1 50       2 if ($complete) {
2221 1         2 $body_complete = 1;
2222             }
2223              
2224             return {
2225 1 50 50     13 type => 'http.request',
2226             body => $data // '',
2227             more => $complete ? 0 : 1,
2228             };
2229             }
2230              
2231             # Need more data - wait for it
2232 0 0       0 if (!$weak_self->{receive_pending}) {
2233 0         0 $weak_self->{receive_pending} = Future->new;
2234             }
2235 0         0 await $weak_self->{receive_pending};
2236 0         0 $weak_self->{receive_pending} = undef;
2237              
2238             # Recursive call to re-process - but we can't use __SUB__ in nested async
2239             # Just return disconnect if closed
2240 0 0       0 return { type => 'http.disconnect' } if $weak_self->{closed};
2241             # This shouldn't happen often - caller should retry
2242 0         0 return { type => 'http.request', body => '', more => 1 };
2243             }
2244              
2245             # Handle Content-Length based body reading
2246 17         22 my $remaining = $content_length - $bytes_read;
2247              
2248 17 50       34 if ($remaining <= 0) {
2249 0         0 $body_complete = 1;
2250             return {
2251 0         0 type => 'http.request',
2252             body => '',
2253             more => 0,
2254             };
2255             }
2256              
2257             # Wait for data if buffer is empty
2258 17   66     79 while (length($weak_self->{buffer}) == 0 && !$weak_self->{closed}) {
2259 12 50       19 if (!$weak_self->{receive_pending}) {
2260 12         33 $weak_self->{receive_pending} = Future->new;
2261             }
2262 12         91 await $weak_self->{receive_pending};
2263 12         719 $weak_self->{receive_pending} = undef;
2264              
2265             # Check queue after waiting
2266 12 50       13 if (@{$weak_self->{receive_queue}}) {
  12         44  
2267 0         0 return shift @{$weak_self->{receive_queue}};
  0         0  
2268             }
2269             }
2270              
2271             # Return disconnect if closed while waiting
2272 17 50 33     38 if ($weak_self->{closed} && length($weak_self->{buffer}) == 0) {
2273 0         0 return { type => 'http.disconnect' };
2274             }
2275              
2276             # Read up to chunk_size or remaining bytes, whichever is smaller
2277 17 100       41 my $to_read = $remaining < $chunk_size ? $remaining : $chunk_size;
2278 17 100       46 $to_read = length($weak_self->{buffer}) if length($weak_self->{buffer}) < $to_read;
2279              
2280 17         80 my $body = substr($weak_self->{buffer}, 0, $to_read, '');
2281 17         28 $bytes_read += length($body);
2282              
2283             # Check if we've read all the body
2284 17 100       32 my $more = ($bytes_read < $content_length) ? 1 : 0;
2285              
2286 17 100       26 if (!$more) {
2287 5         7 $body_complete = 1;
2288             }
2289              
2290             return {
2291 17         138 type => 'http.request',
2292             body => $body,
2293             more => $more,
2294             };
2295 59         650 })->();
2296              
2297             # Track this Future so we can cancel it on close
2298 59         3500 push @{$weak_self->{receive_futures}}, $future;
  59         164  
2299              
2300             # Clean up completed futures from the list
2301 59         82 @{$weak_self->{receive_futures}} = grep { !$_->is_ready } @{$weak_self->{receive_futures}};
  59         313  
  70         267  
  59         120  
2302              
2303 59         305 return $future;
2304 239         1877 };
2305             }
2306              
2307             sub _create_send {
2308 239     239   513 my ($self, $request) = @_;
2309              
2310 239         462 my $chunked = 0;
2311 239         337 my $response_started = 0;
2312 239         474 my $expects_trailers = 0;
2313 239         384 my $body_complete = 0;
2314 239   50     733 my $is_head_request = ($request->{method} // '') eq 'HEAD';
2315 239   50     646 my $http_version = $request->{http_version} // '1.1';
2316 239         456 my $is_http10 = ($http_version eq '1.0');
2317              
2318             # Check if HTTP/1.0 client requested keep-alive
2319 239         319 my $client_wants_keepalive = 0;
2320 239 100       559 if ($is_http10) {
2321 3         20 for my $h (@{$request->{headers}}) {
  3         13  
2322 2 100 66     10 if ($h->[0] eq 'connection' && lc($h->[1]) =~ /keep-alive/) {
2323 1         1 $client_wants_keepalive = 1;
2324 1         7 last;
2325             }
2326             }
2327             }
2328              
2329 239         396 weaken(my $weak_self = $self);
2330              
2331 469     469   709305 return async sub {
2332 469         741 my ($event) = @_;
2333 469 50       1022 return Future->done unless $weak_self;
2334 469 50       1128 return Future->done if $weak_self->{closed};
2335              
2336             # Reset stall timer on write activity
2337 469         1590 $weak_self->_reset_stall_timer;
2338              
2339 469   50     1095 my $type = $event->{type} // '';
2340              
2341             # Dev-mode event validation (PAGI spec compliance)
2342 469 50       992 if ($weak_self->{validate_events}) {
2343 0         0 require PAGI::Server::EventValidator;
2344 0         0 PAGI::Server::EventValidator::validate_http_send($event);
2345             }
2346              
2347 469 100       1270 if ($type eq 'http.response.start') {
    100          
    100          
    50          
2348 226 50       502 return if $response_started;
2349 226         362 $response_started = 1;
2350 226         427 $weak_self->{response_started} = 1;
2351 226   50     592 $weak_self->{response_status} = $event->{status} // 200; # Track for logging
2352 226   100     933 $expects_trailers = $event->{trailers} // 0;
2353              
2354 226   50     488 my $status = $event->{status} // 200;
2355 226   50     506 my $headers = $event->{headers} // [];
2356              
2357             # Check if we need chunked encoding (no Content-Length)
2358 226         357 my $has_content_length = 0;
2359 226         494 for my $h (@$headers) {
2360 317 100       904 if (lc($h->[0]) eq 'content-length') {
2361 62         69 $has_content_length = 1;
2362 62         93 last;
2363             }
2364             }
2365              
2366             # Add Date header
2367 226         724 my @final_headers = @$headers;
2368 226         1317 push @final_headers, ['date', $weak_self->{protocol}->format_date];
2369              
2370             # For HEAD requests, don't use chunked encoding (no body will be sent)
2371             # For HTTP/1.0, don't use chunked encoding - use Connection: close instead
2372 226 100 100     986 if ($is_head_request || $is_http10) {
2373 5         6 $chunked = 0;
2374 5 100       12 if ($is_http10) {
2375 3 100       10 if (!$has_content_length) {
    100          
2376             # No Content-Length means we can't do keep-alive
2377 1         5 push @final_headers, ['connection', 'close'];
2378             } elsif ($client_wants_keepalive) {
2379             # HTTP/1.0 client requested keep-alive and we can honor it
2380             # Must explicitly acknowledge with Connection: keep-alive
2381 1         3 push @final_headers, ['connection', 'keep-alive'];
2382             }
2383             }
2384             } else {
2385 221         437 $chunked = !$has_content_length;
2386             }
2387              
2388             my $response = $weak_self->{protocol}->serialize_response_start(
2389 226         1134 $status, \@final_headers, $chunked, $http_version
2390             );
2391              
2392             # Write headers to stream
2393 225         1109 $weak_self->{stream}->write($response);
2394             }
2395             elsif ($type eq 'http.response.body') {
2396 239 50       504 return unless $response_started;
2397 239 100       585 return if $body_complete;
2398              
2399             # For HEAD requests, suppress the body but track completion
2400 238 100       519 if ($is_head_request) {
2401 2   50     6 my $more = $event->{more} // 0;
2402 2 50       7 if (!$more) {
2403 2         4 $body_complete = 1;
2404             }
2405 2         6 return; # Don't send any body for HEAD
2406             }
2407              
2408             # --- BACKPRESSURE CHECK ---
2409             # Wait for buffer to drain if we're above high watermark
2410             # This prevents unbounded memory growth with slow clients
2411 236 50       885 if ($weak_self->_get_write_buffer_size >= $weak_self->{write_high_watermark}) {
2412 0         0 await $weak_self->_wait_for_drain;
2413             # Re-check connection state after await
2414 0 0       0 return Future->done unless $weak_self;
2415 0 0       0 return Future->done if $weak_self->{closed};
2416             }
2417             # --- END BACKPRESSURE CHECK ---
2418              
2419             # Determine body source: body, file, or fh (mutually exclusive)
2420 236         502 my $body = $event->{body};
2421 236         399 my $file = $event->{file};
2422 236         463 my $fh = $event->{fh};
2423 236   100     937 my $offset = $event->{offset} // 0;
2424 236         424 my $length = $event->{length};
2425              
2426 236 100       655 if (defined $file) {
    100          
2427             # File path response - stream from file (async, non-blocking)
2428             # File responses are implicitly complete (more is ignored)
2429 29         117 await $weak_self->_send_file_response($file, $offset, $length, $chunked);
2430 29         4903 $body_complete = 1;
2431             }
2432             elsif (defined $fh) {
2433             # Filehandle response - stream from handle (async, non-blocking)
2434             # Filehandle responses are implicitly complete (more is ignored)
2435 6         16 await $weak_self->_send_fh_response($fh, $offset, $length, $chunked);
2436 6         540 $body_complete = 1;
2437             }
2438             else {
2439             # Traditional body response
2440 201   100     488 $body //= '';
2441 201   100     495 my $more = $event->{more} // 0;
2442              
2443 201         437 $weak_self->{_response_size} += length($body);
2444              
2445 201 100       412 if ($chunked) {
2446 169 100       363 if (length $body) {
2447 161         724 my $len = sprintf("%x", length($body));
2448 161         712 $weak_self->{stream}->write("$len\r\n$body\r\n");
2449             }
2450             }
2451             else {
2452 32 50       149 $weak_self->{stream}->write($body) if length $body;
2453             }
2454              
2455             # Handle completion for body responses
2456 201 100       16250 if (!$more) {
2457 186         374 $body_complete = 1;
2458 186 100 100     878 if ($chunked && !$expects_trailers) {
2459 153         556 $weak_self->{stream}->write("0\r\n\r\n");
2460             }
2461             }
2462             }
2463             }
2464             elsif ($type eq 'http.response.trailers') {
2465 1 50       2 return unless $response_started;
2466 1 50       3 return unless $expects_trailers;
2467 1 50       2 return unless $chunked; # Trailers only work with chunked encoding
2468              
2469 1   50     4 my $trailer_headers = $event->{headers} // [];
2470              
2471             # Send final chunk + trailers
2472 1         2 my $trailers = "0\r\n";
2473 1         2 for my $header (@$trailer_headers) {
2474 1         2 my ($name, $value) = @$header;
2475 1         32 $name = _validate_header_name($name);
2476 1         4 $value = _validate_header_value($value);
2477 1         4 $trailers .= "$name: $value\r\n";
2478             }
2479 1         2 $trailers .= "\r\n";
2480              
2481 1         4 $weak_self->{stream}->write($trailers);
2482 1         76 $body_complete = 1;
2483             }
2484             elsif ($type eq 'http.fullflush') {
2485             # Fullflush extension - force immediate TCP buffer flush
2486             # Per spec: servers that don't advertise the extension must reject
2487 3 100       8 unless (exists $weak_self->{extensions}{fullflush}) {
2488 1         13 warn "PAGI: http.fullflush event rejected - extension not enabled\n";
2489 1         12 die "Extension not enabled: fullflush\n";
2490             }
2491              
2492             # Force flush by ensuring TCP_NODELAY and flushing any pending writes
2493 2         4 my $handle = $weak_self->{stream}->write_handle;
2494 2 50 33     27 if ($handle && $handle->can('setsockopt')) {
2495             # Ensure TCP_NODELAY is set to disable Nagle buffering
2496 2         13 require Socket;
2497 2         5 $handle->setsockopt(Socket::IPPROTO_TCP(), Socket::TCP_NODELAY(), 1);
2498             }
2499              
2500             # In IO::Async, writes are queued and sent when the event loop allows.
2501             # The above TCP_NODELAY ensures no Nagle buffering delays.
2502             # For this reference implementation, we return immediately as the
2503             # write buffer will be flushed by the event loop.
2504             }
2505             else {
2506             # Per PAGI spec: servers must raise exceptions for unrecognized event types
2507 0         0 _unrecognized_event_type($type, 'http');
2508             }
2509              
2510 464         65774 return;
2511 239         3354 };
2512             }
2513              
2514             sub _send_error_response {
2515 22     22   57 my ($self, $status, $message) = @_;
2516              
2517 22 50       67 return if $self->{closed};
2518 22 50       137 return if $self->{response_started};
2519              
2520 22         46 my $body = $message;
2521             my $headers = [
2522             ['content-type', 'text/plain'],
2523             ['content-length', length($body)],
2524 22         175 ['date', $self->{protocol}->format_date],
2525             ];
2526              
2527 22         115 my $response = $self->{protocol}->serialize_response_start($status, $headers, 0);
2528 22         52 $response .= $body;
2529              
2530 22         127 $self->{stream}->write($response);
2531 22         5565 $self->{response_started} = 1;
2532 22         106 $self->{response_status} = $status; # Track for logging
2533             }
2534              
2535             sub _write_access_log {
2536 272     272   597 my ($self) = @_;
2537              
2538 272 100       774 return unless $self->{access_log};
2539 263 50       661 return unless $self->{current_request};
2540              
2541 263         552 my $request = $self->{current_request};
2542              
2543             # Calculate request duration
2544 263         387 my $duration = 0;
2545 263 50       851 if ($self->{request_start}) {
2546 263         1840 $duration = tv_interval($self->{request_start});
2547             }
2548              
2549             # Per-second cached CLF timestamp
2550 263         4146 my $now = time();
2551 263 100       779 if ($now != $_cached_log_time) {
2552 66         174 $_cached_log_time = $now;
2553 66         301 my @gmt = gmtime($now);
2554 66         486 my @months = qw(Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec);
2555 66         626 $_cached_log_timestamp = sprintf("%02d/%s/%04d:%02d:%02d:%02d +0000",
2556             $gmt[3], $months[$gmt[4]], $gmt[5] + 1900,
2557             $gmt[2], $gmt[1], $gmt[0]);
2558             }
2559              
2560             my $info = {
2561             client_ip => $self->{client_host} // ($self->{transport_type} eq 'unix' ? 'unix' : '-'),
2562             timestamp => $_cached_log_timestamp,
2563             method => $request->{method} // '-',
2564             path => $request->{raw_path} // '/',
2565             query => $request->{query_string},
2566             http_version => $request->{http_version} // '1.1',
2567             status => $self->{response_status} // '-',
2568             size => $self->{_response_size} // 0,
2569             duration => $duration,
2570 263 50 66     5241 request_headers => $request->{headers} // [],
      50        
      50        
      50        
      50        
      50        
      50        
2571             };
2572              
2573 263         594 my $formatter = $self->{_access_log_formatter};
2574 263 50       639 if ($formatter) {
2575 263         509 print {$self->{access_log}} $formatter->($info), "\n";
  263         1219  
2576             }
2577             else {
2578             # Fallback (should not happen with properly initialized server)
2579 0         0 my $path = $info->{path};
2580 0         0 my $query = $info->{query};
2581 0 0 0     0 $path .= "?$query" if defined $query && length $query;
2582 0         0 print {$self->{access_log}} "$info->{client_ip} - - [$info->{timestamp}] \"$info->{method} $path\" $info->{status} $info->{duration}s\n";
  0         0  
2583             }
2584             }
2585              
2586             sub _handle_disconnect {
2587 545     545   1287 my ($self, $reason) = @_;
2588              
2589             # Idempotency guard - prevent duplicate disconnect handling
2590             # Multiple paths can trigger disconnect (timeout, protocol error, session end)
2591 545 100       1828 return if $self->{_disconnect_handled};
2592 322         988 $self->{_disconnect_handled} = 1;
2593              
2594             # Cancel any pending drain waiters (backpressure)
2595 322         1660 $self->_cancel_drain_waiters($reason);
2596              
2597             # Auto-detect server shutdown (PAGI spec compliance)
2598             # If no explicit reason and server is shutting down, use server_shutdown
2599 322 50 66     877 if (!$reason && $self->{server} && $self->{server}{shutting_down}) {
      33        
2600 0         0 $reason = 'server_shutdown';
2601             }
2602              
2603             # Default reason is client_closed (TCP FIN received)
2604 322   100     809 $reason //= 'client_closed';
2605              
2606             # Mark HTTP connection state as disconnected (PAGI spec 0.3)
2607             # Only for HTTP - WebSocket/SSE have their own patterns
2608 322 50 66     1357 if ($self->{current_connection_state} && !$self->{websocket_mode} && !$self->{sse_mode}) {
      66        
2609 31         204 $self->{current_connection_state}->_mark_disconnected($reason);
2610             }
2611              
2612             # Determine disconnect event type based on mode
2613 322         592 my $disconnect_event;
2614 322 100       1555 if ($self->{websocket_mode}) {
    100          
2615 18         86 $disconnect_event = { type => 'websocket.disconnect', code => 1006, reason => '' };
2616             } elsif ($self->{sse_mode}) {
2617             $disconnect_event = {
2618             type => 'sse.disconnect',
2619 14   50     138 reason => $self->{sse_disconnect_reason} // 'client_closed',
2620             };
2621             } else {
2622 290         1052 $disconnect_event = { type => 'http.disconnect' };
2623             }
2624              
2625             # Queue disconnect event (do this even if already closed)
2626 322         505 push @{$self->{receive_queue}}, $disconnect_event;
  322         959  
2627              
2628             # Complete any pending receive
2629 322 100 100     1365 if ($self->{receive_pending} && !$self->{receive_pending}->is_ready) {
2630 16         123 $self->{receive_pending}->done($disconnect_event);
2631 16         3722 $self->{receive_pending} = undef;
2632             }
2633             }
2634              
2635             # Send a WebSocket close frame with status code and optional reason
2636             # Per RFC 6455 Section 7.4, common codes:
2637             # 1000 - Normal closure
2638             # 1007 - Invalid frame payload data (e.g., invalid UTF-8)
2639             # 1009 - Message too big
2640             # 1011 - Unexpected condition
2641             sub _send_close_frame {
2642 3     3   17 my ($self, $code, $reason) = @_;
2643 3   50     9 $reason //= '';
2644              
2645 3 50       11 return unless $self->{stream};
2646 3 50       9 return if $self->{close_sent};
2647              
2648 3         24 my $frame = Protocol::WebSocket::Frame->new(
2649             type => 'close',
2650             buffer => pack('n', $code) . $reason,
2651             );
2652              
2653 3         149 $self->{stream}->write($frame->to_bytes);
2654 3         888 $self->{close_sent} = 1;
2655             }
2656              
2657             sub _close {
2658 547     547   2113432 my ($self) = @_;
2659              
2660 547 100       1861 return if $self->{closed};
2661 322         678 $self->{closed} = 1;
2662              
2663             # Cancel pending drain waiters early (before other cleanup)
2664 322         1145 $self->_cancel_drain_waiters('connection closing');
2665              
2666             # Clean up HTTP/2 per-stream state
2667 322 50       955 if ($self->{h2_streams}) {
2668 322         461 for my $stream (values %{$self->{h2_streams}}) {
  322         1072  
2669 25 100 66     441 if ($stream->{body_pending} && !$stream->{body_pending}->is_ready) {
2670             my $event = $stream->{is_sse}
2671 9 100       88 ? { type => 'sse.disconnect', reason => 'client_closed' }
2672             : { type => 'http.disconnect' };
2673 9         45 $stream->{body_pending}->done($event);
2674             }
2675             }
2676 322         2540 delete $self->{h2_streams};
2677             }
2678 322 100       1017 if ($self->{h2_session}) {
2679 62         126 eval { $self->{h2_session}->terminate(0) };
  62         637  
2680 62         6252 delete $self->{h2_session};
2681             }
2682              
2683             # Clean up WebSocket frame parser to free memory immediately
2684 322         870 delete $self->{websocket_frame};
2685              
2686             # Remove from server's connection list (O(1) hash delete)
2687 322 50       1109 if ($self->{server}) {
2688 322         1379 delete $self->{server}{connections}{refaddr($self)};
2689              
2690             # Signal drain complete if this was the last connection during shutdown
2691 322 100 100     1289 if ($self->{server}{shutting_down} &&
      100        
      66        
2692 192         1559 keys %{$self->{server}{connections}} == 0 &&
2693             $self->{server}{drain_complete} &&
2694             !$self->{server}{drain_complete}->is_ready) {
2695 1         18 $self->{server}{drain_complete}->done;
2696             }
2697             }
2698              
2699             # Stop idle timer
2700 322         1705 $self->_stop_idle_timer;
2701              
2702             # Stop stall timer
2703 322         1267 $self->_stop_stall_timer;
2704              
2705             # Stop WS/SSE idle timers
2706 322         1336 $self->_stop_ws_idle_timer;
2707 322         1280 $self->_stop_sse_idle_timer;
2708              
2709             # Stop keepalive timers
2710 322         1396 $self->_stop_ws_keepalive;
2711 322         1452 $self->_stop_sse_keepalive;
2712              
2713             # Note: _close is resource cleanup ONLY. Callers should use
2714             # _handle_disconnect_and_close() which handles both protocol
2715             # notification and cleanup.
2716              
2717             # Determine disconnect event type based on mode
2718 322         586 my $disconnect_event;
2719 322 100       1271 if ($self->{websocket_mode}) {
    100          
2720 18         77 $disconnect_event = { type => 'websocket.disconnect', code => 1006, reason => '' };
2721             } elsif ($self->{sse_mode}) {
2722             $disconnect_event = {
2723             type => 'sse.disconnect',
2724 14   50     72 reason => $self->{sse_disconnect_reason} // 'client_closed',
2725             };
2726             } else {
2727 290         937 $disconnect_event = { type => 'http.disconnect' };
2728             }
2729              
2730             # Cancel any tracked receive Futures that are still pending
2731 322         529 for my $future (@{$self->{receive_futures}}) {
  322         1071  
2732 18 50       64 if (!$future->is_ready) {
2733             # Complete with disconnect event instead of cancelling
2734             # This allows the async sub to complete cleanly
2735 0         0 $future->done($disconnect_event);
2736             }
2737             }
2738 322         834 $self->{receive_futures} = [];
2739              
2740 322 50       1030 if ($self->{stream}) {
2741 322         1486 $self->{stream}->close_when_empty;
2742             }
2743             }
2744              
2745             # Combined disconnect and close - use this from callbacks where $weak_self may
2746             # become undefined after _handle_disconnect completes its Future callbacks.
2747             # This method holds a strong reference to $self throughout the operation.
2748             sub _handle_disconnect_and_close {
2749 524     524   1349 my ($self, $reason) = @_;
2750 524         2103 $self->_handle_disconnect($reason);
2751 524         1974 $self->_close;
2752             }
2753              
2754             #
2755             # TLS Support Methods
2756             #
2757              
2758             sub _extract_tls_info {
2759 44     44   102 my ($self) = @_;
2760              
2761 44         99 my $stream = $self->{stream};
2762 44         145 my $handle = $stream->read_handle;
2763              
2764             # Check if handle is an IO::Socket::SSL
2765 44 50 33     602 return unless $handle && $handle->isa('IO::Socket::SSL');
2766              
2767 44         442 my $tls_info = {
2768             server_cert => undef,
2769             client_cert_chain => [],
2770             client_cert_name => undef,
2771             client_cert_error => undef,
2772             tls_version => undef,
2773             cipher_suite => undef,
2774             };
2775              
2776             # Get TLS version - IO::Socket::SSL returns something like 'TLSv1_3'
2777 44 50       288 if (my $version_str = $handle->get_sslversion) {
2778             # Map version string to numeric value per TLS spec
2779 44         1520 my %version_map = (
2780             'SSLv3' => 0x0300,
2781             'TLSv1' => 0x0301,
2782             'TLSv1_1' => 0x0302,
2783             'TLSv1_2' => 0x0303,
2784             'TLSv1_3' => 0x0304,
2785             );
2786 44         361 $tls_info->{tls_version} = $version_map{$version_str};
2787             }
2788              
2789             # Get cipher suite
2790 44 50       284 if (my $cipher = $handle->get_cipher) {
2791             # IO::Socket::SSL provides cipher name, we need to map to numeric
2792             # For now, get the raw bits if available, or store undef
2793             # IO::Socket::SSL doesn't easily expose the numeric cipher ID
2794             # We'll try to get it from the SSL object
2795 44         617 my $ssl = $handle->_get_ssl_object;
2796 44 50 33     812 if ($ssl && $ssl->can('get_cipher_bits')) {
2797             # Unfortunately, OpenSSL doesn't expose cipher ID easily via perl bindings
2798             # We'll leave cipher_suite as undef for this reference implementation
2799             # A production server could use Net::SSLeay::get_current_cipher and
2800             # Net::SSLeay::CIPHER_get_id for the actual numeric ID
2801 0         0 eval {
2802 0         0 require Net::SSLeay;
2803 0         0 my $current_cipher = Net::SSLeay::get_current_cipher($ssl);
2804 0 0       0 if ($current_cipher) {
2805 0         0 my $id = Net::SSLeay::CIPHER_get_id($current_cipher);
2806             # The ID from OpenSSL includes protocol bits in upper bytes
2807             # We want just the cipher suite ID (lower 16 bits usually, but SSL3+ uses different encoding)
2808             # For TLS, the ID is returned as a 32-bit value with protocol in upper bits
2809             # Extract lower 16 bits for the cipher suite
2810 0 0       0 $tls_info->{cipher_suite} = $id & 0xFFFF if defined $id;
2811             }
2812             };
2813 0 0       0 if ($@) {
2814 0         0 warn "TLS cipher suite extraction error: $@\n";
2815 0         0 $tls_info->{cipher_extraction_error} = $@;
2816             }
2817             }
2818             }
2819              
2820             # Get server certificate (our certificate)
2821             # IO::Socket::SSL uses sock_certificate() for the server's own cert
2822 44         99 eval {
2823 44         206 my $cert = $handle->sock_certificate;
2824 44 50       798 if ($cert) {
2825 44         499 require Net::SSLeay;
2826 44         1189 $tls_info->{server_cert} = Net::SSLeay::PEM_get_string_X509($cert);
2827             }
2828             };
2829 44 50       196 if ($@) {
2830 0         0 warn "TLS server certificate extraction error: $@\n";
2831 0         0 $tls_info->{server_cert_error} = $@;
2832             }
2833              
2834             # Get client certificate if provided
2835 44         60 eval {
2836 44         324 my $client_cert = $handle->peer_certificate;
2837 44 100       2569 if ($client_cert) {
2838 8         88 require Net::SSLeay;
2839              
2840             # Get client cert chain
2841 8         32 my @chain;
2842 8         156 push @chain, Net::SSLeay::PEM_get_string_X509($client_cert);
2843              
2844             # Try to get additional certs in chain
2845 8 50       68 if (my $ssl = $handle->_get_ssl_object) {
2846 8         164 my $chain_obj = Net::SSLeay::get_peer_cert_chain($ssl);
2847 8 50       56 if ($chain_obj) {
2848 0         0 for my $i (0 .. Net::SSLeay::sk_X509_num($chain_obj) - 1) {
2849 0         0 my $cert = Net::SSLeay::sk_X509_value($chain_obj, $i);
2850 0 0       0 push @chain, Net::SSLeay::PEM_get_string_X509($cert) if $cert;
2851             }
2852             }
2853             }
2854 8         28 $tls_info->{client_cert_chain} = \@chain;
2855              
2856             # Get client cert DN (Subject)
2857 8         104 my $subject = Net::SSLeay::X509_NAME_oneline(
2858             Net::SSLeay::X509_get_subject_name($client_cert)
2859             );
2860 8 50       64 $tls_info->{client_cert_name} = $subject if $subject;
2861              
2862             # Check for verification errors
2863 8         36 my $verify_result = $handle->get_sslversion_int;
2864             # Actually, use verify_result
2865 8 50       148 if (my $ssl = $handle->_get_ssl_object) {
2866 8         116 my $result = Net::SSLeay::get_verify_result($ssl);
2867 8 50       56 if ($result != 0) { # X509_V_OK = 0
2868 0         0 $tls_info->{client_cert_error} = Net::SSLeay::X509_verify_cert_error_string($result);
2869             }
2870             }
2871             }
2872             };
2873 44 50       4411 if ($@) {
2874 0         0 warn "TLS client certificate extraction error: $@\n";
2875 0         0 $tls_info->{client_cert_extraction_error} = $@;
2876             }
2877              
2878 44         145 $self->{tls_info} = $tls_info;
2879             }
2880              
2881             sub _get_scheme {
2882 253     253   470 my ($self) = @_;
2883              
2884 253 100       2082 return $self->{tls_enabled} ? 'https' : 'http';
2885             }
2886              
2887             sub _get_ws_scheme {
2888 34     34   64 my ($self) = @_;
2889              
2890 34 50       301 return $self->{tls_enabled} ? 'wss' : 'ws';
2891             }
2892              
2893             sub _get_extensions_for_scope {
2894 338     338   684 my ($self) = @_;
2895              
2896 338         545 my %extensions = %{$self->{extensions}};
  338         995  
2897              
2898             # Add TLS info to extensions if this is a TLS connection
2899 338 100 66     1618 if ($self->{tls_enabled} && $self->{tls_info}) {
    50          
2900 44         98 $extensions{tls} = $self->{tls_info};
2901             }
2902             # Remove tls extension if not a TLS connection (per spec)
2903             elsif (!$self->{tls_enabled}) {
2904 294         493 delete $extensions{tls};
2905             }
2906              
2907 338         4013 return \%extensions;
2908             }
2909              
2910             #
2911             # SSE (Server-Sent Events) Support Methods
2912             #
2913              
2914 14     14   20 async sub _handle_sse_request {
2915 14         27 my ($self, $request) = @_;
2916              
2917 14         31 $self->{sse_mode} = 1;
2918 14         112 $self->_stop_idle_timer; # SSE connections are long-lived
2919 14         58 $self->_start_sse_idle_timer; # Start SSE-specific idle timer if configured
2920              
2921 14         44 my $scope = $self->_create_sse_scope($request);
2922 14         42 my $receive = $self->_create_sse_receive($request);
2923 14         76 my $send = $self->_create_sse_send($request);
2924              
2925 14         28 eval {
2926 14         62 await $self->{app}->($scope, $receive, $send);
2927             };
2928              
2929 14 50       1145 if (my $error = $@) {
2930             # If SSE not yet started, send HTTP error
2931 0 0       0 if (!$self->{sse_started}) {
2932 0         0 $self->_send_error_response(500, "Internal Server Error");
2933             }
2934 0         0 warn "PAGI application error (SSE): $error\n";
2935             }
2936              
2937             # Send chunked terminator if SSE was started (uses chunked encoding)
2938             # Check both closed flag and that stream is still writable
2939 14 100 33     123 if ($self->{sse_started} && !$self->{closed} &&
      33        
      66        
2940             $self->{stream} && $self->{stream}->write_handle) {
2941 13         72 $self->{stream}->write("0\r\n\r\n");
2942             }
2943              
2944             # Write access log entry (logs at connection close with total duration)
2945 14         1023 $self->_write_access_log;
2946              
2947             # Close connection after SSE stream ends
2948 14         53 $self->_handle_disconnect_and_close('stream_complete');
2949             }
2950              
2951             sub _create_sse_scope {
2952 14     14   28 my ($self, $request) = @_;
2953              
2954             my $scope = {
2955             type => 'sse',
2956             pagi => {
2957             version => '0.2',
2958             spec_version => '0.2',
2959             features => {},
2960             },
2961             http_version => $request->{http_version},
2962             method => $request->{method},
2963             scheme => $self->_get_scheme,
2964             path => $request->{path},
2965             raw_path => $request->{raw_path},
2966             query_string => $request->{query_string},
2967             root_path => '',
2968             headers => $request->{headers},
2969             (defined $self->{client_host}
2970             ? (client => [$self->{client_host}, $self->{client_port}])
2971             : ()
2972             ),
2973             server => [$self->{server_host}, $self->{server_port}],
2974             # Optimized: avoid hash copy when state is empty (common case)
2975 14 50       97 state => keys %{$self->{state}} ? { %{$self->{state}} } : {},
  14 50       68  
  0         0  
2976             extensions => $self->_get_extensions_for_scope,
2977             };
2978              
2979 14         30 return $scope;
2980             }
2981              
2982             sub _create_sse_receive {
2983 14     14   23 my ($self, $request) = @_;
2984              
2985 14         29 my $content_length = $request->{content_length};
2986 14   66     35 my $has_body = defined($content_length) && $content_length > 0;
2987 14         19 my $body_complete = 0;
2988 14         17 my $bytes_read = 0;
2989              
2990 14         28 weaken(my $weak_self = $self);
2991              
2992             # Helper to create SSE disconnect event with reason
2993             my $sse_disconnect = sub {
2994             return {
2995             type => 'sse.disconnect',
2996 0 0 0 0   0 reason => ($weak_self ? $weak_self->{sse_disconnect_reason} : undef) // 'client_closed',
2997             };
2998 14         50 };
2999              
3000             return sub {
3001 5 50   5   84 return Future->done($sse_disconnect->())
3002             unless $weak_self;
3003             return Future->done($sse_disconnect->())
3004 5 50       13 if $weak_self->{closed};
3005              
3006 5         8 my $future = (async sub {
3007 5 50       13 return $sse_disconnect->()
3008             unless $weak_self;
3009             return $sse_disconnect->()
3010 5 50       10 if $weak_self->{closed};
3011              
3012             # Check queue first
3013 5 50       7 if (@{$weak_self->{receive_queue}}) {
  5         14  
3014 0         0 return shift @{$weak_self->{receive_queue}};
  0         0  
3015             }
3016              
3017             # Handle request body for POST/PUT SSE requests
3018 5 100 66     29 if ($has_body && !$body_complete) {
3019 1         17 my $remaining = $content_length - $bytes_read;
3020              
3021             # Wait for data if buffer is empty
3022 1   33     17 while (length($weak_self->{buffer}) == 0 && !$weak_self->{closed} && $remaining > 0) {
      33        
3023 0 0       0 if (!$weak_self->{receive_pending}) {
3024 0         0 $weak_self->{receive_pending} = Future->new;
3025             }
3026 0         0 await $weak_self->{receive_pending};
3027 0         0 $weak_self->{receive_pending} = undef;
3028              
3029 0 0       0 if (@{$weak_self->{receive_queue}}) {
  0         0  
3030 0         0 return shift @{$weak_self->{receive_queue}};
  0         0  
3031             }
3032             }
3033              
3034 1 50       5 return $sse_disconnect->() if $weak_self->{closed};
3035              
3036             # Read available data up to remaining
3037             my $to_read = $remaining < length($weak_self->{buffer})
3038             ? $remaining
3039 1 50       5 : length($weak_self->{buffer});
3040              
3041 1         5 my $chunk = substr($weak_self->{buffer}, 0, $to_read, '');
3042 1         2 $bytes_read += length($chunk);
3043              
3044 1 50       4 my $more = ($bytes_read < $content_length) ? 1 : 0;
3045 1 50       5 $body_complete = 1 if !$more;
3046              
3047             return {
3048 1         15 type => 'sse.request',
3049             body => $chunk,
3050             more => $more,
3051             };
3052             }
3053              
3054             # No body or body complete - return empty body if not yet returned
3055 4 100       8 if (!$body_complete) {
3056 2         25 $body_complete = 1;
3057             return {
3058 2         18 type => 'sse.request',
3059             body => '',
3060             more => 0,
3061             };
3062             }
3063              
3064             # Wait for disconnect
3065 2         3 while (1) {
3066 3 100       10 if (@{$weak_self->{receive_queue}}) {
  3         8  
3067 1         3 return shift @{$weak_self->{receive_queue}};
  1         7  
3068             }
3069              
3070             return $sse_disconnect->()
3071 2 50       6 if $weak_self->{closed};
3072              
3073 2 50       6 if (!$weak_self->{receive_pending}) {
3074 2         20 $weak_self->{receive_pending} = Future->new;
3075             }
3076 2         16 await $weak_self->{receive_pending};
3077 1         100 $weak_self->{receive_pending} = undef;
3078             }
3079 5         33 })->();
3080              
3081             # Track this Future
3082 5         247 push @{$weak_self->{receive_futures}}, $future;
  5         10  
3083 5         10 @{$weak_self->{receive_futures}} = grep { !$_->is_ready } @{$weak_self->{receive_futures}};
  5         28  
  5         11  
  5         11  
3084              
3085 5         17 return $future;
3086 14         108 };
3087             }
3088              
3089             sub _format_sse_event {
3090 43     43   185596 my ($event) = @_;
3091 43         85 my $sse_data = '';
3092              
3093 43 100 66     197 if (defined $event->{event} && length $event->{event}) {
3094             die "Invalid SSE event name: contains newline\n"
3095 17 100       144 if $event->{event} =~ /[\r\n]/;
3096 16         53 $sse_data .= "event: $event->{event}\n";
3097             }
3098              
3099 42   50     114 my $data = $event->{data} // '';
3100 42         361 for my $line (split /\r?\n|\r/, $data, -1) {
3101 57         131 $sse_data .= "data: $line\n";
3102             }
3103              
3104 42 100 66     185 if (defined $event->{id} && length $event->{id}) {
3105             die "Invalid SSE id: contains newline\n"
3106 7 100       135 if $event->{id} =~ /[\r\n]/;
3107 6         17 $sse_data .= "id: $event->{id}\n";
3108             }
3109              
3110 41 100       109 if (defined $event->{retry}) {
3111             die "Invalid SSE retry: must be a non-negative integer\n"
3112 7 100       43 unless $event->{retry} =~ /\A[0-9]+\z/;
3113 5         10 $sse_data .= "retry: $event->{retry}\n";
3114             }
3115              
3116 39         136 $sse_data .= "\n";
3117 39         86 return $sse_data;
3118             }
3119              
3120             sub _format_sse_comment {
3121 5     5   3327 my ($event) = @_;
3122 5   50     14 my $text = $event->{comment} // '';
3123 5         8 my $formatted = '';
3124 5         41 for my $line (split /\r?\n|\r/, $text, -1) {
3125 9 100       24 $line = ":$line" unless $line =~ /^:/;
3126 9         16 $formatted .= "$line\n";
3127             }
3128 5         16 $formatted .= "\n";
3129 5         13 return $formatted;
3130             }
3131              
3132             sub _create_sse_send {
3133 14     14   26 my ($self, $request) = @_;
3134              
3135 14         23 weaken(my $weak_self = $self);
3136              
3137 32     32   825 return async sub {
3138 32         54 my ($event) = @_;
3139 32 50       91 return Future->done unless $weak_self;
3140 32 50       87 return Future->done if $weak_self->{closed};
3141              
3142             # Reset SSE idle timer on send activity
3143 32         129 $weak_self->_reset_sse_idle_timer;
3144              
3145 32   50     105 my $type = $event->{type} // '';
3146              
3147             # Dev-mode event validation (PAGI spec compliance)
3148 32 50       105 if ($weak_self->{validate_events}) {
3149 0         0 require PAGI::Server::EventValidator;
3150 0         0 PAGI::Server::EventValidator::validate_sse_send($event);
3151             }
3152              
3153 32 100       112 if ($type eq 'sse.start') {
    100          
    100          
    50          
    50          
3154 14 50       27 return if $weak_self->{sse_started};
3155 14         26 $weak_self->{sse_started} = 1;
3156 14         23 $weak_self->{response_started} = 1;
3157              
3158 14   50     39 my $status = $event->{status} // 200;
3159 14         70 $weak_self->{response_status} = $status; # Track for access logging
3160 14   100     36 my $headers = $event->{headers} // [];
3161              
3162             # Ensure Content-Type is text/event-stream
3163 14         19 my $has_content_type = 0;
3164 14         31 for my $h (@$headers) {
3165 7 50       20 if (lc($h->[0]) eq 'content-type') {
3166 7         12 $has_content_type = 1;
3167 7         13 last;
3168             }
3169             }
3170              
3171 14         29 my @final_headers = @$headers;
3172 14 100       32 if (!$has_content_type) {
3173 7         14 push @final_headers, ['content-type', 'text/event-stream'];
3174             }
3175              
3176             # Add Cache-Control and Connection headers for SSE
3177 14         80 push @final_headers, ['cache-control', 'no-cache'];
3178 14         29 push @final_headers, ['connection', 'keep-alive'];
3179 14         75 push @final_headers, ['date', $weak_self->{protocol}->format_date];
3180              
3181             # SSE uses chunked encoding implicitly (no Content-Length)
3182             my $response = $weak_self->{protocol}->serialize_response_start(
3183 14         54 $status, \@final_headers, 1 # chunked = 1
3184             );
3185              
3186 14         77 $weak_self->{stream}->write($response);
3187              
3188             # Set protocol-specific keepalive writer (HTTP/1.1 chunked)
3189             $weak_self->{sse_keepalive_writer} = sub {
3190 0         0 my ($text) = @_;
3191 0 0       0 return unless $weak_self;
3192 0 0       0 return if $weak_self->{closed};
3193 0         0 my $len = sprintf("%x", length($text));
3194 0         0 $weak_self->{stream}->write("$len\r\n$text\r\n");
3195 14         3406 };
3196             }
3197             elsif ($type eq 'sse.send') {
3198 15 50       63 return unless $weak_self->{sse_started};
3199              
3200             # --- BACKPRESSURE CHECK ---
3201 15 50       69 if ($weak_self->_get_write_buffer_size >= $weak_self->{write_high_watermark}) {
3202 0         0 await $weak_self->_wait_for_drain;
3203 0 0       0 return Future->done unless $weak_self;
3204 0 0       0 return Future->done if $weak_self->{closed};
3205             }
3206             # --- END BACKPRESSURE CHECK ---
3207              
3208 15         93 my $sse_data = _format_sse_event($event);
3209              
3210             # Send as chunked data
3211 15         81 my $len = sprintf("%x", length($sse_data));
3212 15         130 $weak_self->{stream}->write("$len\r\n$sse_data\r\n");
3213             }
3214             elsif ($type eq 'sse.comment') {
3215 2 50       5 return unless $weak_self->{sse_started};
3216              
3217 2         5 my $comment = _format_sse_comment($event);
3218              
3219 2         9 my $len = sprintf("%x", length($comment));
3220 2         8 $weak_self->{stream}->write("$len\r\n$comment\r\n");
3221             }
3222             elsif ($type eq 'sse.keepalive') {
3223             # SSE keepalive - starts/stops periodic comment timer
3224 0   0     0 my $interval = $event->{interval} // 0;
3225 0         0 my $comment = $event->{comment};
3226              
3227 0 0       0 if ($interval > 0) {
3228 0         0 $weak_self->_start_sse_keepalive($interval, $comment);
3229             }
3230             else {
3231 0         0 $weak_self->_stop_sse_keepalive;
3232             }
3233             }
3234             elsif ($type eq 'http.fullflush') {
3235             # Fullflush extension - force immediate TCP buffer flush
3236             # Per spec: servers that don't advertise the extension must reject
3237 1 50       3 unless (exists $weak_self->{extensions}{fullflush}) {
3238 0         0 warn "PAGI: http.fullflush event rejected - extension not enabled\n";
3239 0         0 die "Extension not enabled: fullflush\n";
3240             }
3241              
3242             # Force flush by ensuring TCP_NODELAY
3243 1         4 my $handle = $weak_self->{stream}->write_handle;
3244 1 50 33     8 if ($handle && $handle->can('setsockopt')) {
3245 1         7 require Socket;
3246 1         4 $handle->setsockopt(Socket::IPPROTO_TCP(), Socket::TCP_NODELAY(), 1);
3247             }
3248             }
3249             else {
3250             # Per PAGI spec: servers must raise exceptions for unrecognized event types
3251 0         0 _unrecognized_event_type($type, 'sse');
3252             }
3253              
3254 32         2864 return;
3255 14         236 };
3256             }
3257              
3258             #
3259             # WebSocket Support Methods
3260             #
3261              
3262             # WebSocket handshake magic GUID per RFC 6455
3263 87     87   1235 use constant WS_GUID => '258EAFA5-E914-47DA-95CA-C5AB0DC85B11';
  87         213  
  87         406962  
3264              
3265 21     21   46 async sub _handle_websocket_request {
3266 21         121 my ($self, $request) = @_;
3267              
3268 21         79 $self->_stop_idle_timer; # WebSocket connections are long-lived
3269 21         180 $self->_start_ws_idle_timer; # Start WebSocket-specific idle timer if configured
3270              
3271 21         96 my $scope = $self->_create_websocket_scope($request);
3272 21         66 my $receive = $self->_create_websocket_receive($request);
3273 21         70 my $send = $self->_create_websocket_send($request);
3274              
3275 21         36 eval {
3276 21         95 await $self->{app}->($scope, $receive, $send);
3277             };
3278              
3279 20 100       1918 if (my $error = $@) {
3280             # If handshake not yet done, send HTTP error
3281 2 50       6 if (!$self->{websocket_accepted}) {
3282 2         21 $self->_send_error_response(500, "Internal Server Error");
3283             }
3284 2         64 warn "PAGI application error (WebSocket): $error\n";
3285             }
3286              
3287             # Write access log entry (logs at connection close with total duration)
3288 20         134 $self->_write_access_log;
3289              
3290             # Close connection after WebSocket session ends
3291 20         94 $self->_handle_disconnect_and_close('session_complete');
3292             }
3293              
3294             sub _create_websocket_scope {
3295 21     21   38 my ($self, $request) = @_;
3296              
3297             # Extract WebSocket key and subprotocols from headers
3298 21         123 my $ws_key;
3299             my @subprotocols;
3300              
3301 21         27 for my $header (@{$request->{headers}}) {
  21         109  
3302 118         168 my ($name, $value) = @$header;
3303 118 100       209 if ($name eq 'sec-websocket-key') {
    100          
3304 21         35 $ws_key = $value;
3305             }
3306             elsif ($name eq 'sec-websocket-protocol') {
3307             # Parse comma-separated list of subprotocols
3308 2         8 push @subprotocols, map { s/^\s+|\s+$//gr } split /,/, $value;
  3         19  
3309             }
3310             }
3311              
3312             # Store ws_key for handshake response
3313 21         53 $self->{ws_key} = $ws_key;
3314              
3315             my $scope = {
3316             type => 'websocket',
3317             pagi => {
3318             version => '0.2',
3319             spec_version => '0.2',
3320             features => {},
3321             },
3322             http_version => $request->{http_version},
3323             scheme => $self->_get_ws_scheme,
3324             path => $request->{path},
3325             raw_path => $request->{raw_path},
3326             query_string => $request->{query_string},
3327             root_path => '',
3328             headers => $request->{headers},
3329             (defined $self->{client_host}
3330             ? (client => [$self->{client_host}, $self->{client_port}])
3331             : ()
3332             ),
3333             server => [$self->{server_host}, $self->{server_port}],
3334             subprotocols => \@subprotocols,
3335             # Optimized: avoid hash copy when state is empty (common case)
3336 21 50       152 state => keys %{$self->{state}} ? { %{$self->{state}} } : {},
  21 50       102  
  0         0  
3337             extensions => $self->_get_extensions_for_scope,
3338             };
3339              
3340 21         98 return $scope;
3341             }
3342              
3343             sub _create_websocket_receive {
3344 21     21   39 my ($self, $request) = @_;
3345              
3346 21         35 my $connect_sent = 0;
3347 21         44 weaken(my $weak_self = $self);
3348              
3349             return sub {
3350 154 50   154   430337 return Future->done({ type => 'websocket.disconnect', code => 1006, reason => '' })
3351             unless $weak_self;
3352              
3353             # Check queue first - drain queued messages even if closed
3354 154 100       191 if (@{$weak_self->{receive_queue}}) {
  154         246  
3355 108         110 return Future->done(shift @{$weak_self->{receive_queue}});
  108         228  
3356             }
3357              
3358             return Future->done({ type => 'websocket.disconnect', code => 1006, reason => '' })
3359 46 50       119 if $weak_self->{closed};
3360              
3361 46         77 my $future = (async sub {
3362 46 50       83 return { type => 'websocket.disconnect', code => 1006, reason => '' }
3363             unless $weak_self;
3364              
3365             # Check queue first - drain queued messages even if closed
3366 46 50       50 if (@{$weak_self->{receive_queue}}) {
  46         88  
3367 0         0 return shift @{$weak_self->{receive_queue}};
  0         0  
3368             }
3369              
3370             return { type => 'websocket.disconnect', code => 1006, reason => '' }
3371 46 50       104 if $weak_self->{closed};
3372              
3373             # First call returns websocket.connect
3374 46 100       84 if (!$connect_sent) {
3375 19         25 $connect_sent = 1;
3376 19         179 return { type => 'websocket.connect' };
3377             }
3378              
3379             # If not in WebSocket mode yet (waiting for accept), wait
3380 27   33     64 while (!$weak_self->{websocket_mode} && !$weak_self->{closed}) {
3381 0 0       0 if (!$weak_self->{receive_pending}) {
3382 0         0 $weak_self->{receive_pending} = Future->new;
3383             }
3384 0         0 await $weak_self->{receive_pending};
3385 0         0 $weak_self->{receive_pending} = undef;
3386              
3387 0 0       0 if (@{$weak_self->{receive_queue}}) {
  0         0  
3388 0         0 return shift @{$weak_self->{receive_queue}};
  0         0  
3389             }
3390             }
3391              
3392             return { type => 'websocket.disconnect', code => 1006, reason => '' }
3393 27 50       46 if $weak_self->{closed};
3394              
3395             # Wait for events from frame processing
3396 27         29 while (1) {
3397 54 100       73 if (@{$weak_self->{receive_queue}}) {
  54         105  
3398 27         39 return shift @{$weak_self->{receive_queue}};
  27         117  
3399             }
3400              
3401             return { type => 'websocket.disconnect', code => 1006, reason => '' }
3402 27 50       60 if $weak_self->{closed};
3403              
3404 27 50       59 if (!$weak_self->{receive_pending}) {
3405 27         58 $weak_self->{receive_pending} = Future->new;
3406             }
3407 27         194 await $weak_self->{receive_pending};
3408 27         2042 $weak_self->{receive_pending} = undef;
3409             }
3410 46         258 })->();
3411              
3412             # Track this Future
3413 46         1922 push @{$weak_self->{receive_futures}}, $future;
  46         154  
3414 46         69 @{$weak_self->{receive_futures}} = grep { !$_->is_ready } @{$weak_self->{receive_futures}};
  46         202  
  59         149  
  46         84  
3415              
3416 46         145 return $future;
3417 21         129 };
3418             }
3419              
3420             sub _create_websocket_send {
3421 21     21   45 my ($self, $request) = @_;
3422              
3423 21         32 weaken(my $weak_self = $self);
3424              
3425 42     42   942 return async sub {
3426 42         65 my ($event) = @_;
3427 42 50       80 return Future->done unless $weak_self;
3428 42 50       144 return Future->done if $weak_self->{closed};
3429              
3430             # Reset WebSocket idle timer on send activity
3431 42         156 $weak_self->_reset_ws_idle_timer;
3432              
3433 42   50     117 my $type = $event->{type} // '';
3434              
3435             # Dev-mode event validation (PAGI spec compliance)
3436 42 50       81 if ($weak_self->{validate_events}) {
3437 0         0 require PAGI::Server::EventValidator;
3438 0         0 PAGI::Server::EventValidator::validate_websocket_send($event);
3439             }
3440              
3441 42 100       151 if ($type eq 'websocket.accept') {
    100          
    50          
    0          
3442 20 50       48 return if $weak_self->{websocket_accepted};
3443              
3444             # Complete the WebSocket handshake
3445 20         33 my $ws_key = $weak_self->{ws_key};
3446 20         231 my $accept_key = sha1_base64($ws_key . WS_GUID);
3447             # sha1_base64 doesn't add padding, but WebSocket requires it
3448 20         79 $accept_key .= '=' while length($accept_key) % 4;
3449              
3450 20         117 my @headers = (
3451             "HTTP/1.1 101 Switching Protocols\r\n",
3452             "Upgrade: websocket\r\n",
3453             "Connection: Upgrade\r\n",
3454             "Sec-WebSocket-Accept: $accept_key\r\n",
3455             );
3456              
3457             # Add subprotocol if specified (with validation)
3458 20 100       56 if (my $subprotocol = $event->{subprotocol}) {
3459 1         10 $subprotocol = _validate_subprotocol($subprotocol);
3460 0         0 push @headers, "Sec-WebSocket-Protocol: $subprotocol\r\n";
3461             }
3462              
3463             # Add custom headers if specified (with CRLF injection validation)
3464 19 100       41 if (my $extra_headers = $event->{headers}) {
3465 1         3 for my $h (@$extra_headers) {
3466 1         2 my ($name, $value) = @$h;
3467 1         4 $name = _validate_header_name($name);
3468 1         5 $value = _validate_header_value($value);
3469 0         0 push @headers, "$name: $value\r\n";
3470             }
3471             }
3472              
3473 18         33 push @headers, "\r\n";
3474              
3475 18         149 $weak_self->{stream}->write(join('', @headers));
3476              
3477             # Switch to WebSocket mode
3478 18         3829 $weak_self->{websocket_mode} = 1;
3479 18         59 $weak_self->{websocket_accepted} = 1;
3480             $weak_self->{websocket_frame} = Protocol::WebSocket::Frame->new(
3481             max_payload_size => $weak_self->{max_ws_frame_size},
3482 18         183 );
3483 18         566 $weak_self->{response_status} = 101; # Track for access logging
3484              
3485             # Notify any waiting receive
3486 18 50 33     174 if ($weak_self->{receive_pending} && !$weak_self->{receive_pending}->is_ready) {
3487 0         0 my $f = $weak_self->{receive_pending};
3488 0         0 $weak_self->{receive_pending} = undef;
3489 0         0 $f->done;
3490             }
3491              
3492             # Process any data that arrived before accept
3493 18 50       76 if (length($weak_self->{buffer}) > 0) {
3494 0         0 $weak_self->_process_websocket_frames;
3495             }
3496             }
3497             elsif ($type eq 'websocket.send') {
3498 19 50       44 return unless $weak_self->{websocket_mode};
3499              
3500             # --- BACKPRESSURE CHECK ---
3501 19 50       54 if ($weak_self->_get_write_buffer_size >= $weak_self->{write_high_watermark}) {
3502 0         0 await $weak_self->_wait_for_drain;
3503 0 0       0 return Future->done unless $weak_self;
3504 0 0       0 return Future->done if $weak_self->{closed};
3505             }
3506             # --- END BACKPRESSURE CHECK ---
3507              
3508 19         22 my $frame;
3509 19 100       38 if (defined $event->{text}) {
    50          
3510             $frame = Protocol::WebSocket::Frame->new(
3511             buffer => $event->{text},
3512 18         55 type => 'text',
3513             );
3514             }
3515             elsif (defined $event->{bytes}) {
3516             $frame = Protocol::WebSocket::Frame->new(
3517             buffer => $event->{bytes},
3518 1         5 type => 'binary',
3519             );
3520             }
3521             else {
3522 0         0 return; # Nothing to send
3523             }
3524              
3525 19         828 my $bytes = $frame->to_bytes;
3526 19         650 $weak_self->{stream}->write($bytes);
3527             }
3528             elsif ($type eq 'websocket.close') {
3529             # If not accepted yet, send 403 Forbidden
3530 3 100       9 if (!$weak_self->{websocket_accepted}) {
3531 1         5 $weak_self->_send_error_response(403, 'Forbidden');
3532 1         6 return;
3533             }
3534              
3535             # Send close frame
3536 2   50     10 my $code = $event->{code} // 1000;
3537 2   50     7 my $reason = $event->{reason} // '';
3538              
3539 2         12 my $frame = Protocol::WebSocket::Frame->new(
3540             type => 'close',
3541             buffer => pack('n', $code) . $reason,
3542             );
3543              
3544 2         64 $weak_self->{stream}->write($frame->to_bytes);
3545 2         246 $weak_self->{close_sent} = 1;
3546              
3547             # If we received a close frame, close immediately
3548             # Otherwise wait for close from client (handled in frame processing)
3549 2 50       9 if ($weak_self->{close_received}) {
3550 0         0 $weak_self->_handle_disconnect_and_close('client_closed');
3551             }
3552             }
3553             elsif ($type eq 'websocket.keepalive') {
3554 0 0       0 return unless $weak_self->{websocket_mode};
3555              
3556 0   0     0 my $interval = $event->{interval} // 0;
3557 0         0 my $timeout = $event->{timeout};
3558              
3559 0 0       0 if ($interval > 0) {
3560 0         0 $weak_self->_start_ws_keepalive($interval, $timeout);
3561             }
3562             else {
3563 0         0 $weak_self->_stop_ws_keepalive;
3564             }
3565             }
3566             else {
3567             # Per PAGI spec: servers must raise exceptions for unrecognized event types
3568 0         0 _unrecognized_event_type($type, 'websocket');
3569             }
3570              
3571 39         2836 return;
3572 21         232 };
3573             }
3574              
3575             sub _process_websocket_frames {
3576 40     40   78 my ($self) = @_;
3577              
3578 40 50       93 return unless $self->{websocket_mode};
3579 40 50       134 return if $self->{closed};
3580              
3581             # Reset WebSocket idle timer on receive activity
3582 40         101 $self->_reset_ws_idle_timer;
3583              
3584 40         76 my $frame = $self->{websocket_frame};
3585              
3586             # Append buffer to frame parser
3587 40         205 $frame->append($self->{buffer});
3588 40         380 $self->{buffer} = '';
3589              
3590             # Process all complete frames - use next_bytes to get raw bytes
3591             # Protocol::WebSocket::Frame->next() decodes as UTF-8, which corrupts binary data
3592 40         133 while (defined(my $bytes = $frame->next_bytes)) {
3593 128         8491 my $opcode = $frame->opcode;
3594              
3595             # RFC 6455 Section 5.2: RSV1-3 MUST be 0 unless extension defines meaning
3596             # PAGI doesn't support compression extensions, so RSV must always be 0
3597 128         355 my $rsv = $frame->rsv;
3598 128 50 33     533 if ($rsv && ref($rsv) eq 'ARRAY') {
3599 128 50       181 if (grep { $_ } @$rsv) {
  384         578  
3600 0         0 $self->_send_close_frame(1002, 'RSV bits must be 0');
3601 0         0 $self->_handle_disconnect_and_close('protocol_error');
3602 0         0 return;
3603             }
3604             }
3605              
3606             # RFC 6455 Section 5.2: Opcodes 3-7 and 11-15 (0xB-0xF) are reserved
3607             # Must fail connection with 1002 Protocol Error
3608 128 50 33     380 if (($opcode >= 3 && $opcode <= 7) || ($opcode >= 11 && $opcode <= 15)) {
      33        
      33        
3609 0         0 $self->_send_close_frame(1002, 'Reserved opcode');
3610 0         0 $self->_handle_disconnect_and_close('protocol_error');
3611 0         0 return;
3612             }
3613              
3614             # RFC 6455 Section 5.5: Control frames (close/ping/pong) MUST have
3615             # payload length <= 125 bytes
3616 128 50 33     413 if (($opcode == 8 || $opcode == 9 || $opcode == 10) && length($bytes) > 125) {
      33        
3617 0         0 $self->_send_close_frame(1002, 'Control frame too large');
3618 0         0 $self->_handle_disconnect_and_close('protocol_error');
3619 0         0 return;
3620             }
3621              
3622 128 100       183 if ($opcode == 1) {
    50          
    0          
    0          
    0          
3623             # Text frame - decode as UTF-8
3624 127         118 my $text = eval { Encode::decode('UTF-8', $bytes, Encode::FB_CROAK) };
  127         454  
3625 127 100       3296 unless (defined $text) {
3626             # Invalid UTF-8 - close with 1007 per RFC 6455
3627 2         9 $self->_send_close_frame(1007, 'Invalid UTF-8');
3628 2         9 $self->_handle_disconnect_and_close('protocol_error');
3629 2         48 return;
3630             }
3631             # Check queue limit before adding (DoS protection)
3632 125 100       121 if (@{$self->{receive_queue}} >= $self->{max_receive_queue}) {
  125         268  
3633 1         7 $self->_send_close_frame(1008, 'Message queue overflow'); # Policy Violation
3634 1         5 $self->_handle_disconnect_and_close('policy_violation');
3635 1         18 return;
3636             }
3637 124         140 push @{$self->{receive_queue}}, {
  124         489  
3638             type => 'websocket.receive',
3639             text => $text,
3640             };
3641             }
3642             elsif ($opcode == 2) {
3643             # Binary frame - keep as raw bytes
3644             # Check queue limit before adding (DoS protection)
3645 1 50       2 if (@{$self->{receive_queue}} >= $self->{max_receive_queue}) {
  1         5  
3646 0         0 $self->_send_close_frame(1008, 'Message queue overflow'); # Policy Violation
3647 0         0 $self->_handle_disconnect_and_close('policy_violation');
3648 0         0 return;
3649             }
3650 1         2 push @{$self->{receive_queue}}, {
  1         6  
3651             type => 'websocket.receive',
3652             bytes => $bytes,
3653             };
3654             }
3655             elsif ($opcode == 8) {
3656             # Close frame
3657 0         0 $self->{close_received} = 1;
3658 0         0 my ($code, $reason) = (1005, '');
3659              
3660             # RFC 6455 Section 5.5.1: Close frame payload is 0 or >=2 bytes
3661             # 1 byte is invalid
3662 0 0       0 if (length($bytes) == 1) {
3663 0         0 $self->_send_close_frame(1002, 'Invalid close frame');
3664 0         0 $self->_handle_disconnect_and_close('protocol_error');
3665 0         0 return;
3666             }
3667              
3668 0 0       0 if (length($bytes) >= 2) {
3669 0         0 $code = unpack('n', substr($bytes, 0, 2));
3670 0   0     0 $reason = substr($bytes, 2) // '';
3671              
3672             # RFC 6455 Section 7.4.1: Validate close code
3673             # Valid codes: 1000-1003, 1007-1011, 3000-4999
3674             # Invalid: 0-999, 1004-1006, 1012-2999, 5000+
3675 0         0 my $valid_code = 0;
3676 0 0 0     0 if ($code == 1000 || $code == 1001 || $code == 1002 || $code == 1003) {
    0 0        
    0 0        
      0        
      0        
3677 0         0 $valid_code = 1;
3678             }
3679             elsif ($code >= 1007 && $code <= 1011) {
3680 0         0 $valid_code = 1;
3681             }
3682             elsif ($code >= 3000 && $code <= 4999) {
3683 0         0 $valid_code = 1;
3684             }
3685 0 0       0 unless ($valid_code) {
3686 0         0 $self->_send_close_frame(1002, 'Invalid close code');
3687 0         0 $self->_handle_disconnect_and_close('protocol_error');
3688 0         0 return;
3689             }
3690              
3691             # RFC 6455: Close reason must be valid UTF-8
3692 0 0       0 if (length($reason) > 0) {
3693 0         0 my $reason_copy = $reason;
3694 0         0 my $decoded = eval { Encode::decode('UTF-8', $reason_copy, Encode::FB_CROAK) };
  0         0  
3695 0 0       0 unless (defined $decoded) {
3696 0         0 $self->_send_close_frame(1007, 'Invalid UTF-8 in close reason');
3697 0         0 $self->_handle_disconnect_and_close('protocol_error');
3698 0         0 return;
3699             }
3700             }
3701             }
3702              
3703             # If we haven't sent close yet, send it now
3704 0 0       0 if (!$self->{close_sent}) {
3705 0         0 my $close_frame = Protocol::WebSocket::Frame->new(
3706             type => 'close',
3707             buffer => pack('n', $code) . $reason,
3708             );
3709 0         0 $self->{stream}->write($close_frame->to_bytes);
3710 0         0 $self->{close_sent} = 1;
3711             }
3712              
3713 0         0 push @{$self->{receive_queue}}, {
  0         0  
3714             type => 'websocket.disconnect',
3715             code => $code,
3716             reason => $reason,
3717             };
3718             }
3719             elsif ($opcode == 9) {
3720             # Ping - respond with pong (transparent to app)
3721 0         0 my $pong = Protocol::WebSocket::Frame->new(
3722             type => 'pong',
3723             buffer => $bytes,
3724             );
3725 0         0 $self->{stream}->write($pong->to_bytes);
3726             }
3727             elsif ($opcode == 10) {
3728             # Pong - cancel any pending timeout (response to our ping)
3729 0         0 $self->_cancel_ws_pong_timeout;
3730             }
3731             }
3732              
3733             # Notify any waiting receive
3734 36 50 66     460 if ($self->{receive_pending} && !$self->{receive_pending}->is_ready && @{$self->{receive_queue}}) {
  13   66     106  
3735 13         20 my $f = $self->{receive_pending};
3736 13         23 $self->{receive_pending} = undef;
3737 13         35 $f->done;
3738             }
3739             }
3740              
3741             # Async file response - prioritizes speed based on file size:
3742             # 1. Small files (<=64KB): direct in-process read (fastest for small files)
3743             # 2. Large files: async chunked reads via worker pool (non-blocking)
3744 29     29   39 async sub _send_file_response {
3745 29         71 my ($self, $file, $offset, $length, $chunked) = @_;
3746              
3747             # Get file size if length not specified
3748 29         598 my $file_size = -s $file;
3749 29 50       81 die "Cannot stat file $file: $!" unless defined $file_size;
3750 29   66     134 $length //= $file_size - $offset;
3751              
3752 29         62 $self->{_response_size} += $length;
3753              
3754 29         40 my $stream = $self->{stream};
3755              
3756 29 100 66     126 if ($self->{sync_file_threshold} > 0 && $length <= $self->{sync_file_threshold}) {
3757             # Small file fast path: read directly in-process
3758             # For files <= 64KB, a simple read() is fast and avoids async overhead
3759 25 50       1074 open my $fh, '<:raw', $file or die "Cannot open file $file: $!";
3760 25 100       78 seek($fh, $offset, 0) if $offset;
3761 25         915 my $bytes_read = read($fh, my $data, $length);
3762 25         258 close $fh;
3763              
3764 25 50       71 die "Failed to read file $file: $!" unless defined $bytes_read;
3765              
3766 25 100       65 if ($chunked) {
3767 1         6 my $len = sprintf("%x", length($data));
3768 1         7 $stream->write("$len\r\n$data\r\n");
3769 1         92 $stream->write("0\r\n\r\n");
3770             }
3771             else {
3772 24         101 $stream->write($data);
3773             }
3774             }
3775             else {
3776             # Large file path: async chunked reads via worker pool
3777 4 50       23 my $loop = $self->{server} ? $self->{server}->loop : undef;
3778 4 50       18 die "No event loop available for async file I/O" unless $loop;
3779              
3780             await PAGI::Server::AsyncFile->read_file_chunked(
3781             $loop, $file,
3782             sub {
3783 8     8   23 my ($chunk) = @_;
3784 8 100       53 if ($chunked) {
3785 2         24 my $len = sprintf("%x", length($chunk));
3786 2         17 $stream->write("$len\r\n$chunk\r\n");
3787             }
3788             else {
3789 6         55 $stream->write($chunk);
3790             }
3791 8         1302 return; # Sync callback
3792             },
3793 4         62 offset => $offset,
3794             length => $length,
3795             chunk_size => FILE_CHUNK_SIZE,
3796             );
3797              
3798             # Send final chunk terminator if chunked
3799 4 100       407 if ($chunked) {
3800 1         27 $stream->write("0\r\n\r\n");
3801             }
3802             }
3803             }
3804              
3805             # Async filehandle response - uses worker pool for non-blocking reads
3806             # Note: Can't easily use sendfile for arbitrary filehandles (may not have fd,
3807             # may be pipes, may be in-memory). Falls back to chunked reads.
3808 6     6   6 async sub _send_fh_response {
3809 6         12 my ($self, $fh, $offset, $length, $chunked) = @_;
3810              
3811             # Seek to offset if specified
3812 6 100 66     16 if ($offset && $offset > 0) {
3813 1 50       10 seek($fh, $offset, 0) or die "Cannot seek: $!";
3814             }
3815              
3816             # For filehandles, we can't easily use the worker pool (can't pass fh across fork).
3817             # Use blocking reads in small chunks - not ideal but practical.
3818             # TODO: Consider IO::Async::FileStream for better event loop integration.
3819              
3820 6         10 my $remaining = $length; # undef means read to EOF
3821 6         7 my $stream = $self->{stream};
3822              
3823 6         6 while (1) {
3824 12         12 my $to_read = FILE_CHUNK_SIZE;
3825 12 100       58 if (defined $remaining) {
3826 4 50       9 $to_read = $remaining if $remaining < $to_read;
3827 4 100       7 last if $to_read <= 0;
3828             }
3829              
3830 10         156 my $bytes_read = read($fh, my $chunk, $to_read);
3831              
3832 10 50       37 last if !defined $bytes_read; # Error
3833 10 100       15 last if $bytes_read == 0; # EOF
3834              
3835 6         12 $self->{_response_size} += $bytes_read;
3836              
3837 6 100       15 if ($chunked) {
3838 3         11 my $len = sprintf("%x", length($chunk));
3839 3         29 $stream->write("$len\r\n$chunk\r\n");
3840             }
3841             else {
3842 3         10 $stream->write($chunk);
3843             }
3844              
3845 6 100       526 if (defined $remaining) {
3846 2         5 $remaining -= $bytes_read;
3847             }
3848             }
3849              
3850             # Send final chunk if chunked encoding
3851 6 100       25 if ($chunked) {
3852 3         6 $stream->write("0\r\n\r\n");
3853             }
3854             }
3855              
3856             1;
3857              
3858             __END__