File Coverage

blib/lib/PAGI/Server/Connection.pm
Criterion Covered Total %
statement 1680 2027 82.8
branch 723 1216 59.4
condition 345 595 57.9
subroutine 137 149 91.9
pod 0 2 0.0
total 2885 3989 72.3


line stmt bran cond sub pod time code
1             package PAGI::Server::Connection;
2 113     113   593678 use strict;
  113         230  
  113         3874  
3 113     113   403 use warnings;
  113         188  
  113         6628  
4              
5             our $VERSION = '0.002005';
6              
7 113     113   1665 use Future;
  113         25766  
  113         2016  
8 113     113   2174 use Future::AsyncAwait;
  113         15087  
  113         567  
9 113     113   5212 use Scalar::Util qw(weaken refaddr);
  113         212  
  113         5465  
10 113     113   46023 use Protocol::WebSocket::Handshake::Server;
  113         2744703  
  113         4939  
11 113     113   749 use Protocol::WebSocket::Frame;
  113         187  
  113         2386  
12 113     113   421 use Digest::SHA qw(sha1_base64);
  113         191  
  113         8986  
13 113     113   509 use Encode;
  113         168  
  113         7827  
14 113     113   28383 use URI::Escape qw(uri_unescape);
  113         121599  
  113         6880  
15 113     113   30344 use IO::Async::Timer::Countdown;
  113         97558  
  113         4279  
16 113     113   9218 use IO::Async::Timer::Periodic;
  113         25880  
  113         3502  
17 113     113   505 use Time::HiRes qw(gettimeofday tv_interval);
  113         154  
  113         1504  
18 113     113   56598 use PAGI::Server::AsyncFile;
  113         384  
  113         7439  
19 113     113   51893 use PAGI::Server::ConnectionState;
  113         417  
  113         6346  
20 113     113   47445 use PAGI::Server::TransportState;
  113         403  
  113         5793  
21              
22              
23 113     113   661 use constant FILE_CHUNK_SIZE => 65536; # 64KB chunks for file streaming
  113         180  
  113         118047  
24              
25             # Per-second cache for CLF timestamp in access log (same pattern as HTTP1::format_date)
26             my $_cached_log_timestamp;
27             my $_cached_log_time = 0;
28              
29             # =============================================================================
30             # Unrecognized Event Type Handler (PAGI spec compliance)
31             # =============================================================================
32             # Per main.mkdn: "Servers must raise exceptions if... The type field is unrecognized"
33              
34             sub _unrecognized_event_type {
35 0     0   0 my ($type, $protocol) = @_;
36 0         0 die "Unrecognized event type '$type' for $protocol protocol\n";
37             }
38              
39             # =============================================================================
40             # Header Validation (CRLF Injection Prevention)
41             # =============================================================================
42             # RFC 7230 Section 3.2.6: Field values MUST NOT contain CR or LF
43              
44             sub _validate_header_value {
45 43     43   89 my ($value) = @_;
46              
47 43 100       166 if ($value =~ /[\r\n\0]/) {
48 1         17 die "Invalid header value: contains CR, LF, or null byte\n";
49             }
50 42         188 return $value;
51             }
52              
53             sub _validate_header_name {
54 43     43   107 my ($name) = @_;
55              
56 43 50       240 if ($name =~ /[\r\n\0]/) {
57 0         0 die "Invalid header name: contains CR, LF, or null byte\n";
58             }
59 43 50       157 if ($name =~ /[[:cntrl:]]/) {
60 0         0 die "Invalid header name: contains control characters\n";
61             }
62 43         194 return $name;
63             }
64              
65             # RFC 6455 Section 11.3.4: Subprotocol must be a token (no whitespace, separators)
66             sub _validate_subprotocol {
67 1     1   3 my ($value) = @_;
68              
69 1 50       8 if ($value =~ /[\r\n\0\s]/) {
70 1         23 die "Invalid subprotocol: contains CR, LF, null, or whitespace\n";
71             }
72             # Token characters only (roughly)
73 0 0       0 if ($value !~ /^[\w\-\.]+$/) {
74 0         0 die "Invalid subprotocol: must be alphanumeric, dash, underscore, or dot\n";
75             }
76 0         0 return $value;
77             }
78              
79             =head1 NAME
80              
81             PAGI::Server::Connection - Per-connection state machine
82              
83             =head1 SYNOPSIS
84              
85             # Internal use by PAGI::Server
86             my $conn = PAGI::Server::Connection->new(
87             stream => $stream,
88             app => $app,
89             protocol => $protocol,
90             server => $server,
91             extensions => {},
92             );
93             $conn->start;
94              
95             =head1 DESCRIPTION
96              
97             PAGI::Server::Connection manages the state machine for a single client
98             connection. It handles:
99              
100             =over 4
101              
102             =item * Request parsing via Protocol::HTTP1
103              
104             =item * Scope creation for the application
105              
106             =item * Event queue management for $receive and $send
107              
108             =item * Protocol upgrades (WebSocket, SSE)
109              
110             =item * SSE over HTTP/1.1 and HTTP/2
111              
112             =item * Connection lifecycle and cleanup
113              
114             =back
115              
116             =cut
117              
118             sub new {
119 286     286 0 34347 my ($class, %args) = @_;
120              
121             my $self = bless {
122             stream => $args{stream},
123             app => $args{app},
124             protocol => $args{protocol},
125             server => $args{server},
126             extensions => $args{extensions} // {},
127             state => $args{state} // {},
128             tls_enabled => $args{tls_enabled} // 0,
129             timeout => $args{timeout} // 60, # Idle timeout in seconds
130             request_timeout => $args{request_timeout} // 0, # Request stall timeout in seconds (0 = disabled, default for performance)
131             ws_idle_timeout => $args{ws_idle_timeout} // 0, # WebSocket idle timeout (0 = disabled)
132             sse_idle_timeout => $args{sse_idle_timeout} // 0, # SSE idle timeout (0 = disabled)
133             max_body_size => $args{max_body_size}, # 0 = unlimited
134             access_log => $args{access_log}, # Filehandle for access logging
135             _access_log_formatter => $args{_access_log_formatter}, # Pre-compiled format closure
136             max_receive_queue => $args{max_receive_queue} // 1000, # Max WebSocket receive queue size
137             max_ws_frame_size => $args{max_ws_frame_size} // 65536, # Max WebSocket frame size in bytes
138             sync_file_threshold => $args{sync_file_threshold} // 65536, # Threshold for sync file reads (default 64KB)
139             validate_events => $args{validate_events} // 0, # Dev-mode event validation (0 = disabled)
140             # Send-side backpressure (watermarks in bytes)
141             # Defaults match Python asyncio: 64KB high, 16KB low (high/4)
142             write_high_watermark => $args{write_high_watermark} // 65536, # 64KB - pause sending above this
143             write_low_watermark => $args{write_low_watermark} // 16384, # 16KB - resume sending below this
144             _drain_waiters => [], # Pending Futures waiting for buffer drain
145             _drain_check_active => 0, # Flag to prevent redundant on_outgoing_empty setup
146             tls_info => undef, # Populated on first request if TLS
147             buffer => '',
148             closed => 0,
149             response_started => 0,
150             response_status => undef, # Track response status for logging
151             _response_size => 0, # Track response body bytes for logging
152             request_start => undef, # Track request start time for logging
153             idle_timer => undef, # IO::Async::Timer for idle timeout
154             stall_timer => undef, # IO::Async::Timer for request stall timeout
155             ws_idle_timer => undef, # IO::Async::Timer for WebSocket idle timeout
156             sse_idle_timer => undef, # IO::Async::Timer for SSE idle timeout
157             # Event queue for $receive
158             receive_queue => [],
159             receive_pending => undef,
160             # Track all pending receive Futures to cancel on close
161             receive_futures => [],
162             # Track request handling Future to prevent "lost future" warning
163             request_future => undef,
164             # Idempotency guard for disconnect handling
165             _disconnect_handled => 0,
166             # WebSocket state
167             websocket_mode => 0,
168             websocket_frame => undef, # Protocol::WebSocket::Frame for parsing
169             websocket_accepted => 0,
170             # SSE state
171             sse_mode => 0,
172             sse_started => 0,
173             sse_disconnect_reason => undef, # Reason for SSE disconnect (client_closed, write_error, etc.)
174             ws_disconnect_reason => undef, # Standard reason token for the app-facing websocket.disconnect event
175             ws_disconnect_code => undef, # Wire close code for that event (defaults to 1006, abnormal closure)
176             # Keepalive state (protocol-level ping/pong for WebSocket, comments for SSE)
177             ws_keepalive_timer => undef, # Periodic timer for sending WebSocket pings
178             ws_pong_timeout => undef, # Timeout timer for pong response
179             ws_waiting_pong => 0, # Flag: are we waiting for a pong?
180             ws_keepalive_interval => 0, # Current keepalive interval (0 = disabled)
181             ws_keepalive_timeout => 0, # Current pong timeout (0 = no timeout check)
182             sse_keepalive_timer => undef, # Periodic timer for sending SSE keepalive comments
183             sse_keepalive_comment => '', # Comment text to send
184             # HTTP/2 state
185             alpn_protocol => $args{alpn_protocol}, # ALPN-negotiated protocol (e.g. 'h2', 'http/1.1')
186             h2_protocol => $args{h2_protocol}, # PAGI::Server::Protocol::HTTP2 instance
187             h2c_enabled => $args{h2c_enabled} // 0, # Allow h2c preface detection on cleartext
188             is_h2 => 0, # Set during start() if HTTP/2 detected
189             h2_session => undef, # PAGI::Server::Protocol::HTTP2::Session
190             h2_streams => {}, # Per-stream state for HTTP/2
191             # Transport info (tcp or unix)
192             transport_type => $args{transport_type} // 'tcp',
193             transport_path => $args{transport_path}, # socket path for unix
194             # Cached connection info (populated in start(), used by _create_scope)
195 286   100     26197 client_host => '127.0.0.1',
      100        
      100        
      100        
      100        
      100        
      100        
      100        
      100        
      100        
      100        
      100        
      100        
      100        
      100        
196             client_port => 0,
197             server_host => '127.0.0.1',
198             server_port => 5000,
199             }, $class;
200              
201             # Extract TLS info if this is a TLS connection
202 286 100       1575 if ($self->{tls_enabled}) {
203 45         282 $self->_extract_tls_info;
204             }
205              
206 286         1530 return $self;
207             }
208              
209 113     113   883 use Socket qw(IPPROTO_TCP TCP_NODELAY);
  113         217  
  113         3384007  
210              
211             sub start {
212 284     284 0 21787 my ($self) = @_;
213              
214 284         670 my $stream = $self->{stream};
215 284         572 weaken(my $weak_self = $self);
216              
217             # Enable TCP_NODELAY to reduce latency for small responses (TCP only)
218 284   33     2204 my $handle = $stream->write_handle // $stream->read_handle;
219 284 100 66     4386 if ($self->{transport_type} eq 'tcp' && $handle && $handle->can('setsockopt')) {
      100        
220 207         392 eval {
221 207         1071 $handle->setsockopt(IPPROTO_TCP, TCP_NODELAY, 1);
222             };
223             # Ignore errors - not all sockets support this
224             }
225              
226             # Cache connection info once (avoids per-request socket method calls)
227 284 100 66     5640 if ($self->{transport_type} eq 'unix') {
    100          
228             # Unix socket: no peer IP/port, server is identified by path
229 2         42 $self->{client_host} = undef;
230 2         26 $self->{client_port} = undef;
231 2         36 $self->{server_host} = $self->{transport_path};
232 2         33 $self->{server_port} = undef;
233             } elsif ($handle && $handle->can('peerhost')) {
234 206         386 eval {
235 206   100     862 $self->{client_host} = $handle->peerhost // '127.0.0.1';
236 206   100     11852 $self->{client_port} = $handle->peerport // 0;
237 206   50     6414 $self->{server_host} = $handle->sockhost // '127.0.0.1';
238 206   50     6768 $self->{server_port} = $handle->sockport // 5000;
239             };
240             # Ignore errors - keep defaults if extraction fails
241             }
242              
243             # Detect HTTP/2 via ALPN negotiation
244 284 50 100     6155 if ($self->{alpn_protocol} && $self->{alpn_protocol} eq 'h2' && $self->{h2_protocol}) {
      66        
245 29         126 $self->_init_h2_session;
246             }
247              
248             # Set up idle timeout timer
249 284 50 33     2577 if ($self->{timeout} && $self->{timeout} > 0 && $self->{server}) {
      33        
250             my $timer = IO::Async::Timer::Countdown->new(
251             delay => $self->{timeout},
252             on_expire => sub {
253 0 0   0   0 return unless $weak_self;
254 0 0       0 return if $weak_self->{closed};
255             # Close idle connection
256 0         0 $weak_self->_handle_disconnect_and_close('idle_timeout');
257             },
258 284         5244 );
259 284         22344 $self->{idle_timer} = $timer;
260 284         1369 $self->{server}->add_child($timer);
261 284         23475 $timer->start;
262             }
263              
264             # Set up read handler
265             $stream->configure(
266             on_read => sub {
267 539     539   38427244 my ($s, $buffref, $eof) = @_;
268 539 100       1888 return 0 unless $weak_self;
269              
270             # Reset idle timer on any read activity
271 537         2796 $weak_self->_reset_idle_timer;
272              
273             # Reset stall timer on read activity (if handling a request)
274 537 100       3500 $weak_self->_reset_stall_timer if $weak_self->{handling_request};
275              
276 537         1994 $weak_self->{buffer} .= $$buffref;
277 537         1185 $$buffref = '';
278              
279 537 100       1481 if ($eof) {
280             # EOF means client closed - handle disconnect and cleanup
281 12         97 $weak_self->_handle_disconnect_and_close('client_closed');
282 12         858 return 0;
283             }
284              
285             # h2c detection: check if cleartext connection starts with HTTP/2 preface
286 525 100 66     2079 if ($weak_self->{h2c_enabled} && !$weak_self->{is_h2}) {
287 44 50       141 if (length($weak_self->{buffer}) >= 24) { # HTTP/2 preface is 24 bytes
288 44 100 66     399 if ($weak_self->{h2_protocol} && PAGI::Server::Protocol::HTTP2->detect_preface($weak_self->{buffer})) {
289 43         168 $weak_self->_init_h2_session;
290 43         112 $weak_self->{h2c_enabled} = 0; # Detection done
291             } else {
292 1         2 $weak_self->{h2c_enabled} = 0; # Not h2c, stop checking
293             }
294             } else {
295             # Not enough data yet to determine protocol, wait for more
296 0         0 return 0;
297             }
298             }
299              
300             # Wrap processing in eval to prevent exceptions from crashing the event loop
301             # This is critical - Protocol::WebSocket::Frame can throw exceptions for
302             # oversized payloads, and other parsing code may throw as well
303 525         993 eval {
304             # HTTP/2: feed data to session for frame processing
305 525 100       1493 if ($weak_self->{is_h2}) {
306 235         949 $weak_self->_h2_process_data;
307 235         556 return;
308             }
309              
310             # If in WebSocket mode, process WebSocket frames
311 290 100       723 if ($weak_self->{websocket_mode}) {
312 57         197 $weak_self->_process_websocket_frames;
313 56         476 return;
314             }
315              
316             # If we're waiting for body data, notify the receive handler
317 233 100 66     784 if ($weak_self->{receive_pending} && !$weak_self->{receive_pending}->is_ready) {
318 12         48 my $f = $weak_self->{receive_pending};
319 12         15 $weak_self->{receive_pending} = undef;
320 12         21 $f->done;
321             }
322              
323 233         2034 $weak_self->_try_handle_request;
324             };
325 525 100       16806 if (my $error = $@) {
326             # Log the error and close the connection gracefully
327 1         39 warn "PAGI connection error: $error";
328 1 50       4 return 0 unless $weak_self;
329 1         4 $weak_self->_handle_disconnect_and_close('server_error');
330             }
331 525         2161 return 0;
332             },
333             on_closed => sub {
334 281 100   281   83300924 return unless $weak_self;
335             # Stream closed - handle disconnect and remove from connections hash
336 220         1630 $weak_self->_handle_disconnect_and_close('client_closed');
337             },
338 284         73959 );
339             }
340              
341             sub _reset_idle_timer {
342 537     537   1206 my ($self) = @_;
343              
344 537 100       2294 return unless $self->{idle_timer};
345              
346             # Debounce: rescheduling the IO::Async countdown on every read is costly
347             # under keep-alive load -- it re-enqueues a loop timer each time. Reset at
348             # most ~20x/second; this coarsens the idle timeout by at most ~50ms, which is
349             # immaterial for a multi-second idle timeout but cuts the per-request timer
350             # churn dramatically under load.
351 477         1454 my $now = Time::HiRes::time();
352 477 100 100     2568 return if defined $self->{_idle_reset_at} && ($now - $self->{_idle_reset_at}) < 0.05;
353 334         990 $self->{_idle_reset_at} = $now;
354              
355 334         2545 $self->{idle_timer}->reset;
356 334 50       41759 $self->{idle_timer}->start unless $self->{idle_timer}->is_running;
357             }
358              
359             sub _stop_idle_timer {
360 327     327   725 my ($self) = @_;
361              
362 327 100       1090 return unless $self->{idle_timer};
363 284 50       1591 $self->{idle_timer}->stop if $self->{idle_timer}->is_running;
364             # Remove timer completely so _reset_idle_timer won't restart it
365             # This is important for long-lived connections (WebSocket, SSE)
366 284 50       13803 if ($self->{server}) {
367 284         1547 $self->{server}->remove_child($self->{idle_timer});
368             }
369 284         29384 $self->{idle_timer} = undef;
370             }
371              
372             # =============================================================================
373             # HTTP/2 Session Initialization
374             # =============================================================================
375              
376             sub _init_h2_session {
377 72     72   173 my ($self) = @_;
378              
379 72         204 $self->{is_h2} = 1;
380              
381 72         183 weaken(my $weak_self = $self);
382              
383             $self->{h2_session} = $self->{h2_protocol}->create_session(
384             on_request => sub {
385 71     71   280 my ($stream_id, $pseudo, $headers, $has_body) = @_;
386 71 50       208 return unless $weak_self;
387 71         339 $weak_self->_h2_on_request($stream_id, $pseudo, $headers, $has_body);
388             },
389             on_body => sub {
390 22     22   62 my ($stream_id, $data, $eof) = @_;
391 22 50       61 return unless $weak_self;
392 22         102 $weak_self->_h2_on_body($stream_id, $data, $eof);
393             },
394             on_close => sub {
395 43     43   88 my ($stream_id, $error_code) = @_;
396 43 50       113 return unless $weak_self;
397 43         209 $weak_self->_h2_on_close($stream_id, $error_code);
398             },
399 72         945 );
400              
401             # Send initial SETTINGS to client
402 72         394 $self->_h2_write_pending;
403             }
404              
405             sub _h2_process_data {
406 236     236   465 my ($self) = @_;
407 236 50       814 return unless $self->{h2_session};
408              
409 236 100       641 if (length($self->{buffer}) > 0) {
410 235         1192 $self->{h2_session}->feed($self->{buffer});
411 235         595 $self->{buffer} = '';
412             }
413              
414 236         761 $self->_h2_write_pending;
415              
416             # Close connection when session is done (GOAWAY received or sent)
417 236 100 66     1099 if ($self->{h2_session} && !$self->{h2_session}->want_read) {
418 2         7 $self->_h2_write_pending; # Flush any remaining output
419 2         12 $self->_handle_disconnect_and_close;
420             }
421             }
422              
423             sub _h2_write_pending {
424 498     498   1037 my ($self) = @_;
425 498 50       1269 return unless $self->{h2_session};
426 498         731 while (1) {
427 800         60281 my $data = $self->{h2_session}->extract;
428 800 100 66     4484 last unless defined $data && length($data) > 0;
429 302         1632 $self->{stream}->write($data);
430             }
431             }
432              
433             # =============================================================================
434             # HTTP/2 Stream Callbacks
435             # =============================================================================
436              
437             sub _h2_on_request {
438 72     72   804074 my ($self, $stream_id, $pseudo, $headers, $has_body) = @_;
439              
440             # Detect CONNECT method
441 72         146 my $is_websocket = 0;
442 72 100 50     389 if (($pseudo->{':method'} // '') eq 'CONNECT') {
443 17 100 100     98 if (($pseudo->{':protocol'} // '') eq 'websocket') {
444             # Extended CONNECT for WebSocket (RFC 8441)
445 16         34 $is_websocket = 1;
446             } else {
447             # Plain CONNECT not supported — defer response to avoid
448             # re-entrant nghttp2 calls (we're inside feed/mem_recv)
449 1         4 weaken(my $ws = $self);
450             $self->{server}->loop->later(sub {
451 1 50   1   107 return unless $ws;
452 1 50       5 return if $ws->{closed};
453 1         11 $ws->{h2_session}->submit_response($stream_id,
454             status => 501,
455             headers => [['content-type', 'text/plain']],
456             body => "CONNECT method not supported\n",
457             );
458 1         41 $ws->_h2_write_pending;
459 1         30 });
460 1         45 return;
461             }
462             }
463              
464             # Detect SSE (Accept: text/event-stream)
465 71         135 my $is_sse = 0;
466 71 100       237 if (!$is_websocket) {
467 55         136 for my $h (@$headers) {
468 63 100 100     352 if ($h->[0] eq 'accept' && $h->[1] =~ m{text/event-stream}) {
469 17         29 $is_sse = 1;
470 17         40 last;
471             }
472             }
473             }
474              
475             # Initialize per-stream state
476 71         1368 $self->{h2_streams}{$stream_id} = {
477             pseudo => $pseudo,
478             headers => $headers,
479             has_body => $has_body,
480             body => '',
481             body_complete => !$has_body,
482             body_pending => undef, # Future for body availability
483             receive_queue => [],
484             response_started => 0,
485             is_websocket => $is_websocket,
486             is_sse => $is_sse,
487             ws_accepted => 0,
488             ws_frame => undef, # Protocol::WebSocket::Frame for parsing
489             ws_connect_sent => 0,
490             };
491              
492             # Check Content-Length against max_body_size limit before dispatching
493             # (after stream init so _h2_on_body/_h2_on_close can find the stream)
494 71 100 100     425 if ($self->{max_body_size} && $has_body) {
495 6         18 for my $h (@$headers) {
496 12 100       32 if ($h->[0] eq 'content-length') {
497 1 50       6 if ($h->[1] > $self->{max_body_size}) {
498             # Defer response to avoid re-entrant nghttp2 calls
499 1         24 weaken(my $ws = $self);
500             $self->{server}->loop->later(sub {
501 1 50   1   50 return unless $ws;
502 1 50       5 return if $ws->{closed};
503 1         23 $ws->{h2_session}->submit_response($stream_id,
504             status => 413,
505             headers => [['content-type', 'text/plain']],
506             body => "Payload Too Large\n",
507             );
508 1         36 $ws->_h2_write_pending;
509 1         9 });
510 1         44 return;
511             }
512 0         0 last;
513             }
514             }
515             }
516              
517             # Defer dispatch to next event loop tick to prevent re-entrant nghttp2 calls
518 70         202 weaken(my $weak_self = $self);
519             $self->{server}->loop->later(sub {
520 70 50   70   4057 return unless $weak_self;
521 70 50       248 return if $weak_self->{closed};
522 70         424 $weak_self->_h2_dispatch_stream($stream_id);
523 70         411 });
524             }
525              
526             sub _h2_on_body {
527 22     22   69 my ($self, $stream_id, $data, $eof) = @_;
528              
529 22         160 my $stream = $self->{h2_streams}{$stream_id};
530 22 100       60 return unless $stream;
531              
532 21 50 66     100 if ($stream->{is_websocket} && $stream->{ws_accepted}) {
533             # WebSocket: DATA frames contain raw WebSocket frames
534 16 50       109 $self->_h2_process_ws_frames($stream_id, $stream, $data) if length($data);
535              
536 16 50       1010 if ($eof) {
537             # END_STREAM = client closing the WebSocket stream
538 0         0 push @{$stream->{receive_queue}}, {
  0         0  
539             type => 'websocket.disconnect',
540             code => 1005,
541             reason => '',
542             };
543 0         0 $self->_h2_wake_pending($stream);
544             }
545 16         61 return;
546             }
547              
548 5 100       16 if (length($data) > 0) {
549 3         12 $stream->{body} .= $data;
550              
551             # Enforce max_body_size (0 = unlimited)
552 3 100 66     21 if ($self->{max_body_size} && length($stream->{body}) > $self->{max_body_size}) {
553 1         15 $self->{h2_session}->submit_response($stream_id,
554             status => 413,
555             headers => [['content-type', 'text/plain']],
556             body => 'Payload Too Large',
557             );
558             # No _h2_write_pending here — we're inside feed(); _h2_process_data
559             # flushes after feed() returns
560 1         59 $self->_h2_resolve_stream_drain_waiters($stream);
561             # Drop (don't fire) the app's on_drain fires: the stream is closing,
562             # not draining, and the transport handle is going away. Also break
563             # the $stream <-> transport_state cycle (the handle's measure/arm
564             # closures hold $stream strongly), or the stream state leaks for the
565             # life of the process once h2_streams drops its external ref.
566 1         4 $stream->{transport_drain_fires} = [];
567 1         4 delete $stream->{transport_state};
568 1         4 delete $self->{h2_streams}{$stream_id};
569 1         9 return;
570             }
571             }
572              
573 4 100       9 if ($eof) {
574 2         5 $stream->{body_complete} = 1;
575             }
576              
577 4         10 $self->_h2_wake_pending($stream);
578             }
579              
580             sub _h2_wake_pending {
581 60     60   173 my ($self, $stream) = @_;
582 60 100 66     293 if ($stream->{body_pending} && !$stream->{body_pending}->is_ready) {
583 15         126 my $f = $stream->{body_pending};
584 15         31 $stream->{body_pending} = undef;
585 15         59 $f->done;
586             }
587             }
588              
589             sub _h2_on_close {
590 43     43   125 my ($self, $stream_id, $error_code) = @_;
591              
592 43         116 my $stream = $self->{h2_streams}{$stream_id};
593 43 100       105 return unless $stream;
594              
595             # Mark body complete to unblock any pending receive
596 41         101 $stream->{body_complete} = 1;
597              
598             # Enqueue disconnect event
599 41 50       150 if ($stream->{is_websocket}) {
    100          
600 0         0 push @{$stream->{receive_queue}}, {
  0         0  
601             type => 'websocket.disconnect',
602             code => 1006,
603             reason => '',
604             };
605             } elsif ($stream->{is_sse}) {
606 5         10 push @{$stream->{receive_queue}}, {
  5         90  
607             type => 'sse.disconnect',
608             reason => 'client_closed',
609             };
610             } else {
611 36         56 push @{$stream->{receive_queue}}, { type => 'http.disconnect' };
  36         148  
612             }
613              
614 41         182 $self->_h2_wake_pending($stream);
615              
616             # Release any producer blocked on this stream's backpressure drain — the
617             # stream is closing, so it must not hang waiting for a queue that will
618             # never drain.
619 41         187 $self->_h2_resolve_stream_drain_waiters($stream);
620             # Drop (don't fire) the app's on_drain fires: this is a close, not a drain.
621             # Also break the $stream <-> transport_state cycle so the stream state can be
622             # collected once the deferred delete below drops h2_streams' external ref.
623 41         111 $stream->{transport_drain_fires} = [];
624 41         139 delete $stream->{transport_state};
625              
626             # Clean up after a delay (let any pending futures resolve)
627 41         175 weaken(my $weak_self = $self);
628             $self->{server}->loop->later(sub {
629 41 50   41   25992 return unless $weak_self;
630 41         499 delete $weak_self->{h2_streams}{$stream_id};
631 41         219 });
632             }
633              
634             # =============================================================================
635             # HTTP/2 Stream Dispatch (scope/receive/send creation)
636             # =============================================================================
637              
638             sub _h2_dispatch_stream {
639 70     70   242 my ($self, $stream_id) = @_;
640              
641 70         187 my $stream_state = $self->{h2_streams}{$stream_id};
642 70 100       192 return unless $stream_state;
643              
644 69         161 my ($scope, $receive, $send);
645              
646 69 100       340 if ($stream_state->{is_websocket}) {
    100          
647 16         88 $scope = $self->_h2_create_websocket_scope($stream_id, $stream_state);
648 16         76 $receive = $self->_h2_create_websocket_receive($stream_id, $stream_state);
649 16         70 $send = $self->_h2_create_websocket_send($stream_id, $stream_state);
650             } elsif ($stream_state->{is_sse}) {
651 17         131 $scope = $self->_h2_create_sse_scope($stream_id, $stream_state);
652 17         83 $receive = $self->_h2_create_sse_receive($stream_id, $stream_state);
653 17         85 $send = $self->_h2_create_sse_send($stream_id, $stream_state);
654             } else {
655 36         255 $scope = $self->_h2_create_scope($stream_id, $stream_state);
656 36         150 $receive = $self->_h2_create_receive($stream_id, $stream_state);
657 36         145 $send = $self->_h2_create_send($stream_id, $stream_state);
658             }
659              
660 69         171 weaken(my $weak_self = $self);
661              
662 69     69   139 my $future = (async sub {
663 69         124 eval {
664 69         403 await $weak_self->{app}->($scope, $receive, $send);
665             };
666 69         6665 my $error = $@;
667              
668             # If the application failed, OR returned without starting a response,
669             # synthesize a 500 (only possible while no response has begun). A clean
670             # return that produced no response is a protocol error, same as a throw.
671 69 100 66     523 if ($weak_self && !$stream_state->{response_started}) {
    50          
672 5 100       372 warn $error
673             ? "PAGI application error (HTTP/2 stream $stream_id): $error\n"
674             : "PAGI application returned without starting a response (HTTP/2 stream $stream_id)\n";
675 5         28 eval {
676 5         46 $weak_self->{h2_session}->submit_response($stream_id,
677             status => 500,
678             headers => [['content-type', 'text/plain']],
679             body => "Internal Server Error\n",
680             );
681 5         196 $weak_self->_h2_write_pending;
682             };
683             # The synthesized 500 is this stream's response — mark it started.
684             $stream_state->{connection_state}->_mark_response_started
685 5 100       32 if $stream_state->{connection_state};
686             }
687             elsif ($error) {
688             # Response already started; cannot send a 500. Log only.
689 0         0 warn "PAGI application error after response started (HTTP/2 stream $stream_id): $error\n";
690             }
691              
692             # Notify server that request completed (for max_requests tracking)
693 69 50 33     796 $weak_self->{server}->_on_request_complete if $weak_self && $weak_self->{server};
694 69         432 })->();
695              
696 69         5105 $self->{server}->adopt_future($future);
697             }
698              
699             sub _h2_create_scope {
700 39     39   198092 my ($self, $stream_id, $stream_state) = @_;
701              
702 39         86 my $pseudo = $stream_state->{pseudo};
703 39         103 my $headers = $stream_state->{headers};
704              
705             # Parse path and query string from :path pseudo-header
706 39   50     130 my $full_path = $pseudo->{':path'} // '/';
707 39         185 my ($path, $query_string) = split(/\?/, $full_path, 2);
708 39   100     217 $query_string //= '';
709              
710             # Decode percent-encoded path for scope (keep raw_path as-is)
711             # Match HTTP/1.1 pipeline: URI::Escape + UTF-8 decode with fallback
712 39         296 my $unescaped = uri_unescape($path);
713 39   33     415 my $decoded_path = eval { decode('UTF-8', $unescaped, Encode::FB_CROAK) }
  39         564  
714             // $unescaped;
715              
716 39         3114 my $connection_state = PAGI::Server::ConnectionState->new(
717             connection => $self,
718             );
719             # Store on the stream-state so the send path can mark response_started on
720             # this stream's own connection object (h2 multiplexes many streams).
721 39         172 $stream_state->{connection_state} = $connection_state;
722              
723             return {
724             type => 'http',
725             pagi => {
726             version => '0.3',
727             spec_version => '0.3',
728             },
729             http_version => '2',
730             method => $pseudo->{':method'} // 'GET',
731             scheme => $pseudo->{':scheme'} // $self->_get_scheme,
732             path => $decoded_path,
733             raw_path => $path,
734             query_string => $query_string,
735             root_path => '',
736             headers => $headers,
737             (defined $self->{client_host}
738             ? (client => [$self->{client_host}, $self->{client_port}])
739             : ()
740             ),
741             server => [$self->{server_host}, $self->{server_port}],
742 39         353 state => keys %{$self->{state}} ? { %{$self->{state}} } : {},
  0         0  
743             extensions => $self->_get_extensions_for_scope,
744             'pagi.connection' => $connection_state,
745             # h2 transport handle measures THIS stream's send queue (per-stream),
746             # stored on the stream state rather than $self->{current_transport_state}
747             # because h2 multiplexes many concurrent streams over one connection.
748 39 50 50     622 'pagi.transport' => ($stream_state->{transport_state} = $self->_h2_transport_state($stream_state)),
    50 33        
749             };
750             }
751              
752             sub _h2_create_receive {
753 36     36   102 my ($self, $stream_id, $stream_state) = @_;
754              
755 36         99 weaken(my $weak_self = $self);
756              
757             return sub {
758 35 50   35   315 return Future->done({ type => 'http.disconnect' }) unless $weak_self;
759 35 50       97 return Future->done({ type => 'http.disconnect' }) if $weak_self->{closed};
760              
761 35         82 my $ss = $weak_self->{h2_streams}{$stream_id};
762 35 50       98 return Future->done({ type => 'http.disconnect' }) unless $ss;
763              
764 35         96 my $future = (async sub {
765 35 50       86 return { type => 'http.disconnect' } unless $weak_self;
766              
767 35         69 my $ss = $weak_self->{h2_streams}{$stream_id};
768 35 50       73 return { type => 'http.disconnect' } unless $ss;
769              
770             # Check queue first
771 35 50       75 if (@{$ss->{receive_queue}}) {
  35         105  
772 0         0 return shift @{$ss->{receive_queue}};
  0         0  
773             }
774              
775             # If body is already complete, return final body event
776 35 50       98 if ($ss->{body_complete}) {
777 35         80 my $body = $ss->{body};
778 35         72 $ss->{body} = '';
779             return {
780 35         727 type => 'http.request',
781             body => $body,
782             more => 0,
783             };
784             }
785              
786             # Wait for body data
787 0 0       0 if (!$ss->{body_pending}) {
788 0         0 $ss->{body_pending} = Future->new;
789             }
790 0         0 await $ss->{body_pending};
791              
792             # Re-fetch stream state (may have changed)
793 0         0 $ss = $weak_self->{h2_streams}{$stream_id};
794 0 0       0 return { type => 'http.disconnect' } unless $ss;
795              
796             # Check queue after waking
797 0 0       0 if (@{$ss->{receive_queue}}) {
  0         0  
798 0         0 return shift @{$ss->{receive_queue}};
  0         0  
799             }
800              
801 0         0 my $body = $ss->{body};
802 0         0 $ss->{body} = '';
803             return {
804             type => 'http.request',
805             body => $body,
806 0 0       0 more => $ss->{body_complete} ? 0 : 1,
807             };
808 35         302 })->();
809              
810 35         1765 return $future;
811 36         247 };
812             }
813              
814             sub _h2_create_send {
815 36     36   121 my ($self, $stream_id, $stream_state) = @_;
816              
817 36         75 weaken(my $weak_self = $self);
818              
819 36         92 my $status;
820             my @response_headers;
821              
822             # Streaming state for deferred data provider pattern.
823             # The send queue lives on per-stream state ($ss->{send_queue} /
824             # $ss->{send_queue_bytes}) so the h2 transport handle can measure it;
825             # $eof_pending / $streaming_started stay closure-local.
826 36         60 my $eof_pending = 0;
827 36         53 my $streaming_started = 0;
828              
829             # Data callback for nghttp2's streaming response.
830             # Returns ($data, $eof) when data is available, or undef to defer.
831             my $data_callback = sub {
832 132     132   260 my ($cb_stream_id, $max_len) = @_;
833              
834 132   33     488 my $ss = $weak_self && $weak_self->{h2_streams}{$stream_id};
835 132 50       251 return undef unless $ss;
836 132   50     278 my $q = $ss->{send_queue} ||= [];
837              
838 132 100       265 if (@$q) {
839 98         192 my $chunk = shift @$q;
840             # Respect max_len — XS truncates without preserving remainder
841 98 100       227 if (length($chunk) > $max_len) {
842 15         240 unshift @$q, substr($chunk, $max_len);
843 15         141 $chunk = substr($chunk, 0, $max_len);
844             }
845 98         174 $ss->{send_queue_bytes} -= length($chunk);
846              
847             # Per-stream backpressure: once this stream's queue falls below the
848             # low watermark, release any producer blocked in
849             # _h2_wait_for_stream_drain. This callback runs inside nghttp2's
850             # extract(), so resolve on the next loop tick — completing the Future
851             # resumes the awaiting producer synchronously, and it must not call
852             # resume_stream/_h2_write_pending re-entrantly into nghttp2.
853 98 100 50     411 if (($ss->{send_queue_bytes} // 0) < $weak_self->{write_low_watermark}
      100        
      100        
854 12         41 && $ss->{stream_drain_waiters} && @{$ss->{stream_drain_waiters}}) {
855 5         12 my @waiters = splice @{$ss->{stream_drain_waiters}};
  5         21  
856             $weak_self->{server}->loop->later(sub {
857 5         2871 $_->done for grep { !$_->is_ready } @waiters;
  5         22  
858 5         30 });
859             }
860              
861             # Fire the app's on_drain hysteresis callbacks once this stream's
862             # queue falls below the low watermark. Like the waiters above, this
863             # runs inside nghttp2's extract(), and an on_drain callback may call
864             # $send to resume its source — which would re-enter nghttp2. Splice
865             # the fires out first (so they can't double-fire), then invoke them on
866             # the next loop tick.
867 98 100 50     580 if (($ss->{send_queue_bytes} // 0) < $weak_self->{write_low_watermark}
      100        
      100        
868 15         47 && $ss->{transport_drain_fires} && @{$ss->{transport_drain_fires}}) {
869 4         8 my @fires = splice @{$ss->{transport_drain_fires}};
  4         11  
870             $weak_self->{server}->loop->later(sub {
871 4         1083 $_->() for @fires;
872 4         19 });
873             }
874              
875 98 100 100     394 my $eof = (!@$q && $eof_pending) ? 1 : 0;
876 98         886 return ($chunk, $eof);
877             }
878              
879             # Queue empty but EOF pending — signal end of stream
880 34 100       62 if ($eof_pending) {
881 2         14 return ('', 1);
882             }
883              
884             # Queue empty, more data expected — defer (NGHTTP2_ERR_DEFERRED in C layer)
885 32         136 return undef;
886 36         318 };
887              
888 141     141   7005 return async sub {
889 141         259 my ($event) = @_;
890 141 50       337 return unless $weak_self;
891 141 50       375 return if $weak_self->{closed};
892              
893 141   50     393 my $type = $event->{type} // '';
894              
895 141 100       424 if ($type eq 'http.response.start') {
    50          
896 33 50       117 my $ss = $weak_self->{h2_streams}{$stream_id} or return;
897 33 50       86 return if $ss->{response_started};
898 33         122 $ss->{response_started} = 1;
899 33 50       253 $ss->{connection_state}->_mark_response_started if $ss->{connection_state};
900              
901 33   50     81 $status = $event->{status} // 200;
902             @response_headers = map {
903 32         196 [_validate_header_name($_->[0]), _validate_header_value($_->[1])]
904 33   50     61 } @{$event->{headers} // []};
  33         112  
905             # Server-supplied Date header (HTTP/1.1 parity) — add if the app didn't.
906 33 50       103 unless (grep { lc($_->[0]) eq 'date' } @response_headers) {
  32         475  
907 33         291 push @response_headers, ['date', $weak_self->{protocol}->format_date];
908             }
909             }
910             elsif ($type eq 'http.response.body') {
911 108 50       331 my $ss = $weak_self->{h2_streams}{$stream_id} or return;
912 108 50       258 return unless $ss->{response_started};
913              
914 108   50     264 my $body = $event->{body} // '';
915 108   100     286 my $more = $event->{more} // 0;
916              
917 108 100       240 if ($more) {
918 75 100       147 if (!$streaming_started) {
919             # First streaming chunk — submit with data callback
920 10         18 $streaming_started = 1;
921 10   50     58 $ss->{send_queue} //= [];
922 10   50     49 $ss->{send_queue_bytes} //= 0;
923 10 50       24 if (length $body) {
924 10         33 push @{$ss->{send_queue}}, $body;
  10         50  
925 10         24 $ss->{send_queue_bytes} += length $body;
926             }
927             # Synchronous: we're in the app's send path (not nghttp2's
928             # extract), so on_high_water can fire here to tell the app to
929             # pause its source.
930 10 50       74 $ss->{transport_state}->_check_watermarks if $ss->{transport_state};
931             $weak_self->{h2_session}->submit_response_streaming(
932 10         73 $stream_id,
933             status => $status,
934             headers => \@response_headers,
935             data_callback => $data_callback,
936             );
937 10         428 $weak_self->_h2_write_pending;
938             } else {
939             # Subsequent chunk — backpressure check then push and resume.
940             # Bound on THIS stream's send queue (per-stream), not the
941             # shared TCP buffer which is meaningless across multiplexed
942             # streams.
943 65 100 50     241 if (($ss->{send_queue_bytes} // 0) >= $weak_self->{write_high_watermark}) {
944 5         21 await $weak_self->_h2_wait_for_stream_drain($stream_id);
945 5 50       598 return unless $weak_self;
946 5 50       18 return if $weak_self->{closed};
947 5 50       20 return unless $weak_self->{h2_streams}{$stream_id};
948             }
949 65 50       185 if (length $body) {
950 65         105 push @{$ss->{send_queue}}, $body;
  65         251  
951 65         142 $ss->{send_queue_bytes} += length $body;
952             }
953             # Synchronous — app send path, not nghttp2 extract.
954 65 50       362 $ss->{transport_state}->_check_watermarks if $ss->{transport_state};
955 65         374 $weak_self->{h2_session}->resume_stream($stream_id);
956 65         632 $weak_self->_h2_write_pending;
957             }
958             } else {
959 33 100       83 if ($streaming_started) {
960             # Final chunk on an already-streaming response. Bound on THIS
961             # stream's send queue (per-stream), not the shared TCP buffer.
962 10 50 50     56 if (($ss->{send_queue_bytes} // 0) >= $weak_self->{write_high_watermark}) {
963 0         0 await $weak_self->_h2_wait_for_stream_drain($stream_id);
964 0 0       0 return unless $weak_self;
965 0 0       0 return if $weak_self->{closed};
966 0 0       0 return unless $weak_self->{h2_streams}{$stream_id};
967             }
968 10         19 $eof_pending = 1;
969 10 100       73 if (length $body) {
970 8         14 push @{$ss->{send_queue}}, $body;
  8         17  
971 8         20 $ss->{send_queue_bytes} += length $body;
972             }
973             # Synchronous — app send path, not nghttp2 extract.
974 10 50       63 $ss->{transport_state}->_check_watermarks if $ss->{transport_state};
975 10         40 $weak_self->{h2_session}->resume_stream($stream_id);
976 10         88 $weak_self->_h2_write_pending;
977             } else {
978             # Non-streaming: single response (unchanged one-shot path)
979 23         143 $weak_self->{h2_session}->submit_response($stream_id,
980             status => $status,
981             headers => \@response_headers,
982             body => $body,
983             );
984 23         890 $weak_self->_h2_write_pending;
985             }
986             }
987             }
988             else {
989 0         0 _unrecognized_event_type($type, 'http');
990             }
991 36         470 };
992             }
993              
994             # =============================================================================
995             # HTTP/2 WebSocket over HTTP/2 (RFC 8441)
996             # =============================================================================
997              
998             sub _h2_create_websocket_scope {
999 17     17   198981 my ($self, $stream_id, $stream_state) = @_;
1000              
1001 17         63 my $pseudo = $stream_state->{pseudo};
1002 17         46 my $headers = $stream_state->{headers};
1003              
1004 17   50     60 my $full_path = $pseudo->{':path'} // '/';
1005 17         89 my ($path, $query_string) = split(/\?/, $full_path, 2);
1006 17   50     117 $query_string //= '';
1007              
1008             # Match HTTP/1.1 pipeline: URI::Escape + UTF-8 decode with fallback
1009 17         115 my $unescaped = uri_unescape($path);
1010 17   33     170 my $decoded_path = eval { decode('UTF-8', $unescaped, Encode::FB_CROAK) }
  17         213  
1011             // $unescaped;
1012              
1013             # Extract subprotocols from headers
1014 17         1107 my @subprotocols;
1015 17         51 for my $header (@$headers) {
1016 34         89 my ($name, $value) = @$header;
1017 34 100       80 if ($name eq 'sec-websocket-protocol') {
1018 1         4 push @subprotocols, map { s/^\s+|\s+$//gr } split /,/, $value;
  2         17  
1019             }
1020             }
1021              
1022             return {
1023             type => 'websocket',
1024             pagi => {
1025             version => '0.3',
1026             spec_version => '0.3',
1027             },
1028             http_version => '2',
1029             scheme => $self->_get_ws_scheme,
1030             path => $decoded_path,
1031             raw_path => $path,
1032             query_string => $query_string,
1033             root_path => '',
1034             headers => $headers,
1035             (defined $self->{client_host}
1036             ? (client => [$self->{client_host}, $self->{client_port}])
1037             : ()
1038             ),
1039             server => [$self->{server_host}, $self->{server_port}],
1040             subprotocols => \@subprotocols,
1041 17         179 state => keys %{$self->{state}} ? { %{$self->{state}} } : {},
  0         0  
1042 17 50       203 extensions => { %{$self->_get_extensions_for_scope}, 'websocket.http.response' => {} },
  17 50       228  
1043             };
1044             }
1045              
1046             sub _h2_create_websocket_receive {
1047 16     16   56 my ($self, $stream_id, $stream_state) = @_;
1048              
1049 16         36 weaken(my $weak_self = $self);
1050              
1051             return sub {
1052 31 50   31   1351 return Future->done({ type => 'websocket.disconnect', code => 1006, reason => '' })
1053             unless $weak_self;
1054             return Future->done({ type => 'websocket.disconnect', code => 1006, reason => '' })
1055 31 50       78 if $weak_self->{closed};
1056              
1057 31         85 my $ss = $weak_self->{h2_streams}{$stream_id};
1058 31 50       70 return Future->done({ type => 'websocket.disconnect', code => 1006, reason => '' })
1059             unless $ss;
1060              
1061 31         47 my $future = (async sub {
1062 31 50       96 return { type => 'websocket.disconnect', code => 1006, reason => '' }
1063             unless $weak_self;
1064              
1065 31         55 my $ss = $weak_self->{h2_streams}{$stream_id};
1066 31 50       67 return { type => 'websocket.disconnect', code => 1006, reason => '' }
1067             unless $ss;
1068              
1069             # Check queue first
1070 31 50       47 if (@{$ss->{receive_queue}}) {
  31         78  
1071 0         0 return shift @{$ss->{receive_queue}};
  0         0  
1072             }
1073              
1074             # First call returns websocket.connect
1075 31 100       75 if (!$ss->{ws_connect_sent}) {
1076 15         29 $ss->{ws_connect_sent} = 1;
1077 15         169 return { type => 'websocket.connect' };
1078             }
1079              
1080             # Wait for events
1081 16         22 while (1) {
1082 38 100       64 if (@{$ss->{receive_queue}}) {
  38         118  
1083 9         17 return shift @{$ss->{receive_queue}};
  9         59  
1084             }
1085              
1086             return { type => 'websocket.disconnect', code => 1006, reason => '' }
1087 29 100       168 if $weak_self->{closed};
1088              
1089 22 50       59 if (!$ss->{body_pending}) {
1090 22         163 $ss->{body_pending} = Future->new;
1091             }
1092 22         232 await $ss->{body_pending};
1093              
1094 22         3037 $ss = $weak_self->{h2_streams}{$stream_id};
1095 22 50       76 return { type => 'websocket.disconnect', code => 1006, reason => '' }
1096             unless $ss;
1097             }
1098 31         162 })->();
1099              
1100 31         1689 return $future;
1101 16         154 };
1102             }
1103              
1104             sub _h2_create_websocket_send {
1105 16     16   43 my ($self, $stream_id, $stream_state) = @_;
1106              
1107 16         46 weaken(my $weak_self = $self);
1108              
1109 25     25   864 return async sub {
1110 25         58 my ($event) = @_;
1111 25 50       65 return unless $weak_self;
1112 25 50       338 return if $weak_self->{closed};
1113              
1114 25         60 my $ss = $weak_self->{h2_streams}{$stream_id};
1115 25 50       61 return unless $ss;
1116              
1117 25   50     72 my $type = $event->{type} // '';
1118              
1119 25 100       111 if ($type eq 'websocket.accept') {
    100          
    100          
    100          
    50          
1120 12 50       205 return if $ss->{ws_accepted};
1121              
1122             # HTTP/2 WebSocket: respond with 200 (not 101)
1123 12         21 my @headers;
1124 12 50       49 if (my $subprotocol = $event->{subprotocol}) {
1125 0         0 $subprotocol = _validate_subprotocol($subprotocol);
1126 0         0 push @headers, ['sec-websocket-protocol', $subprotocol];
1127             }
1128 12 50       32 if (my $extra = $event->{headers}) {
1129             push @headers, map {
1130 0         0 [_validate_header_name($_->[0]), _validate_header_value($_->[1])]
  0         0  
1131             } @$extra;
1132             }
1133              
1134 12         26 $ss->{ws_accepted} = 1;
1135 12         26 $ss->{response_started} = 1;
1136             $ss->{ws_frame} = Protocol::WebSocket::Frame->new(
1137             max_payload_size => $weak_self->{max_ws_frame_size},
1138 12         172 );
1139              
1140             # Submit 200 response with streaming body that defers
1141             $weak_self->{h2_session}->submit_response($stream_id,
1142             status => 200,
1143             headers => \@headers,
1144 12         43 body => sub { return undef }, # defer until submit_data
1145 12         611 );
1146 12         367 $weak_self->_h2_write_pending;
1147              
1148             # Process any data that arrived before accept
1149 12 50       52 if (length($ss->{body}) > 0) {
1150 0         0 my $buffered = $ss->{body};
1151 0         0 $ss->{body} = '';
1152 0         0 $weak_self->_h2_process_ws_frames($stream_id, $ss, $buffered);
1153             }
1154             }
1155             elsif ($type eq 'websocket.send') {
1156 5 50       46 return unless $ss->{ws_accepted};
1157              
1158 5         11 my $frame;
1159 5 100       17 if (defined $event->{text}) {
    50          
1160             $frame = Protocol::WebSocket::Frame->new(
1161             buffer => $event->{text},
1162 4         52 type => 'text',
1163             );
1164             }
1165             elsif (defined $event->{bytes}) {
1166             $frame = Protocol::WebSocket::Frame->new(
1167             buffer => $event->{bytes},
1168 1         6 type => 'binary',
1169             );
1170             }
1171             else {
1172 0         0 return;
1173             }
1174              
1175 5         562 my $bytes = $frame->to_bytes;
1176 5         304 $weak_self->{h2_session}->submit_data($stream_id, $bytes, 0);
1177 5         26 $weak_self->_h2_write_pending;
1178             }
1179             elsif ($type eq 'websocket.http.response.start') {
1180 2 50       5 return if $ss->{ws_accepted};
1181 2 50       5 return if $ss->{ws_denial_started};
1182 2         3 $ss->{ws_denial_started} = 1;
1183 2   50     6 $ss->{ws_denial_status} = $event->{status} // 403;
1184             $ss->{ws_denial_headers} = [
1185 2         8 map { [_validate_header_name($_->[0]), _validate_header_value($_->[1])] }
1186 2   50     4 @{$event->{headers} // []}
  2         5  
1187             ];
1188 2         5 $ss->{ws_denial_body} = '';
1189             }
1190             elsif ($type eq 'websocket.http.response.body') {
1191 3 50       6 return unless $ss->{ws_denial_started};
1192 3 50       6 return if $ss->{response_started};
1193 3   50     7 $ss->{ws_denial_body} .= $event->{body} // '';
1194 3 100       8 return if $event->{more}; # more chunks coming — keep buffering
1195              
1196 2         3 $ss->{response_started} = 1;
1197             $weak_self->{h2_session}->submit_response($stream_id,
1198             status => $ss->{ws_denial_status},
1199             headers => $ss->{ws_denial_headers},
1200             body => $ss->{ws_denial_body},
1201 2         11 );
1202 2         85 $weak_self->_h2_write_pending;
1203             }
1204             elsif ($type eq 'websocket.close') {
1205 3 100       13 if (!$ss->{ws_accepted}) {
1206             # Reject: send 403
1207 2         18 $weak_self->{h2_session}->submit_response($stream_id,
1208             status => 403,
1209             headers => [['content-type', 'text/plain']],
1210             body => 'Forbidden',
1211             );
1212 2         71 $weak_self->_h2_write_pending;
1213 2         29 return;
1214             }
1215              
1216 1   50     3 my $code = $event->{code} // 1000;
1217 1   50     3 my $reason = $event->{reason} // '';
1218              
1219 1         9 my $frame = Protocol::WebSocket::Frame->new(
1220             type => 'close',
1221             buffer => pack('n', $code) . $reason,
1222             );
1223              
1224             # Send close frame + END_STREAM
1225 1         50 $weak_self->{h2_session}->submit_data($stream_id, $frame->to_bytes, 1);
1226 1         8 $weak_self->_h2_write_pending;
1227             }
1228              
1229 22         257 return;
1230 16         220 };
1231             }
1232              
1233             # =============================================================================
1234             # HTTP/2 SSE (Server-Sent Events over HTTP/2)
1235             # =============================================================================
1236              
1237             sub _h2_create_sse_scope {
1238 18     18   2160 my ($self, $stream_id, $stream_state) = @_;
1239              
1240 18         43 my $pseudo = $stream_state->{pseudo};
1241 18         54 my $headers = $stream_state->{headers};
1242              
1243 18   50     89 my $full_path = $pseudo->{':path'} // '/';
1244 18         99 my ($path, $query_string) = split(/\?/, $full_path, 2);
1245 18   50     197 $query_string //= '';
1246              
1247             # Match HTTP/1.1 pipeline: URI::Escape + UTF-8 decode with fallback
1248 18         130 my $unescaped = uri_unescape($path);
1249 18   33     227 my $decoded_path = eval { decode('UTF-8', $unescaped, Encode::FB_CROAK) }
  18         286  
1250             // $unescaped;
1251              
1252             return {
1253             type => 'sse',
1254             pagi => {
1255             version => '0.3',
1256             spec_version => '0.3',
1257             },
1258             http_version => '2',
1259             method => $pseudo->{':method'} // 'GET',
1260             scheme => $pseudo->{':scheme'} // $self->_get_scheme,
1261             path => $decoded_path,
1262             raw_path => $path,
1263             query_string => $query_string,
1264             root_path => '',
1265             headers => $headers,
1266             (defined $self->{client_host}
1267             ? (client => [$self->{client_host}, $self->{client_port}])
1268             : ()
1269             ),
1270             server => [$self->{server_host}, $self->{server_port}],
1271 18         169 state => keys %{$self->{state}} ? { %{$self->{state}} } : {},
  0         0  
1272             extensions => $self->_get_extensions_for_scope,
1273             # Per-stream outbound flow-control handle. Like the h2 streaming scope,
1274             # it measures THIS stream's send queue (h2 multiplexes many streams over
1275             # one connection, so the shared TCP buffer is meaningless per stream).
1276 18 50 50     1489 'pagi.transport' => ($stream_state->{transport_state} = $self->_h2_transport_state($stream_state)),
    50 33        
1277             };
1278             }
1279              
1280             sub _h2_create_sse_receive {
1281 17     17   53 my ($self, $stream_id, $stream_state) = @_;
1282              
1283 17         38 weaken(my $weak_self = $self);
1284              
1285             my $sse_disconnect = sub {
1286             return {
1287 2     2   14 type => 'sse.disconnect',
1288             reason => 'client_closed',
1289             };
1290 17         97 };
1291              
1292             return sub {
1293 14 50   14   160 return Future->done($sse_disconnect->()) unless $weak_self;
1294 14 50       41 return Future->done($sse_disconnect->()) if $weak_self->{closed};
1295              
1296 14         36 my $ss = $weak_self->{h2_streams}{$stream_id};
1297 14 50       39 return Future->done($sse_disconnect->()) unless $ss;
1298              
1299 14         27 my $future = (async sub {
1300 14 50       28 return $sse_disconnect->() unless $weak_self;
1301              
1302 14         50 my $ss = $weak_self->{h2_streams}{$stream_id};
1303 14 50       35 return $sse_disconnect->() unless $ss;
1304              
1305             # Check queue first
1306 14 50       20 if (@{$ss->{receive_queue}}) {
  14         49  
1307 0         0 return shift @{$ss->{receive_queue}};
  0         0  
1308             }
1309              
1310             # First call returns sse.request with body
1311 14 100       35 if (!$ss->{sse_request_sent}) {
1312 12         26 $ss->{sse_request_sent} = 1;
1313             return {
1314             type => 'sse.request',
1315             body => $ss->{body},
1316 12         297 more => 0,
1317             };
1318             }
1319              
1320             # Wait for disconnect
1321 2         4 while (1) {
1322 4 50       5 if (@{$ss->{receive_queue}}) {
  4         9  
1323 0         0 return shift @{$ss->{receive_queue}};
  0         0  
1324             }
1325              
1326             return $sse_disconnect->()
1327 4 100       10 if $weak_self->{closed};
1328              
1329 2 50       4 if (!$ss->{body_pending}) {
1330 2         6 $ss->{body_pending} = Future->new;
1331             }
1332 2         12 await $ss->{body_pending};
1333              
1334 2         178 $ss = $weak_self->{h2_streams}{$stream_id};
1335 2 50       6 return $sse_disconnect->() unless $ss;
1336             }
1337 14         88 })->();
1338              
1339 14         774 return $future;
1340 17         128 };
1341             }
1342              
1343             sub _h2_create_sse_send {
1344 17     17   46 my ($self, $stream_id, $stream_state) = @_;
1345              
1346 17         41 weaken(my $weak_self = $self);
1347              
1348             # Streaming state for the data-provider pull pattern. The send queue lives on
1349             # per-stream state ($ss->{send_queue} / $ss->{send_queue_bytes}) so the
1350             # pagi.transport handle can measure THIS stream's backlog. $streaming_started
1351             # stays closure-local.
1352 17         29 my $streaming_started = 0;
1353              
1354             # Data callback for nghttp2's streaming response. Pulls from the per-stream
1355             # queue; SSE responses stay open, so this never signals EOF (returns eof=0),
1356             # or undef to defer when the queue is empty.
1357             my $data_callback = sub {
1358 79     79   160 my ($cb_stream_id, $max_len) = @_;
1359              
1360 79   33     369 my $ss = $weak_self && $weak_self->{h2_streams}{$stream_id};
1361 79 50       146 return undef unless $ss;
1362 79   50     369 my $q = $ss->{send_queue} ||= [];
1363              
1364 79 100       159 if (@$q) {
1365 34         94 my $chunk = shift @$q;
1366             # Respect max_len — XS truncates without preserving remainder
1367 34 100       112 if (length($chunk) > $max_len) {
1368 4         317 unshift @$q, substr($chunk, $max_len);
1369 4         65 $chunk = substr($chunk, 0, $max_len);
1370             }
1371 34         105 $ss->{send_queue_bytes} -= length($chunk);
1372              
1373             # Per-stream backpressure: once this stream's queue falls below the
1374             # low watermark, release any producer blocked in
1375             # _h2_wait_for_stream_drain. This runs inside nghttp2's extract(), so
1376             # resolve on the next loop tick — completing the Future resumes the
1377             # awaiting producer synchronously, and it must not re-enter nghttp2.
1378 34 50 50     215 if (($ss->{send_queue_bytes} // 0) < $weak_self->{write_low_watermark}
      66        
      33        
1379 0         0 && $ss->{stream_drain_waiters} && @{$ss->{stream_drain_waiters}}) {
1380 0         0 my @waiters = splice @{$ss->{stream_drain_waiters}};
  0         0  
1381             $weak_self->{server}->loop->later(sub {
1382 0         0 $_->done for grep { !$_->is_ready } @waiters;
  0         0  
1383 0         0 });
1384             }
1385              
1386             # Fire the app's on_drain hysteresis callbacks once this stream's
1387             # queue falls below the low watermark. Deferred for the same reason:
1388             # an on_drain callback may call $send, which would re-enter nghttp2.
1389 34 100 50     222 if (($ss->{send_queue_bytes} // 0) < $weak_self->{write_low_watermark}
      100        
      66        
1390 1         5 && $ss->{transport_drain_fires} && @{$ss->{transport_drain_fires}}) {
1391 1         4 my @fires = splice @{$ss->{transport_drain_fires}};
  1         7  
1392             $weak_self->{server}->loop->later(sub {
1393 1         1208 $_->() for @fires;
1394 1         18 });
1395             }
1396              
1397 34         552 return ($chunk, 0); # SSE streams never EOF via data_callback
1398             }
1399              
1400             # Queue empty. If the application closed this stream (sse.close), emit a
1401             # final empty DATA frame with END_STREAM to terminate it; otherwise defer.
1402 45 100       105 return ('', 1) if $ss->{sse_closing};
1403              
1404             # Queue empty — defer (NGHTTP2_ERR_DEFERRED in the C layer)
1405 44         188 return undef;
1406 17         132 };
1407              
1408 52     52   103671 return async sub {
1409 52         94 my ($event) = @_;
1410 52 50       131 return unless $weak_self;
1411              
1412 52         126 my $ss = $weak_self->{h2_streams}{$stream_id};
1413 52 50       113 return unless $ss;
1414              
1415 52   50     283 my $type = $event->{type} // '';
1416              
1417             # After an application-initiated sse.close on THIS stream, a second
1418             # sse.close is a no-op and any other send raises. Tracked per-stream
1419             # ($ss), since HTTP/2 multiplexes many SSE streams over one connection.
1420 52 100       132 if ($ss->{sse_close_sent}) {
1421 1 50       16 return if $type eq 'sse.close';
1422 1         23 die "cannot send '$type' after sse.close\n";
1423             }
1424              
1425             # After an sse.http.response.start (decline), only the decline body may
1426             # follow; a stream event is a programming error (first-send-wins).
1427 51 100 100     161 if ($ss->{sse_decline_started} && $type !~ /^sse\.http\.response\./) {
1428 1         25 die "cannot send '$type' after sse.http.response.start\n";
1429             }
1430              
1431 50 50       118 return if $weak_self->{closed};
1432              
1433             # Reset SSE idle timer on send activity
1434 50         243 $weak_self->_reset_sse_idle_timer;
1435              
1436             # Dev-mode event validation (PAGI spec compliance)
1437 50 50       126 if ($weak_self->{validate_events}) {
1438 0         0 require PAGI::Server::EventValidator;
1439 0         0 PAGI::Server::EventValidator::validate_sse_send($event);
1440             }
1441              
1442 50 100       233 if ($type eq 'sse.start') {
    100          
    100          
    100          
    100          
    100          
    50          
1443 14 50       43 return if $ss->{response_started};
1444 14         34 $ss->{response_started} = 1;
1445              
1446 14   50     49 my $status = $event->{status} // 200;
1447 14   100     71 my $headers = $event->{headers} // [];
1448              
1449             # Ensure Content-Type is text/event-stream
1450 14         34 my $has_content_type = 0;
1451 14         42 for my $h (@$headers) {
1452 2 100       6 if (lc($h->[0]) eq 'content-type') {
1453 1         1 $has_content_type = 1;
1454 1         2 last;
1455             }
1456             }
1457              
1458 14         29 my @final_headers;
1459 14         30 for my $h (@$headers) {
1460 2         7 push @final_headers, [_validate_header_name($h->[0]), _validate_header_value($h->[1])];
1461             }
1462 14 100       35 if (!$has_content_type) {
1463 13         54 push @final_headers, ['content-type', 'text/event-stream'];
1464             }
1465 14         38 push @final_headers, ['cache-control', 'no-cache'];
1466             # Server-supplied Date header (HTTP/1.1 parity) — the h1 SSE path adds
1467             # this too; add it unless the app supplied one.
1468 14 50       31 unless (grep { lc($_->[0]) eq 'date' } @final_headers) {
  29         118  
1469 14         135 push @final_headers, ['date', $weak_self->{protocol}->format_date];
1470             }
1471              
1472 14         29 $streaming_started = 1;
1473 14   50     78 $ss->{send_queue} //= [];
1474 14   50     76 $ss->{send_queue_bytes} //= 0;
1475             $weak_self->{h2_session}->submit_response_streaming(
1476 14         81 $stream_id,
1477             status => $status,
1478             headers => \@final_headers,
1479             data_callback => $data_callback,
1480             );
1481 14         534 $weak_self->_h2_write_pending;
1482              
1483             # Protocol-specific keepalive writer (HTTP/2 DATA frames). Keepalive
1484             # bytes are counted in the per-stream backlog so buffered_amount stays
1485             # accurate, but they do not poke the watermark callbacks — a server
1486             # heartbeat is not an application send.
1487             $weak_self->{sse_keepalive_writer} = sub {
1488 6         18 my ($text) = @_;
1489 6 50       23 return unless $weak_self;
1490 6 50       22 return if $weak_self->{closed};
1491 6 50       56 my $ss = $weak_self->{h2_streams}{$stream_id} or return;
1492 6   50     13 push @{$ss->{send_queue} ||= []}, $text;
  6         106  
1493 6   50     86 $ss->{send_queue_bytes} = ($ss->{send_queue_bytes} // 0) + length $text;
1494 6         63 $weak_self->{h2_session}->resume_stream($stream_id);
1495 6         118 $weak_self->_h2_write_pending;
1496 14         117 };
1497              
1498             # Start SSE idle timer if configured
1499 14         60 $weak_self->_start_sse_idle_timer;
1500             }
1501             elsif ($type eq 'sse.send') {
1502 24 50       77 return unless $ss->{response_started};
1503              
1504             # Per-stream backpressure: bound on THIS stream's queue, not the
1505             # shared TCP buffer (meaningless across multiplexed h2 streams).
1506 24 50 50     113 if (($ss->{send_queue_bytes} // 0) >= $weak_self->{write_high_watermark}) {
1507 0         0 await $weak_self->_h2_wait_for_stream_drain($stream_id);
1508 0 0       0 return unless $weak_self;
1509 0 0       0 return if $weak_self->{closed};
1510 0 0       0 return unless $weak_self->{h2_streams}{$stream_id};
1511             }
1512              
1513 24         108 my $sse_data = _format_sse_event($event);
1514 24   50     44 push @{$ss->{send_queue} ||= []}, $sse_data;
  24         99  
1515 24   50     95 $ss->{send_queue_bytes} = ($ss->{send_queue_bytes} // 0) + length $sse_data;
1516             # Synchronous — app send path, not nghttp2 extract — so on_high_water
1517             # may fire here to tell the app to pause its source.
1518 24 50       190 $ss->{transport_state}->_check_watermarks if $ss->{transport_state};
1519 24         153 $weak_self->{h2_session}->resume_stream($stream_id);
1520 24         324 $weak_self->_h2_write_pending;
1521             }
1522             elsif ($type eq 'sse.comment') {
1523 1 50       3 return unless $ss->{response_started};
1524              
1525 1         4 my $comment = _format_sse_comment($event);
1526 1   50     1 push @{$ss->{send_queue} ||= []}, $comment;
  1         4  
1527 1   50     4 $ss->{send_queue_bytes} = ($ss->{send_queue_bytes} // 0) + length $comment;
1528 1 50       6 $ss->{transport_state}->_check_watermarks if $ss->{transport_state};
1529 1         3 $weak_self->{h2_session}->resume_stream($stream_id);
1530 1         8 $weak_self->_h2_write_pending;
1531             }
1532             elsif ($type eq 'sse.keepalive') {
1533 2   50     10 my $interval = $event->{interval} // 0;
1534 2         5 my $comment = $event->{comment};
1535              
1536 2 50       7 if ($interval > 0) {
1537 2         11 $weak_self->_start_sse_keepalive($interval, $comment);
1538             }
1539             else {
1540 0         0 $weak_self->_stop_sse_keepalive;
1541             }
1542             }
1543             elsif ($type eq 'sse.close') {
1544             # End THIS HTTP/2 stream now: flush remaining queued events, then the
1545             # data_callback emits a final END_STREAM frame. `reason` is
1546             # server-side only and is never written to the wire.
1547 1         3 $ss->{sse_close_sent} = 1;
1548 1         2 $ss->{sse_closing} = 1;
1549             $weak_self->{sse_disconnect_reason} = $event->{reason}
1550 1 50       5 if defined $event->{reason};
1551 1         4 $weak_self->{h2_session}->resume_stream($stream_id);
1552 1         8 $weak_self->_h2_write_pending;
1553             }
1554             elsif ($type eq 'sse.http.response.start') {
1555             die "cannot decline with sse.http.response.start after sse.start\n"
1556 4 100       24 if $ss->{response_started};
1557 3 50       10 return if $ss->{sse_decline_started}; # idempotent
1558 3         5 $ss->{sse_decline_started} = 1;
1559 3   50     14 $ss->{sse_decline_status} = $event->{status} // 200;
1560             $ss->{sse_decline_headers} = [
1561 2         8 map { [_validate_header_name($_->[0]), _validate_header_value($_->[1])] }
1562 3   50     5 @{$event->{headers} // []}
  3         12  
1563             ];
1564 3         10 $ss->{sse_decline_body} = '';
1565             }
1566             elsif ($type eq 'sse.http.response.body') {
1567 4 50       13 return unless $ss->{sse_decline_started};
1568 4 50       11 return if $ss->{response_started};
1569 4   50     14 $ss->{sse_decline_body} .= $event->{body} // '';
1570 4 100       37 return if $event->{more}; # more chunks coming — keep buffering
1571              
1572 3         7 $ss->{response_started} = 1;
1573             $weak_self->{h2_session}->submit_response($stream_id,
1574             status => $ss->{sse_decline_status},
1575             headers => $ss->{sse_decline_headers},
1576             body => $ss->{sse_decline_body},
1577 3         21 );
1578 3         158 $weak_self->_h2_write_pending;
1579             }
1580             else {
1581 0         0 _unrecognized_event_type($type, 'sse');
1582             }
1583              
1584 48         589 return;
1585 17         269 };
1586             }
1587              
1588             sub _h2_process_ws_frames {
1589 16     16   39 my ($self, $stream_id, $stream, $data) = @_;
1590              
1591 16         74 my $frame = $stream->{ws_frame};
1592 16 50       42 return unless $frame;
1593              
1594 16         94 $frame->append($data);
1595              
1596 16         335 while (defined(my $bytes = $frame->next_bytes)) {
1597 10         1667 my $opcode = $frame->opcode;
1598              
1599 10 100       122 if ($opcode == 1) {
    100          
    50          
    0          
1600             # Text frame
1601 5         15 my $text = eval { Encode::decode('UTF-8', $bytes, Encode::FB_CROAK) };
  5         99  
1602 5 100       491 unless (defined $text) {
1603 1         5 $self->_h2_ws_close($stream_id, 1007, 'Invalid UTF-8');
1604 1         3 return;
1605             }
1606 4         10 push @{$stream->{receive_queue}}, {
  4         39  
1607             type => 'websocket.receive',
1608             text => $text,
1609             };
1610             }
1611             elsif ($opcode == 2) {
1612             # Binary frame
1613 1         1 push @{$stream->{receive_queue}}, {
  1         7  
1614             type => 'websocket.receive',
1615             bytes => $bytes,
1616             };
1617             }
1618             elsif ($opcode == 8) {
1619             # Close frame
1620 4         14 my ($code, $reason) = (1005, '');
1621              
1622             # RFC 6455 Section 5.5.1: Close frame payload is 0 or >=2 bytes
1623 4 100       28 if (length($bytes) == 1) {
1624 1         5 $self->_h2_ws_close($stream_id, 1002, 'Invalid close frame');
1625 1         2 push @{$stream->{receive_queue}}, {
  1         16  
1626             type => 'websocket.disconnect',
1627             code => 1002,
1628             reason => 'Invalid close frame',
1629             };
1630 1         6 $self->_h2_wake_pending($stream);
1631 1         121 return;
1632             }
1633              
1634 3 50       41 if (length($bytes) >= 2) {
1635 3         21 $code = unpack('n', substr($bytes, 0, 2));
1636 3   50     34 $reason = substr($bytes, 2) // '';
1637              
1638             # RFC 6455 Section 7.4.1: Validate close code
1639 3         8 my $valid_code = 0;
1640 3 100 66     32 if ($code == 1000 || $code == 1001 || $code == 1002 || $code == 1003) {
    50 66        
    50 33        
      33        
      33        
1641 2         5 $valid_code = 1;
1642             }
1643             elsif ($code >= 1007 && $code <= 1011) {
1644 0         0 $valid_code = 1;
1645             }
1646             elsif ($code >= 3000 && $code <= 4999) {
1647 0         0 $valid_code = 1;
1648             }
1649 3 100       13 unless ($valid_code) {
1650 1         4 $self->_h2_ws_close($stream_id, 1002, 'Invalid close code');
1651 1         2 push @{$stream->{receive_queue}}, {
  1         6  
1652             type => 'websocket.disconnect',
1653             code => 1002,
1654             reason => 'Invalid close code',
1655             };
1656 1         13 $self->_h2_wake_pending($stream);
1657 1         155 return;
1658             }
1659              
1660             # RFC 6455: Close reason must be valid UTF-8
1661 2 50       9 if (length($reason) > 0) {
1662 2         6 my $reason_copy = $reason;
1663 2         6 my $decoded = eval { Encode::decode('UTF-8', $reason_copy, Encode::FB_CROAK) };
  2         27  
1664 2 100       137 unless (defined $decoded) {
1665 1         21 $self->_h2_ws_close($stream_id, 1007, 'Invalid UTF-8 in close reason');
1666 1         2 push @{$stream->{receive_queue}}, {
  1         6  
1667             type => 'websocket.disconnect',
1668             code => 1007,
1669             reason => 'Invalid UTF-8 in close reason',
1670             };
1671 1         5 $self->_h2_wake_pending($stream);
1672 1         146 return;
1673             }
1674             }
1675             }
1676              
1677             # Send close frame back + END_STREAM
1678 1         6 my $close_frame = Protocol::WebSocket::Frame->new(
1679             type => 'close',
1680             buffer => pack('n', $code) . $reason,
1681             );
1682 1         35 $self->{h2_session}->submit_data($stream_id, $close_frame->to_bytes, 1);
1683             # No _h2_write_pending — inside feed(); flushed by _h2_process_data
1684              
1685 1         5 push @{$stream->{receive_queue}}, {
  1         15  
1686             type => 'websocket.disconnect',
1687             code => $code,
1688             reason => $reason,
1689             };
1690             }
1691             elsif ($opcode == 9) {
1692             # Ping — respond with pong
1693 0         0 my $pong = Protocol::WebSocket::Frame->new(
1694             type => 'pong',
1695             buffer => $bytes,
1696             );
1697 0         0 $self->{h2_session}->submit_data($stream_id, $pong->to_bytes, 0);
1698             # No _h2_write_pending — inside feed(); flushed by _h2_process_data
1699             }
1700             # Opcode 10 (pong) — ignore
1701             }
1702              
1703 12         4170 $self->_h2_wake_pending($stream);
1704             }
1705              
1706             sub _h2_ws_close {
1707 4     4   12 my ($self, $stream_id, $code, $reason) = @_;
1708              
1709 4   50     34 my $frame = Protocol::WebSocket::Frame->new(
1710             type => 'close',
1711             buffer => pack('n', $code) . ($reason // ''),
1712             );
1713 4         150 $self->{h2_session}->submit_data($stream_id, $frame->to_bytes, 1);
1714             # No _h2_write_pending — called from inside feed(); flushed by _h2_process_data
1715             }
1716              
1717             # Request stall timeout - closes connection if no I/O activity during request processing
1718             sub _start_stall_timer {
1719 172     172   349 my ($self) = @_;
1720              
1721 172 50 33     659 return unless $self->{request_timeout} && $self->{request_timeout} > 0;
1722 0 0       0 return unless $self->{server};
1723 0 0       0 return if $self->{stall_timer}; # Already running
1724              
1725 0         0 weaken(my $weak_self = $self);
1726              
1727             my $timer = IO::Async::Timer::Countdown->new(
1728             delay => $self->{request_timeout},
1729             on_expire => sub {
1730 0 0   0   0 return unless $weak_self;
1731 0 0       0 return if $weak_self->{closed};
1732             # Log the timeout
1733 0 0 0     0 if ($weak_self->{server} && $weak_self->{server}->can('_log')) {
1734 0         0 $weak_self->{server}->_log(warn =>
1735             "Request stall timeout ($weak_self->{request_timeout}s) - closing connection");
1736             }
1737 0         0 $weak_self->_handle_disconnect_and_close('client_timeout');
1738             },
1739 0         0 );
1740 0         0 $self->{stall_timer} = $timer;
1741 0         0 $self->{server}->add_child($timer);
1742 0         0 $timer->start;
1743             }
1744              
1745             sub _reset_stall_timer {
1746 401     401   652 my ($self) = @_;
1747              
1748 401 50       981 return unless $self->{stall_timer};
1749 0         0 $self->{stall_timer}->reset;
1750 0 0       0 $self->{stall_timer}->start unless $self->{stall_timer}->is_running;
1751             }
1752              
1753             sub _stop_stall_timer {
1754 439     439   951 my ($self) = @_;
1755              
1756 439 50       1242 return unless $self->{stall_timer};
1757 0 0       0 $self->{stall_timer}->stop if $self->{stall_timer}->is_running;
1758 0 0       0 if ($self->{server}) {
1759 0         0 $self->{server}->remove_child($self->{stall_timer});
1760             }
1761 0         0 $self->{stall_timer} = undef;
1762             }
1763              
1764             # WebSocket idle timeout - closes connection if no activity
1765             sub _start_ws_idle_timer {
1766 22     22   51 my ($self) = @_;
1767              
1768 22 50 33     84 return unless $self->{ws_idle_timeout} && $self->{ws_idle_timeout} > 0;
1769 0 0       0 return unless $self->{server};
1770 0 0       0 return if $self->{ws_idle_timer};
1771              
1772 0         0 weaken(my $weak_self = $self);
1773              
1774             my $timer = IO::Async::Timer::Countdown->new(
1775             delay => $self->{ws_idle_timeout},
1776             on_expire => sub {
1777 0 0   0   0 return unless $weak_self;
1778 0 0       0 return if $weak_self->{closed};
1779 0 0 0     0 if ($weak_self->{server} && $weak_self->{server}->can('_log')) {
1780 0         0 $weak_self->{server}->_log(warn =>
1781             "WebSocket idle timeout ($weak_self->{ws_idle_timeout}s) - closing connection");
1782             }
1783 0         0 $weak_self->_handle_disconnect_and_close('idle_timeout');
1784             },
1785 0         0 );
1786 0         0 $self->{ws_idle_timer} = $timer;
1787 0         0 $self->{server}->add_child($timer);
1788 0         0 $timer->start;
1789             }
1790              
1791             sub _reset_ws_idle_timer {
1792 99     99   141 my ($self) = @_;
1793              
1794 99 50       217 return unless $self->{ws_idle_timer};
1795 0         0 $self->{ws_idle_timer}->reset;
1796 0 0       0 $self->{ws_idle_timer}->start unless $self->{ws_idle_timer}->is_running;
1797             }
1798              
1799             sub _stop_ws_idle_timer {
1800 284     284   2003 my ($self) = @_;
1801              
1802 284 50       918 return unless $self->{ws_idle_timer};
1803 0 0       0 $self->{ws_idle_timer}->stop if $self->{ws_idle_timer}->is_running;
1804 0 0       0 if ($self->{server}) {
1805 0         0 $self->{server}->remove_child($self->{ws_idle_timer});
1806             }
1807 0         0 $self->{ws_idle_timer} = undef;
1808             }
1809              
1810             # SSE idle timeout - closes connection if no activity
1811             sub _start_sse_idle_timer {
1812 35     35   132 my ($self) = @_;
1813              
1814 35 50 33     175 return unless $self->{sse_idle_timeout} && $self->{sse_idle_timeout} > 0;
1815 0 0       0 return unless $self->{server};
1816 0 0       0 return if $self->{sse_idle_timer};
1817              
1818 0         0 weaken(my $weak_self = $self);
1819              
1820             my $timer = IO::Async::Timer::Countdown->new(
1821             delay => $self->{sse_idle_timeout},
1822             on_expire => sub {
1823 0 0   0   0 return unless $weak_self;
1824 0 0       0 return if $weak_self->{closed};
1825 0 0 0     0 if ($weak_self->{server} && $weak_self->{server}->can('_log')) {
1826 0         0 $weak_self->{server}->_log(warn =>
1827             "SSE idle timeout ($weak_self->{sse_idle_timeout}s) - closing connection");
1828             }
1829 0         0 $weak_self->{sse_disconnect_reason} = 'idle_timeout';
1830 0         0 $weak_self->_handle_disconnect_and_close('idle_timeout');
1831             },
1832 0         0 );
1833 0         0 $self->{sse_idle_timer} = $timer;
1834 0         0 $self->{server}->add_child($timer);
1835 0         0 $timer->start;
1836             }
1837              
1838             sub _reset_sse_idle_timer {
1839 97     97   247 my ($self) = @_;
1840              
1841 97 50       269 return unless $self->{sse_idle_timer};
1842 0         0 $self->{sse_idle_timer}->reset;
1843 0 0       0 $self->{sse_idle_timer}->start unless $self->{sse_idle_timer}->is_running;
1844             }
1845              
1846             sub _stop_sse_idle_timer {
1847 284     284   612 my ($self) = @_;
1848              
1849 284 50       880 return unless $self->{sse_idle_timer};
1850 0 0       0 $self->{sse_idle_timer}->stop if $self->{sse_idle_timer}->is_running;
1851 0 0       0 if ($self->{server}) {
1852 0         0 $self->{server}->remove_child($self->{sse_idle_timer});
1853             }
1854 0         0 $self->{sse_idle_timer} = undef;
1855             }
1856              
1857             # ============================================================================
1858             # Send-side backpressure support
1859             # ============================================================================
1860             #
1861             # Prevents unbounded memory growth when apps send faster than slow clients
1862             # can receive. Uses watermark-based flow control:
1863             # - High watermark (default 1MB): pause sending when buffer exceeds this
1864             # - Low watermark (default 256KB): resume sending when buffer drops below this
1865             #
1866             # The $send->() Future will block (await) when high watermark is exceeded,
1867             # and resolve when buffer drains below low watermark.
1868              
1869             sub _get_write_buffer_size {
1870 388     388   723 my ($self) = @_;
1871              
1872 388 50       1058 return 0 unless $self->{stream};
1873              
1874             # Access IO::Async::Stream's internal write queue
1875             # IO::Async doesn't expose a public API for buffer size, so we access internals
1876 388   50     1102 my $queue = $self->{stream}{writequeue} // [];
1877 388         578 my $total = 0;
1878              
1879 388         881 for my $writer (@$queue) {
1880 261         5808 my $data = $writer->data;
1881 261 50 33     2098 if (defined $data && !ref $data) {
1882 261         493 $total += length($data);
1883             }
1884             }
1885              
1886 388         1584 return $total;
1887             }
1888              
1889             # HTTP/1.1: the transport handle reads the shared TCP write buffer. One
1890             # connection is one stream here, so the IO::Async write queue is the per-stream
1891             # backlog. The connection is held weakly so the handle never keeps it alive.
1892             sub _h1_transport_state {
1893 215     215   562 my ($self) = @_;
1894 215         667 weaken(my $w = $self);
1895             return PAGI::Server::TransportState->new(
1896 187 50   187   602 measure => sub { $w ? $w->_get_write_buffer_size : 0 },
1897 187 50   187   744 high => sub { $w ? $w->{write_high_watermark} : undef },
1898 3 50   3   8 low => sub { $w ? $w->{write_low_watermark} : undef },
1899 1 50   1   1 arm_drain => sub { my $fire = shift; $w->_wait_for_drain->on_ready($fire) if $w },
  1         11  
1900 215         3969 );
1901             }
1902              
1903             # HTTP/2: the transport handle reads this stream's send queue, not the shared
1904             # TCP write buffer — under h2, N streams multiplex one connection, so that
1905             # buffer is the whole connection's backlog (meaningless per stream). $ss is the
1906             # per-stream state hashref; it's held directly (it is the stream's own state),
1907             # while $self is weakened so the handle never keeps the connection alive.
1908             # arm_drain parks the $fire callback on the stream; the data_callback pull fires
1909             # it (deferred) when the queue crosses below the low watermark. Kept separate
1910             # from stream_drain_waiters: those are Futures for blocking backpressure, these
1911             # are the on_drain hysteresis fires.
1912             sub _h2_transport_state {
1913 55     55   155 my ($self, $ss) = @_;
1914 55         150 weaken(my $w = $self);
1915             return PAGI::Server::TransportState->new(
1916 93   100 93   519 measure => sub { $ss->{send_queue_bytes} // 0 },
1917 93 50   93   319 high => sub { $w ? $w->{write_high_watermark} : undef },
1918 0 0   0   0 low => sub { $w ? $w->{write_low_watermark} : undef },
1919 5     5   12 arm_drain => sub { my $fire = shift; push @{$ss->{transport_drain_fires}}, $fire },
  5         70  
  5         29  
1920 55         1286 );
1921             }
1922              
1923             # Notify the current transport-state handle after an application write so its
1924             # backpressure callbacks (on_high_water/on_drain) can fire on a watermark cross.
1925             sub _notify_transport_write {
1926 183     183   399 my ($self) = @_;
1927 183         345 my $ts = $self->{current_transport_state};
1928 183 50       1355 $ts->_check_watermarks if $ts;
1929             }
1930              
1931             sub _check_drain_waiters {
1932 1     1   2 my ($self) = @_;
1933              
1934 1 50       1 return unless @{$self->{_drain_waiters}};
  1         4  
1935 1 50       3 return unless $self->{stream};
1936              
1937 1         3 my $buffered = $self->_get_write_buffer_size;
1938              
1939             # Resolve all waiters if we've drained below low watermark
1940 1 50       3 if ($buffered < $self->{write_low_watermark}) {
1941 1         2 my @waiters = splice @{$self->{_drain_waiters}};
  1         3  
1942 1         2 for my $f (@waiters) {
1943 1 50       4 $f->done unless $f->is_ready;
1944             }
1945             # Disable drain checking until next high watermark hit
1946 1         9 $self->{_drain_check_active} = 0;
1947             }
1948             }
1949              
1950             sub _setup_drain_detection {
1951 1     1   2 my ($self) = @_;
1952              
1953             # Avoid redundant setup
1954 1 50       3 return if $self->{_drain_check_active};
1955 1         1 $self->{_drain_check_active} = 1;
1956              
1957 1         2 weaken(my $weak_self = $self);
1958              
1959             # Primary mechanism: check when write queue empties
1960             # This guarantees we notice drain even for fast-draining connections
1961             # Store previous handler to chain if needed
1962 1         2 my $prev_on_empty = $self->{_prev_on_outgoing_empty};
1963              
1964             $self->{stream}->configure(
1965             on_outgoing_empty => sub {
1966 1 50   1   3891 return unless $weak_self;
1967 1         5 $weak_self->_check_drain_waiters;
1968             # Call previous handler if any
1969 1 50       3 $prev_on_empty->(@_) if $prev_on_empty;
1970             },
1971 1         6 );
1972             }
1973              
1974             sub _wait_for_drain {
1975 1     1   3 my ($self) = @_;
1976              
1977             # Fast path: already below low watermark
1978 1         2 my $buffered = $self->_get_write_buffer_size;
1979 1 50       4 if ($buffered < $self->{write_low_watermark}) {
1980 0         0 return Future->done;
1981             }
1982              
1983             # Create Future to be resolved when drained
1984 1         4 my $f = $self->{server}->loop->new_future;
1985 1         32 push @{$self->{_drain_waiters}}, $f;
  1         2  
1986              
1987             # Ensure drain detection is active
1988 1         4 $self->_setup_drain_detection;
1989              
1990 1         70 return $f;
1991             }
1992              
1993             sub _cancel_drain_waiters {
1994 568     568   1318 my ($self, $reason) = @_;
1995 568   100     1484 $reason //= 'connection closed';
1996              
1997 568         934 my @waiters = splice @{$self->{_drain_waiters}};
  568         1594  
1998 568         1256 for my $f (@waiters) {
1999             # Resolve (not fail) - app should check connection state after await
2000 0 0       0 $f->done unless $f->is_ready;
2001             }
2002 568         1184 $self->{_drain_check_active} = 0;
2003             }
2004              
2005             # HTTP/2 per-stream backpressure: the h2 analogue of _wait_for_drain. Resolves
2006             # when this stream's send queue falls below the low watermark. Each multiplexed
2007             # stream is bounded independently, so a quiet TCP buffer can't let one stream's
2008             # queue grow without limit.
2009             sub _h2_wait_for_stream_drain {
2010 5     5   12 my ($self, $stream_id) = @_;
2011              
2012 5 50       17 my $ss = $self->{h2_streams}{$stream_id} or return Future->done;
2013              
2014             # Fast path: already below low watermark
2015 5 50 50     20 if (($ss->{send_queue_bytes} // 0) < $self->{write_low_watermark}) {
2016 0         0 return Future->done;
2017             }
2018              
2019             # Create Future to be resolved when this stream's queue drains (in the
2020             # data_callback pull) or when the stream is torn down.
2021 5         25 my $f = $self->{server}->loop->new_future;
2022 5   100     2480 push @{$ss->{stream_drain_waiters} //= []}, $f;
  5         24  
2023              
2024 5         37 return $f;
2025             }
2026              
2027             # Release any producer blocked on _h2_wait_for_stream_drain for a stream that
2028             # is being torn down (close/RST/connection shutdown). Resolve, never fail - the
2029             # producer rechecks connection/stream state after the await. Some teardown
2030             # sites run inside nghttp2's feed() (e.g. the oversize-body 413 path); completing
2031             # a waiter resumes the producer synchronously, so defer to the next loop tick to
2032             # keep the resumed producer out of a re-entrant nghttp2 call.
2033             sub _h2_resolve_stream_drain_waiters {
2034 72     72   192 my ($self, $ss) = @_;
2035 72 100 66     473 return unless $ss && $ss->{stream_drain_waiters};
2036 1         3 my @waiters = splice @{$ss->{stream_drain_waiters}};
  1         4  
2037 1 50       5 return unless @waiters;
2038             $self->{server}->loop->later(sub {
2039 0     0   0 $_->done for grep { !$_->is_ready } @waiters;
  0         0  
2040 0         0 });
2041             }
2042              
2043             # ============================================================================
2044              
2045             # WebSocket keepalive - sends protocol-level ping frames (RFC 6455)
2046             sub _start_ws_keepalive {
2047 0     0   0 my ($self, $interval, $timeout) = @_;
2048              
2049             # Stop existing timers first
2050 0         0 $self->_stop_ws_keepalive;
2051              
2052 0 0 0     0 return unless $interval && $interval > 0;
2053 0 0       0 return unless $self->{server};
2054              
2055 0         0 $self->{ws_keepalive_interval} = $interval;
2056 0   0     0 $self->{ws_keepalive_timeout} = $timeout // 0;
2057              
2058 0         0 weaken(my $weak_self = $self);
2059              
2060             my $timer = IO::Async::Timer::Periodic->new(
2061             interval => $interval,
2062             on_tick => sub {
2063 0 0   0   0 return unless $weak_self;
2064 0 0       0 return if $weak_self->{closed};
2065 0 0       0 return unless $weak_self->{websocket_mode};
2066              
2067             # Send ping frame
2068 0         0 my $ping = Protocol::WebSocket::Frame->new(
2069             type => 'ping',
2070             buffer => '',
2071             );
2072 0         0 $weak_self->{stream}->write($ping->to_bytes);
2073              
2074             # Start pong timeout if configured
2075 0 0       0 if ($weak_self->{ws_keepalive_timeout} > 0) {
2076 0         0 $weak_self->{ws_waiting_pong} = 1;
2077 0         0 $weak_self->_start_ws_pong_timeout;
2078             }
2079             },
2080 0         0 );
2081              
2082 0         0 $self->{ws_keepalive_timer} = $timer;
2083 0         0 $self->{server}->add_child($timer);
2084 0         0 $timer->start;
2085             }
2086              
2087             sub _start_ws_pong_timeout {
2088 0     0   0 my ($self) = @_;
2089              
2090             # Don't start another timeout if one is running
2091 0 0       0 return if $self->{ws_pong_timeout};
2092 0 0       0 return unless $self->{ws_keepalive_timeout} > 0;
2093 0 0       0 return unless $self->{server};
2094              
2095 0         0 weaken(my $weak_self = $self);
2096              
2097             my $timer = IO::Async::Timer::Countdown->new(
2098             delay => $self->{ws_keepalive_timeout},
2099             on_expire => sub {
2100 0 0   0   0 return unless $weak_self;
2101 0 0       0 return if $weak_self->{closed};
2102              
2103 0 0       0 if ($weak_self->{ws_waiting_pong}) {
2104             # No pong received within timeout - close connection
2105 0 0 0     0 if ($weak_self->{server} && $weak_self->{server}->can('_log')) {
2106 0         0 $weak_self->{server}->_log(warn =>
2107             "WebSocket keepalive timeout - no pong received within $weak_self->{ws_keepalive_timeout}s");
2108             }
2109 0         0 $weak_self->_handle_disconnect_and_close('keepalive_timeout');
2110             }
2111             },
2112 0         0 );
2113              
2114 0         0 $self->{ws_pong_timeout} = $timer;
2115 0         0 $self->{server}->add_child($timer);
2116 0         0 $timer->start;
2117             }
2118              
2119             sub _cancel_ws_pong_timeout {
2120 284     284   654 my ($self) = @_;
2121              
2122 284         642 $self->{ws_waiting_pong} = 0;
2123              
2124 284 50       875 return unless $self->{ws_pong_timeout};
2125 0 0       0 $self->{ws_pong_timeout}->stop if $self->{ws_pong_timeout}->is_running;
2126 0 0       0 if ($self->{server}) {
2127 0         0 $self->{server}->remove_child($self->{ws_pong_timeout});
2128             }
2129 0         0 $self->{ws_pong_timeout} = undef;
2130             }
2131              
2132             sub _stop_ws_keepalive {
2133 284     284   591 my ($self) = @_;
2134              
2135             # Stop pong timeout first
2136 284         1200 $self->_cancel_ws_pong_timeout;
2137              
2138 284 50       867 return unless $self->{ws_keepalive_timer};
2139 0 0       0 $self->{ws_keepalive_timer}->stop if $self->{ws_keepalive_timer}->is_running;
2140 0 0       0 if ($self->{server}) {
2141 0         0 $self->{server}->remove_child($self->{ws_keepalive_timer});
2142             }
2143 0         0 $self->{ws_keepalive_timer} = undef;
2144 0         0 $self->{ws_keepalive_interval} = 0;
2145 0         0 $self->{ws_keepalive_timeout} = 0;
2146             }
2147              
2148             # SSE keepalive - sends comment lines to prevent proxy timeouts
2149             sub _start_sse_keepalive {
2150 2     2   6 my ($self, $interval, $comment) = @_;
2151              
2152             # Stop existing timer first
2153 2         10 $self->_stop_sse_keepalive;
2154              
2155 2 50 33     12 return unless $interval && $interval > 0;
2156 2 50       7 return unless $self->{server};
2157              
2158 2   50     9 $self->{sse_keepalive_comment} = $comment // '';
2159              
2160 2         5 weaken(my $weak_self = $self);
2161              
2162             my $timer = IO::Async::Timer::Periodic->new(
2163             interval => $interval,
2164             on_tick => sub {
2165 6 50   6   893838 return unless $weak_self;
2166 6 50       39 return if $weak_self->{closed};
2167              
2168 6         24 my $text = $weak_self->{sse_keepalive_comment};
2169 6 50       40 $text = ":$text" unless $text =~ /^:/;
2170 6         22 my $formatted = "$text\n\n";
2171              
2172 6 50       30 if (my $writer = $weak_self->{sse_keepalive_writer}) {
2173 6         24 $writer->($formatted);
2174             }
2175             },
2176 2         38 );
2177              
2178 2         231 $self->{sse_keepalive_timer} = $timer;
2179 2         18 $self->{server}->add_child($timer);
2180 2         226 $timer->start;
2181             }
2182              
2183             sub _stop_sse_keepalive {
2184 286     286   702 my ($self) = @_;
2185              
2186 286 100       864 return unless $self->{sse_keepalive_timer};
2187 2 50       9 $self->{sse_keepalive_timer}->stop if $self->{sse_keepalive_timer}->is_running;
2188 2 50       102 if ($self->{server}) {
2189 2         8 $self->{server}->remove_child($self->{sse_keepalive_timer});
2190             }
2191 2         174 $self->{sse_keepalive_timer} = undef;
2192 2         7 $self->{sse_keepalive_comment} = '';
2193             }
2194              
2195             sub _try_handle_request {
2196 234     234   484 my ($self) = @_;
2197              
2198 234 50       668 return if $self->{closed};
2199 234 100       661 return if $self->{handling_request};
2200              
2201             # Try to parse a request from the buffer
2202 223         1734 my ($request, $consumed) = $self->{protocol}->parse_request($self->{buffer});
2203              
2204 223 100       755 return unless $request;
2205              
2206             # Remove consumed bytes from buffer
2207 221         741 substr($self->{buffer}, 0, $consumed) = '';
2208              
2209             # Handle parse errors (malformed request, header too large)
2210 221 100       610 if ($request->{error}) {
2211             # Mark connection as disconnected with protocol_error reason (PAGI spec compliance)
2212 5         25 $self->_handle_disconnect('protocol_error');
2213 5         32 $self->_send_error_response($request->{error}, $request->{message});
2214 5         25 $self->_close;
2215 5         189 return;
2216             }
2217              
2218             # Check Content-Length against max_body_size limit (0 = unlimited)
2219 216 100 100     1253 if ($self->{max_body_size} && defined $request->{content_length}) {
2220 6 100       46 if ($request->{content_length} > $self->{max_body_size}) {
2221 1         6 $self->_handle_disconnect('body_too_large');
2222 1         6 $self->_send_error_response(413, 'Payload Too Large');
2223 1         6 $self->_close;
2224 1         33 return;
2225             }
2226             }
2227              
2228             # Check if this is a WebSocket upgrade request
2229 215         965 my $is_websocket = $self->_is_websocket_upgrade($request);
2230              
2231             # Check if this is an SSE request
2232 215   100     1021 my $is_sse = !$is_websocket && $self->_is_sse_request($request);
2233              
2234             # Handle the request - store the Future to prevent "lost future" warning
2235 215         509 $self->{handling_request} = 1;
2236 215         889 $self->{request_start} = [gettimeofday];
2237 215         476 $self->{current_request} = $request; # Store for access logging
2238              
2239 215 100       628 if ($is_websocket) {
    100          
2240 22         188 $self->{request_future} = $self->_handle_websocket_request($request);
2241             } elsif ($is_sse) {
2242 21         82 $self->{request_future} = $self->_handle_sse_request($request);
2243             } else {
2244             # Start stall timer for HTTP requests (WebSocket/SSE have their own handling)
2245 172         1029 $self->_start_stall_timer;
2246 172         632 $self->{request_future} = $self->_handle_request($request);
2247             }
2248              
2249             # Use adopt_future for proper error tracking instead of retain
2250             # This ensures errors are propagated to the server's error handling
2251 215         14829 $self->{server}->adopt_future($self->{request_future});
2252             }
2253              
2254             sub _is_websocket_upgrade {
2255 215     215   418 my ($self, $request) = @_;
2256              
2257             # Check for WebSocket upgrade headers
2258 215         320 my $has_upgrade = 0;
2259 215         329 my $has_connection_upgrade = 0;
2260 215         335 my $has_ws_key = 0;
2261              
2262 215         336 for my $header (@{$request->{headers}}) {
  215         658  
2263 658         1174 my ($name, $value) = @$header;
2264 658 100 66     2209 if ($name eq 'upgrade' && lc($value) eq 'websocket') {
    100          
    100          
2265 22         39 $has_upgrade = 1;
2266             }
2267             elsif ($name eq 'connection') {
2268             # Connection header can have multiple values
2269 193 100       964 $has_connection_upgrade = 1 if lc($value) =~ /upgrade/;
2270             }
2271             elsif ($name eq 'sec-websocket-key') {
2272 22         45 $has_ws_key = 1;
2273             }
2274             }
2275              
2276 215   66     797 return $has_upgrade && $has_connection_upgrade && $has_ws_key;
2277             }
2278              
2279             sub _is_sse_request {
2280 193     193   372 my ($self, $request) = @_;
2281              
2282             # SSE detection per spec:
2283             # - Accept header includes text/event-stream
2284             # - Request has not been upgraded to WebSocket (already checked)
2285             # Note: SSE works with any HTTP method (GET, POST, etc.) to support
2286             # modern patterns like htmx 4 and datastar using fetch-event-source
2287              
2288 193         277 for my $header (@{$request->{headers}}) {
  193         476  
2289 516         844 my ($name, $value) = @$header;
2290 516 100       1005 if ($name eq 'accept') {
2291             # Check if Accept header includes text/event-stream
2292 21 50       135 return 1 if $value =~ m{text/event-stream};
2293             }
2294             }
2295              
2296 172         452 return 0;
2297             }
2298              
2299 172     172   279 async sub _handle_request {
2300 172         373 my ($self, $request) = @_;
2301              
2302 172         715 my $scope = $self->_create_scope($request);
2303 172         665 my $receive = $self->_create_receive($request);
2304 172         667 my $send = $self->_create_send($request);
2305              
2306 172         378 eval {
2307 172         942 await $self->{app}->($scope, $receive, $send);
2308             };
2309              
2310 171 100       13252 if (my $error = $@) {
2311             # Handle application error - always close connection after exception
2312             # If response already started, we can't send error page (3.17)
2313 15 100       80 if ($self->{response_started}) {
2314 2         10 $self->_flush_pending_headers; # don't lose a started response's headers
2315 2         447 warn "PAGI application error (after response started): $error\n";
2316             } else {
2317 13         77 $self->_send_error_response(500, "Internal Server Error");
2318 13         1144 warn "PAGI application error: $error\n";
2319             }
2320             # Write access log before closing
2321 15         123 $self->_write_access_log;
2322             # Notify server that request completed (for max_requests tracking)
2323 15 50       179 $self->{server}->_on_request_complete if $self->{server};
2324             # Always close connection after exception (3.2) - don't try keep-alive
2325 15         63 $self->_handle_disconnect('server_error');
2326 15         72 $self->_close;
2327 15         1318 return;
2328             }
2329              
2330             # The application returned without starting a response. An incomplete
2331             # response is a protocol error: if the client is still connected, synthesize
2332             # a 500; either way do not keep-alive a connection on which no response was
2333             # written (that would hang the client, which is waiting for a response). A
2334             # response that was started but not completed is handled by the body-framing
2335             # and keep-alive logic below.
2336 156 100       543 if (!$self->{response_started}) {
2337 1 50       6 unless ($self->{closed}) {
2338 1         17 warn "PAGI application returned without starting a response\n";
2339 1         13 $self->_send_error_response(500, "Internal Server Error");
2340             }
2341 1         4 $self->_write_access_log;
2342 1 50       9 $self->{server}->_on_request_complete if $self->{server};
2343 1         4 $self->_handle_disconnect('server_error');
2344 1         4 $self->_close;
2345 1         64 return;
2346             }
2347              
2348             # Flush any headers buffered by response.start that were never paired with a
2349             # body write (a started-but-bodyless response).
2350 155         706 $self->_flush_pending_headers;
2351              
2352             # Write access log entry
2353 155         635 $self->_write_access_log;
2354              
2355             # Notify server that request completed (for max_requests tracking)
2356 155 50       1695 $self->{server}->_on_request_complete if $self->{server};
2357              
2358             # Stop stall timer - request completed successfully
2359 155         700 $self->_stop_stall_timer;
2360              
2361             # Request finished cleanly: fire on_complete (not on_disconnect) on the
2362             # HTTP connection-state object. Must happen on BOTH the keep-alive and
2363             # close paths, and before the keep-alive branch clears the state below.
2364             # Once marked complete, the non-keep-alive _handle_disconnect_and_close
2365             # call below no-ops the state transition, so on_disconnect never fires for
2366             # a completed request.
2367 155 50       523 if (my $conn_state = $self->{current_connection_state}) {
2368 155         976 $conn_state->_mark_complete;
2369             }
2370              
2371             # Determine if we should keep the connection alive
2372 155         630 my $keep_alive = $self->_should_keep_alive($request);
2373              
2374 155 100       394 if ($keep_alive) {
2375             # Reset for next request
2376 142         276 $self->{handling_request} = 0;
2377 142         260 $self->{response_started} = 0;
2378 142         244 $self->{_resp_pending} = undef;
2379 142         217 $self->{response_status} = undef;
2380 142         228 $self->{_response_size} = 0;
2381 142         309 $self->{request_start} = undef;
2382 142         241 $self->{current_request} = undef;
2383 142         224 $self->{request_future} = undef;
2384 142         233 $self->{current_connection_state} = undef; # Clear for next request
2385 142         213 $self->{current_transport_state} = undef; # New request gets a fresh handle
2386              
2387             # Check if there's more data in the buffer (pipelining)
2388 142 100       8557 if (length($self->{buffer}) > 0) {
2389 1         11 $self->_try_handle_request;
2390             }
2391             } else {
2392             # Not keeping alive - close connection
2393 13         110 $self->_handle_disconnect_and_close('request_complete');
2394             }
2395             }
2396              
2397             sub _should_keep_alive {
2398 155     155   353 my ($self, $request) = @_;
2399              
2400 155   50     738 my $http_version = $request->{http_version} // '1.1';
2401              
2402             # Check for Connection header
2403 155         283 my $connection_header;
2404 155         229 for my $header (@{$request->{headers}}) {
  155         376  
2405 279 100       733 if ($header->[0] eq 'connection') {
2406 146         395 $connection_header = lc($header->[1]);
2407 146         314 last;
2408             }
2409             }
2410              
2411             # HTTP/1.1: keep-alive by default unless Connection: close
2412 155 100       437 if ($http_version eq '1.1') {
2413 152 100 100     1047 return 0 if $connection_header && $connection_header =~ /close/;
2414 141         358 return 1;
2415             }
2416              
2417             # HTTP/1.0: close by default unless Connection: keep-alive
2418 3 50       11 if ($http_version eq '1.0') {
2419 3 100 66     15 return 1 if $connection_header && $connection_header =~ /keep-alive/;
2420 2         6 return 0;
2421             }
2422              
2423             # Unknown version: close connection
2424 0         0 return 0;
2425             }
2426              
2427             sub _create_scope {
2428 172     172   474 my ($self, $request) = @_;
2429              
2430             # Create connection state object for disconnect tracking
2431             # Uses lazy Future creation - Future only allocated if disconnect_future() is called
2432 172         2512 my $connection_state = PAGI::Server::ConnectionState->new(
2433             connection => $self,
2434             );
2435 172         469 $self->{current_connection_state} = $connection_state;
2436              
2437             my $scope = {
2438             type => 'http',
2439             pagi => {
2440             version => '0.3',
2441             spec_version => '0.3',
2442             },
2443             http_version => $request->{http_version},
2444             method => $request->{method},
2445             scheme => $self->_get_scheme,
2446             path => $request->{path},
2447             raw_path => $request->{raw_path},
2448             query_string => $request->{query_string},
2449             root_path => '',
2450             headers => $request->{headers},
2451             (defined $self->{client_host}
2452             ? (client => [$self->{client_host}, $self->{client_port}])
2453             : ()
2454             ),
2455             server => [$self->{server_host}, $self->{server_port}],
2456             # Optimized: avoid hash copy when state is empty (common case)
2457 172         1051 state => keys %{$self->{state}} ? { %{$self->{state}} } : {},
  3         12  
2458             extensions => $self->_get_extensions_for_scope,
2459             # Connection state for non-destructive disconnect detection
2460             'pagi.connection' => $connection_state,
2461             # Outbound flow-control introspection (buffered_amount, watermarks,
2462             # on_high_water/on_drain). Stashed on the connection too, so the send
2463             # path can poke _check_watermarks after each write.
2464 172 100       1495 'pagi.transport' => ($self->{current_transport_state} = $self->_h1_transport_state),
    100          
2465             };
2466              
2467 172         517 return $scope;
2468             }
2469              
2470             sub _create_receive {
2471 172     172   378 my ($self, $request) = @_;
2472              
2473 172         357 my $content_length = $request->{content_length};
2474 172   50     559 my $is_chunked = $request->{chunked} // 0;
2475 172   50     442 my $expect_continue = $request->{expect_continue} // 0;
2476 172         303 my $continue_sent = 0;
2477 172         240 my $body_complete = 0;
2478 172         221 my $bytes_read = 0;
2479 172         252 my $chunk_size = 65536; # 64KB chunks for large bodies
2480              
2481             # For requests without Content-Length and not chunked, treat as no body
2482 172   100     939 my $has_body = defined($content_length) && $content_length > 0 || $is_chunked;
2483              
2484 172         314 weaken(my $weak_self = $self);
2485              
2486             # Return a wrapper that tracks the Future from the async receive
2487             return sub {
2488 57 50   57   1198 return Future->done({ type => 'http.disconnect' }) unless $weak_self;
2489 57 50       173 return Future->done({ type => 'http.disconnect' }) if $weak_self->{closed};
2490              
2491             # The actual async implementation
2492 57         118 my $future = (async sub {
2493 57 50       211 return { type => 'http.disconnect' } unless $weak_self;
2494 57 50       130 return { type => 'http.disconnect' } if $weak_self->{closed};
2495              
2496             # Check queue first - events from disconnect handler
2497 57 50       89 if (@{$weak_self->{receive_queue}}) {
  57         221  
2498 0         0 return shift @{$weak_self->{receive_queue}};
  0         0  
2499             }
2500              
2501             # If body is already complete, wait for disconnect
2502 57 100       145 if ($body_complete) {
2503 1 50       2 if (!$weak_self->{receive_pending}) {
2504 1         2 $weak_self->{receive_pending} = Future->new;
2505             }
2506              
2507 1 50       6 if ($weak_self->{closed}) {
2508 0         0 $weak_self->{receive_pending} = undef;
2509 0         0 return { type => 'http.disconnect' };
2510             }
2511              
2512 1         3 my $result = await $weak_self->{receive_pending};
2513             # receive_pending may be completed with a value (disconnect event)
2514             # or just done() as a signal
2515 1 50       80 return $result if ref $result eq 'HASH';
2516             # If no value, check queue
2517 0 0       0 if (@{$weak_self->{receive_queue}}) {
  0         0  
2518 0         0 return shift @{$weak_self->{receive_queue}};
  0         0  
2519             }
2520 0         0 return { type => 'http.disconnect' };
2521             }
2522              
2523             # For requests without body, return empty body immediately
2524 56 100       137 if (!$has_body) {
2525 39         74 $body_complete = 1;
2526             return {
2527 39         610 type => 'http.request',
2528             body => '',
2529             more => 0,
2530             };
2531             }
2532              
2533             # Send 100 Continue if client expects it (before reading body)
2534 17 50 33     69 if ($expect_continue && !$continue_sent) {
2535 0         0 $continue_sent = 1;
2536 0         0 $weak_self->{stream}->write($weak_self->{protocol}->serialize_continue);
2537             }
2538              
2539             # Handle chunked Transfer-Encoding
2540 17 100       37 if ($is_chunked) {
2541             # Wait for data if buffer is empty
2542 1   33     4 while (length($weak_self->{buffer}) == 0 && !$weak_self->{closed}) {
2543 0 0       0 if (!$weak_self->{receive_pending}) {
2544 0         0 $weak_self->{receive_pending} = Future->new;
2545             }
2546 0         0 await $weak_self->{receive_pending};
2547 0         0 $weak_self->{receive_pending} = undef;
2548              
2549             # Check queue after waiting
2550 0 0       0 if (@{$weak_self->{receive_queue}}) {
  0         0  
2551 0         0 return shift @{$weak_self->{receive_queue}};
  0         0  
2552             }
2553             }
2554              
2555             # Try to parse chunked data
2556 1         6 my ($data, $consumed, $complete) = $weak_self->{protocol}->parse_chunked_body($weak_self->{buffer});
2557              
2558             # Check for parse error (invalid chunk size)
2559 1 0 33     3 if (ref($data) eq 'HASH' && $data->{error}) {
2560 0         0 $weak_self->_handle_disconnect('protocol_error');
2561 0   0     0 $weak_self->_send_error_response($data->{error}, $data->{message} // 'Bad Request');
2562 0         0 $weak_self->_close;
2563 0         0 return { type => 'http.disconnect' };
2564             }
2565              
2566 1 50       2 if ($consumed > 0) {
2567 1         2 substr($weak_self->{buffer}, 0, $consumed) = '';
2568              
2569             # Track total bytes read for max_body_size check
2570 1   50     3 $bytes_read += length($data // '');
2571              
2572             # Check max_body_size for chunked requests (0 = unlimited)
2573 1 50 33     4 if ($weak_self->{max_body_size} && $bytes_read > $weak_self->{max_body_size}) {
2574             # Body too large - close connection
2575 0         0 $weak_self->_send_error_response(413, 'Payload Too Large');
2576 0         0 $weak_self->_handle_disconnect('body_too_large');
2577 0         0 $weak_self->_close;
2578 0         0 return { type => 'http.disconnect' };
2579             }
2580              
2581 1 50       3 if ($complete) {
2582 1         2 $body_complete = 1;
2583             }
2584              
2585             return {
2586 1 50 50     12 type => 'http.request',
2587             body => $data // '',
2588             more => $complete ? 0 : 1,
2589             };
2590             }
2591              
2592             # Need more data - wait for it
2593 0 0       0 if (!$weak_self->{receive_pending}) {
2594 0         0 $weak_self->{receive_pending} = Future->new;
2595             }
2596 0         0 await $weak_self->{receive_pending};
2597 0         0 $weak_self->{receive_pending} = undef;
2598              
2599             # Recursive call to re-process - but we can't use __SUB__ in nested async
2600             # Just return disconnect if closed
2601 0 0       0 return { type => 'http.disconnect' } if $weak_self->{closed};
2602             # This shouldn't happen often - caller should retry
2603 0         0 return { type => 'http.request', body => '', more => 1 };
2604             }
2605              
2606             # Handle Content-Length based body reading
2607 16         34 my $remaining = $content_length - $bytes_read;
2608              
2609 16 50       31 if ($remaining <= 0) {
2610 0         0 $body_complete = 1;
2611             return {
2612 0         0 type => 'http.request',
2613             body => '',
2614             more => 0,
2615             };
2616             }
2617              
2618             # Wait for data if buffer is empty
2619 16   66     66 while (length($weak_self->{buffer}) == 0 && !$weak_self->{closed}) {
2620 12 50       22 if (!$weak_self->{receive_pending}) {
2621 12         30 $weak_self->{receive_pending} = Future->new;
2622             }
2623 12         96 await $weak_self->{receive_pending};
2624 12         681 $weak_self->{receive_pending} = undef;
2625              
2626             # Check queue after waiting
2627 12 50       12 if (@{$weak_self->{receive_queue}}) {
  12         37  
2628 0         0 return shift @{$weak_self->{receive_queue}};
  0         0  
2629             }
2630             }
2631              
2632             # Return disconnect if closed while waiting
2633 16 50 33     39 if ($weak_self->{closed} && length($weak_self->{buffer}) == 0) {
2634 0         0 return { type => 'http.disconnect' };
2635             }
2636              
2637             # Read up to chunk_size or remaining bytes, whichever is smaller
2638 16 100       35 my $to_read = $remaining < $chunk_size ? $remaining : $chunk_size;
2639 16 100       36 $to_read = length($weak_self->{buffer}) if length($weak_self->{buffer}) < $to_read;
2640              
2641 16         64 my $body = substr($weak_self->{buffer}, 0, $to_read, '');
2642 16         22 $bytes_read += length($body);
2643              
2644             # Check if we've read all the body
2645 16 100       42 my $more = ($bytes_read < $content_length) ? 1 : 0;
2646              
2647 16 100       47 if (!$more) {
2648 4         9 $body_complete = 1;
2649             }
2650              
2651             return {
2652 16         154 type => 'http.request',
2653             body => $body,
2654             more => $more,
2655             };
2656 57         715 })->();
2657              
2658             # Track this Future so we can cancel it on close
2659 57         3427 push @{$weak_self->{receive_futures}}, $future;
  57         177  
2660              
2661             # Clean up completed futures from the list
2662 57         151 @{$weak_self->{receive_futures}} = grep { !$_->is_ready } @{$weak_self->{receive_futures}};
  57         359  
  68         218  
  57         129  
2663              
2664 57         311 return $future;
2665 172         1175 };
2666             }
2667              
2668             sub _create_send {
2669 172     172   414 my ($self, $request) = @_;
2670              
2671 172         282 my $chunked = 0;
2672 172         226 my $response_started = 0;
2673 172         307 my $expects_trailers = 0;
2674 172         288 my $body_complete = 0;
2675 172   50     777 my $is_head_request = ($request->{method} // '') eq 'HEAD';
2676 172   50     511 my $http_version = $request->{http_version} // '1.1';
2677 172         356 my $is_http10 = ($http_version eq '1.0');
2678              
2679             # Check if HTTP/1.0 client requested keep-alive
2680 172         258 my $client_wants_keepalive = 0;
2681 172 100       431 if ($is_http10) {
2682 3         6 for my $h (@{$request->{headers}}) {
  3         9  
2683 3 100 66     13 if ($h->[0] eq 'connection' && lc($h->[1]) =~ /keep-alive/) {
2684 1         1 $client_wants_keepalive = 1;
2685 1         7 last;
2686             }
2687             }
2688             }
2689              
2690 172         355 weaken(my $weak_self = $self);
2691              
2692 328     328   710153 return async sub {
2693 328         545 my ($event) = @_;
2694 328 50       682 return Future->done unless $weak_self;
2695 328 50       755 return Future->done if $weak_self->{closed};
2696              
2697             # Reset stall timer on write activity
2698 328         1099 $weak_self->_reset_stall_timer;
2699              
2700 328   50     715 my $type = $event->{type} // '';
2701              
2702             # Dev-mode event validation (PAGI spec compliance)
2703 328 50       737 if ($weak_self->{validate_events}) {
2704 0         0 require PAGI::Server::EventValidator;
2705 0         0 PAGI::Server::EventValidator::validate_http_send($event);
2706             }
2707              
2708 328 100       1025 if ($type eq 'http.response.start') {
    100          
    100          
    50          
2709 158 50       405 return if $response_started;
2710 158         250 $response_started = 1;
2711 158         314 $weak_self->{response_started} = 1;
2712             $weak_self->{current_connection_state}->_mark_response_started
2713 158 50       1159 if $weak_self->{current_connection_state};
2714 158   50     434 $weak_self->{response_status} = $event->{status} // 200; # Track for logging
2715 158   100     818 $expects_trailers = $event->{trailers} // 0;
2716              
2717 158   50     358 my $status = $event->{status} // 200;
2718 158   50     403 my $headers = $event->{headers} // [];
2719              
2720             # Check if we need chunked encoding (no Content-Length)
2721 158         252 my $has_content_length = 0;
2722 158         357 for my $h (@$headers) {
2723 162 100       667 if (lc($h->[0]) eq 'content-length') {
2724 16         18 $has_content_length = 1;
2725 16         27 last;
2726             }
2727             }
2728              
2729             # Add Date header
2730 158         354 my @final_headers = @$headers;
2731 158         1041 push @final_headers, ['date', $weak_self->{protocol}->format_date];
2732              
2733             # For HEAD requests, don't use chunked encoding (no body will be sent)
2734             # For HTTP/1.0, don't use chunked encoding - use Connection: close instead
2735 158 100 100     856 if ($is_head_request || $is_http10) {
2736 4         7 $chunked = 0;
2737 4 100       11 if ($is_http10) {
2738 3 100       9 if (!$has_content_length) {
    100          
2739             # No Content-Length means we can't do keep-alive
2740 1         19 push @final_headers, ['connection', 'close'];
2741             } elsif ($client_wants_keepalive) {
2742             # HTTP/1.0 client requested keep-alive and we can honor it
2743             # Must explicitly acknowledge with Connection: keep-alive
2744 1         7 push @final_headers, ['connection', 'keep-alive'];
2745             }
2746             }
2747             } else {
2748 154         402 $chunked = !$has_content_length;
2749             }
2750              
2751             my $response = $weak_self->{protocol}->serialize_response_start(
2752 158         939 $status, \@final_headers, $chunked, $http_version
2753             );
2754              
2755             # Buffer the headers instead of writing them now; they are flushed
2756             # together with the first body write (or at finalization). This
2757             # coalesces the common "start + complete body" case into a single
2758             # stream write instead of one per headers/chunk/terminator.
2759 157         660 $weak_self->{_resp_pending} = $response;
2760             }
2761             elsif ($type eq 'http.response.body') {
2762 166 50       361 return unless $response_started;
2763 166 100       374 return if $body_complete;
2764              
2765             # For HEAD requests, suppress the body but track completion
2766 165 100       428 if ($is_head_request) {
2767 1   50     14 my $more = $event->{more} // 0;
2768             # HEAD has headers but no body, so flush the buffered headers now.
2769 1         5 $weak_self->_flush_pending_headers;
2770 1 50       345 if (!$more) {
2771 1         4 $body_complete = 1;
2772             }
2773 1         10 return; # Don't send any body for HEAD
2774             }
2775              
2776             # --- BACKPRESSURE CHECK ---
2777             # Wait for buffer to drain if we're above high watermark
2778             # This prevents unbounded memory growth with slow clients
2779 164 50       570 if ($weak_self->_get_write_buffer_size >= $weak_self->{write_high_watermark}) {
2780 0         0 await $weak_self->_wait_for_drain;
2781             # Re-check connection state after await
2782 0 0       0 return Future->done unless $weak_self;
2783 0 0       0 return Future->done if $weak_self->{closed};
2784             }
2785             # --- END BACKPRESSURE CHECK ---
2786              
2787             # Determine body source: body, file, or fh (mutually exclusive)
2788 164         338 my $body = $event->{body};
2789 164         332 my $file = $event->{file};
2790 164         318 my $fh = $event->{fh};
2791 164   100     713 my $offset = $event->{offset} // 0;
2792 164         309 my $length = $event->{length};
2793              
2794 164 100       497 if (defined $file) {
    100          
2795             # File path response - stream from file (async, non-blocking)
2796             # File responses are implicitly complete (more is ignored)
2797 10         22 $weak_self->_flush_pending_headers; # headers before the file body
2798 10         1989 await $weak_self->_send_file_response($file, $offset, $length, $chunked);
2799 10         1652 $body_complete = 1;
2800             }
2801             elsif (defined $fh) {
2802             # Filehandle response - stream from handle (async, non-blocking)
2803             # Filehandle responses are implicitly complete (more is ignored)
2804 6         13 $weak_self->_flush_pending_headers; # headers before the fh body
2805 6         1194 await $weak_self->_send_fh_response($fh, $offset, $length, $chunked);
2806 6         547 $body_complete = 1;
2807             }
2808             else {
2809             # Traditional body response
2810 148   50     402 $body //= '';
2811 148   100     396 my $more = $event->{more} // 0;
2812              
2813 148         356 $weak_self->{_response_size} += length($body);
2814              
2815             # Coalesce any buffered headers, the body (chunk framing if
2816             # chunked), and the final terminator into a single stream write.
2817             # The common start + complete-body response becomes one write
2818             # rather than three.
2819 148         229 my $out = $weak_self->{_resp_pending};
2820 148 100       386 $out = '' unless defined $out;
2821 148         327 $weak_self->{_resp_pending} = undef;
2822              
2823 148 100       321 if ($chunked) {
2824 142 50       358 if (length $body) {
2825 142         581 my $len = sprintf("%x", length($body));
2826 142         399 $out .= "$len\r\n$body\r\n";
2827             }
2828 142 100 100     791 if (!$more && !$expects_trailers) {
2829 131         296 $out .= "0\r\n\r\n";
2830             }
2831             }
2832             else {
2833 6         12 $out .= $body;
2834             }
2835              
2836 148 50       903 $weak_self->{stream}->write($out) if length $out;
2837 148         35815 $weak_self->_notify_transport_write;
2838              
2839             # Handle completion for body responses
2840 148 100       343 if (!$more) {
2841 138         348 $body_complete = 1;
2842             }
2843             }
2844             }
2845             elsif ($type eq 'http.response.trailers') {
2846 1 50       3 return unless $response_started;
2847 1 50       2 return unless $expects_trailers;
2848 1 50       3 return unless $chunked; # Trailers only work with chunked encoding
2849              
2850 1   50     3 my $trailer_headers = $event->{headers} // [];
2851              
2852             # Send final chunk + trailers (prepend any still-buffered headers).
2853 1   50     5 my $trailers = $weak_self->{_resp_pending} // '';
2854 1         3 $weak_self->{_resp_pending} = undef;
2855 1         3 $trailers .= "0\r\n";
2856 1         2 for my $header (@$trailer_headers) {
2857 1         2 my ($name, $value) = @$header;
2858 1         4 $name = _validate_header_name($name);
2859 1         5 $value = _validate_header_value($value);
2860 1         3 $trailers .= "$name: $value\r\n";
2861             }
2862 1         1 $trailers .= "\r\n";
2863              
2864 1         4 $weak_self->{stream}->write($trailers);
2865 1         77 $body_complete = 1;
2866             }
2867             elsif ($type eq 'http.fullflush') {
2868             # Fullflush extension - force immediate TCP buffer flush
2869             # Per spec: servers that don't advertise the extension must reject
2870 3 100       9 unless (exists $weak_self->{extensions}{fullflush}) {
2871 1         11 warn "PAGI: http.fullflush event rejected - extension not enabled\n";
2872 1         11 die "Extension not enabled: fullflush\n";
2873             }
2874              
2875             # Force flush by ensuring TCP_NODELAY and flushing any pending writes
2876 2         8 my $handle = $weak_self->{stream}->write_handle;
2877 2 50 33     15 if ($handle && $handle->can('setsockopt')) {
2878             # Ensure TCP_NODELAY is set to disable Nagle buffering
2879 2         11 require Socket;
2880 2         6 $handle->setsockopt(Socket::IPPROTO_TCP(), Socket::TCP_NODELAY(), 1);
2881             }
2882              
2883             # In IO::Async, writes are queued and sent when the event loop allows.
2884             # The above TCP_NODELAY ensures no Nagle buffering delays.
2885             # For this reference implementation, we return immediately as the
2886             # write buffer will be flushed by the event loop.
2887             }
2888             else {
2889             # Per PAGI spec: servers must raise exceptions for unrecognized event types
2890 0         0 _unrecognized_event_type($type, 'http');
2891             }
2892              
2893 324         2502 return;
2894 172         2734 };
2895             }
2896              
2897             # Flush any response headers buffered by http.response.start that were not yet
2898             # paired with a body write (HEAD/file/fh paths, started-but-incomplete responses).
2899             sub _flush_pending_headers {
2900 174     174   375 my ($self) = @_;
2901 174         348 my $pending = $self->{_resp_pending};
2902 174 100 66     647 return unless defined $pending && length $pending;
2903 18         41 $self->{_resp_pending} = undef;
2904 18         66 $self->{stream}->write($pending);
2905             }
2906              
2907             sub _send_error_response {
2908 23     23   74 my ($self, $status, $message) = @_;
2909              
2910 23 50       87 return if $self->{closed};
2911 23 50       82 return if $self->{response_started};
2912              
2913 23         70 my $body = $message;
2914             my $headers = [
2915             ['content-type', 'text/plain'],
2916             ['content-length', length($body)],
2917 23         254 ['date', $self->{protocol}->format_date],
2918             ];
2919              
2920 23         169 my $response = $self->{protocol}->serialize_response_start($status, $headers, 0);
2921 23         64 $response .= $body;
2922              
2923 23         167 $self->{stream}->write($response);
2924 23         7146 $self->{response_started} = 1;
2925             # A server-synthesized response is still "this request's response started".
2926             $self->{current_connection_state}->_mark_response_started
2927 23 100       160 if $self->{current_connection_state};
2928 23         124 $self->{response_status} = $status; # Track for logging
2929             }
2930              
2931             sub _write_access_log {
2932 213     213   445 my ($self) = @_;
2933              
2934 213 100       639 return unless $self->{access_log};
2935 205 50       616 return unless $self->{current_request};
2936              
2937 205         428 my $request = $self->{current_request};
2938              
2939             # Calculate request duration
2940 205         415 my $duration = 0;
2941 205 50       576 if ($self->{request_start}) {
2942 205         1435 $duration = tv_interval($self->{request_start});
2943             }
2944              
2945             # Per-second cached CLF timestamp
2946 205         3343 my $now = time();
2947 205 100       633 if ($now != $_cached_log_time) {
2948 77         164 $_cached_log_time = $now;
2949 77         391 my @gmt = gmtime($now);
2950 77         531 my @months = qw(Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec);
2951 77         766 $_cached_log_timestamp = sprintf("%02d/%s/%04d:%02d:%02d:%02d +0000",
2952             $gmt[3], $months[$gmt[4]], $gmt[5] + 1900,
2953             $gmt[2], $gmt[1], $gmt[0]);
2954             }
2955              
2956             my $info = {
2957             client_ip => $self->{client_host} // ($self->{transport_type} eq 'unix' ? 'unix' : '-'),
2958             timestamp => $_cached_log_timestamp,
2959             method => $request->{method} // '-',
2960             path => $request->{raw_path} // '/',
2961             query => $request->{query_string},
2962             http_version => $request->{http_version} // '1.1',
2963             status => $self->{response_status} // '-',
2964             size => $self->{_response_size} // 0,
2965             duration => $duration,
2966 205 50 66     4551 request_headers => $request->{headers} // [],
      50        
      50        
      50        
      50        
      50        
      50        
2967             };
2968              
2969 205         492 my $formatter = $self->{_access_log_formatter};
2970 205 50       451 if ($formatter) {
2971 205         303 print {$self->{access_log}} $formatter->($info), "\n";
  205         1046  
2972             }
2973             else {
2974             # Fallback (should not happen with properly initialized server)
2975 0         0 my $path = $info->{path};
2976 0         0 my $query = $info->{query};
2977 0 0 0     0 $path .= "?$query" if defined $query && length $query;
2978 0         0 print {$self->{access_log}} "$info->{client_ip} - - [$info->{timestamp}] \"$info->{method} $path\" $info->{status} $info->{duration}s\n";
  0         0  
2979             }
2980             }
2981              
2982             # Reasons passed to _handle_disconnect only for teardown after a clean finish
2983             # (the app has already returned). They are completions, not abnormal disconnects,
2984             # and must not surface as a disconnect reason to the application.
2985             my %COMPLETION_REASON = map { ($_ => 1) } qw(
2986             request_complete
2987             stream_complete
2988             session_complete
2989             );
2990              
2991             # Build the app-facing websocket.disconnect event for a server-detected close.
2992             # The code and reason come from the close the server initiated; the defaults are
2993             # the RFC 6455 "abnormal closure, no status received" pair (1006 / empty), used
2994             # when the connection dropped with no close handshake (timeout, TCP FIN).
2995             sub _ws_disconnect_event {
2996 36     36   57 my ($self) = @_;
2997             return {
2998             type => 'websocket.disconnect',
2999             code => $self->{ws_disconnect_code} // 1006,
3000 36   100     258 reason => $self->{ws_disconnect_reason} // '',
      100        
3001             };
3002             }
3003              
3004             sub _handle_disconnect {
3005 456     456   954 my ($self, $reason) = @_;
3006              
3007             # Idempotency guard - prevent duplicate disconnect handling
3008             # Multiple paths can trigger disconnect (timeout, protocol error, session end)
3009 456 100       1546 return if $self->{_disconnect_handled};
3010 284         600 $self->{_disconnect_handled} = 1;
3011              
3012             # Cancel any pending drain waiters (backpressure)
3013 284         1597 $self->_cancel_drain_waiters($reason);
3014              
3015             # Auto-detect server shutdown (PAGI spec compliance)
3016             # If no explicit reason and server is shutting down, use server_shutdown
3017 284 50 66     883 if (!$reason && $self->{server} && $self->{server}{shutting_down}) {
      33        
3018 0         0 $reason = 'server_shutdown';
3019             }
3020              
3021             # Default reason is client_closed (TCP FIN received)
3022 284   100     841 $reason //= 'client_closed';
3023              
3024             # A clean completion is not an abnormal disconnect: don't surface its reason.
3025 284         790 my $is_completion = $COMPLETION_REASON{$reason};
3026              
3027             # Mark HTTP connection state as disconnected (abnormal only).
3028             # Only for HTTP - WebSocket/SSE have their own patterns.
3029 284 50 66     1427 if ($self->{current_connection_state} && !$self->{websocket_mode} && !$self->{sse_mode}) {
      66        
3030 30 100       208 $self->{current_connection_state}->_mark_disconnected($reason)
3031             unless $is_completion;
3032             }
3033              
3034             # Record the abnormal reason so the WebSocket disconnect event reports it
3035             # (instead of the old empty string). SSE tracks its own reason at the
3036             # detection sites via sse_disconnect_reason.
3037 284 100 100     1195 if ($self->{websocket_mode} && !$is_completion) {
3038 16         39 $self->{ws_disconnect_reason} = $reason;
3039             }
3040              
3041             # Determine disconnect event type based on mode
3042 284         565 my $disconnect_event;
3043 284 100       1288 if ($self->{websocket_mode}) {
    100          
3044 18         56 $disconnect_event = $self->_ws_disconnect_event;
3045             } elsif ($self->{sse_mode}) {
3046             $disconnect_event = {
3047             type => 'sse.disconnect',
3048 21   100     147 reason => $self->{sse_disconnect_reason} // 'client_closed',
3049             };
3050             } else {
3051 245         1051 $disconnect_event = { type => 'http.disconnect' };
3052             }
3053              
3054             # Queue disconnect event (do this even if already closed)
3055 284         501 push @{$self->{receive_queue}}, $disconnect_event;
  284         892  
3056              
3057             # Complete any pending receive
3058 284 100 100     1246 if ($self->{receive_pending} && !$self->{receive_pending}->is_ready) {
3059 16         124 $self->{receive_pending}->done($disconnect_event);
3060 16         4603 $self->{receive_pending} = undef;
3061             }
3062             }
3063              
3064             # Send a WebSocket close frame with status code and optional reason
3065             # Per RFC 6455 Section 7.4, common codes:
3066             # 1000 - Normal closure
3067             # 1007 - Invalid frame payload data (e.g., invalid UTF-8)
3068             # 1009 - Message too big
3069             # 1011 - Unexpected condition
3070             sub _send_close_frame {
3071 4     4   18 my ($self, $code, $reason) = @_;
3072 4   50     15 $reason //= '';
3073              
3074 4 50       13 return unless $self->{stream};
3075 4 50       13 return if $self->{close_sent};
3076              
3077             # Remember the wire code so the app-facing websocket.disconnect event reports
3078             # the same code the peer received, rather than the 1006 abnormal-close default.
3079 4         9 $self->{ws_disconnect_code} = $code;
3080              
3081 4         32 my $frame = Protocol::WebSocket::Frame->new(
3082             type => 'close',
3083             buffer => pack('n', $code) . $reason,
3084             );
3085              
3086 4         191 $self->{stream}->write($frame->to_bytes);
3087 4         1097 $self->{close_sent} = 1;
3088             }
3089              
3090             sub _close {
3091 458     458   2113502 my ($self) = @_;
3092              
3093 458 100       2372 return if $self->{closed};
3094 284         780 $self->{closed} = 1;
3095              
3096             # Cancel pending drain waiters early (before other cleanup)
3097 284         932 $self->_cancel_drain_waiters('connection closing');
3098              
3099             # Clean up HTTP/2 per-stream state
3100 284 50       994 if ($self->{h2_streams}) {
3101 284         465 for my $stream (values %{$self->{h2_streams}}) {
  284         1023  
3102 30 100 66     272 if ($stream->{body_pending} && !$stream->{body_pending}->is_ready) {
3103             my $event = $stream->{is_sse}
3104 9 100       110 ? { type => 'sse.disconnect', reason => 'client_closed' }
3105             : { type => 'http.disconnect' };
3106 9         52 $stream->{body_pending}->done($event);
3107             }
3108             # Release producers blocked on per-stream backpressure so they
3109             # don't hang on a connection that is going away.
3110 30         1786 $self->_h2_resolve_stream_drain_waiters($stream);
3111             # Drop (don't fire) the app's on_drain fires: the connection is going
3112             # away, not draining. Also break the $stream <-> transport_state cycle
3113             # so the stream state is freed when h2_streams is deleted below.
3114 30         88 $stream->{transport_drain_fires} = [];
3115 30         277 delete $stream->{transport_state};
3116             }
3117 284         1015 delete $self->{h2_streams};
3118             }
3119 284 100       993 if ($self->{h2_session}) {
3120 72         143 eval { $self->{h2_session}->terminate(0) };
  72         514  
3121 72         6346 delete $self->{h2_session};
3122             }
3123              
3124             # Clean up WebSocket frame parser to free memory immediately
3125 284         770 delete $self->{websocket_frame};
3126              
3127             # Remove from server's connection list (O(1) hash delete)
3128 284 50       997 if ($self->{server}) {
3129 284         1161 delete $self->{server}{connections}{refaddr($self)};
3130              
3131             # Signal drain complete if this was the last connection during shutdown
3132 284 100 100     1240 if ($self->{server}{shutting_down} &&
      100        
      66        
3133 137         961 keys %{$self->{server}{connections}} == 0 &&
3134             $self->{server}{drain_complete} &&
3135             !$self->{server}{drain_complete}->is_ready) {
3136 1         13 $self->{server}{drain_complete}->done;
3137             }
3138             }
3139              
3140             # Stop idle timer
3141 284         1683 $self->_stop_idle_timer;
3142              
3143             # Stop stall timer
3144 284         1325 $self->_stop_stall_timer;
3145              
3146             # Stop WS/SSE idle timers
3147 284         1997 $self->_stop_ws_idle_timer;
3148 284         1209 $self->_stop_sse_idle_timer;
3149              
3150             # Stop keepalive timers
3151 284         1283 $self->_stop_ws_keepalive;
3152 284         1380 $self->_stop_sse_keepalive;
3153              
3154             # Note: _close is resource cleanup ONLY. Callers should use
3155             # _handle_disconnect_and_close() which handles both protocol
3156             # notification and cleanup.
3157              
3158             # Determine disconnect event type based on mode
3159 284         475 my $disconnect_event;
3160 284 100       1095 if ($self->{websocket_mode}) {
    100          
3161 18         42 $disconnect_event = $self->_ws_disconnect_event;
3162             } elsif ($self->{sse_mode}) {
3163             $disconnect_event = {
3164             type => 'sse.disconnect',
3165 21   100     113 reason => $self->{sse_disconnect_reason} // 'client_closed',
3166             };
3167             } else {
3168 245         820 $disconnect_event = { type => 'http.disconnect' };
3169             }
3170              
3171             # Cancel any tracked receive Futures that are still pending
3172 284         518 for my $future (@{$self->{receive_futures}}) {
  284         822  
3173 18 50       97 if (!$future->is_ready) {
3174             # Complete with disconnect event instead of cancelling
3175             # This allows the async sub to complete cleanly
3176 0         0 $future->done($disconnect_event);
3177             }
3178             }
3179 284         804 $self->{receive_futures} = [];
3180              
3181 284 50       1032 if ($self->{stream}) {
3182 284         1442 $self->{stream}->close_when_empty;
3183             }
3184             }
3185              
3186             # Combined disconnect and close - use this from callbacks where $weak_self may
3187             # become undefined after _handle_disconnect completes its Future callbacks.
3188             # This method holds a strong reference to $self throughout the operation.
3189             sub _handle_disconnect_and_close {
3190 434     434   1241 my ($self, $reason) = @_;
3191 434         1869 $self->_handle_disconnect($reason);
3192 434         1710 $self->_close;
3193             }
3194              
3195             #
3196             # TLS Support Methods
3197             #
3198              
3199             sub _extract_tls_info {
3200 45     45   76 my ($self) = @_;
3201              
3202 45         137 my $stream = $self->{stream};
3203 45         168 my $handle = $stream->read_handle;
3204              
3205             # Check if handle is an IO::Socket::SSL
3206 45 50 33     554 return unless $handle && $handle->isa('IO::Socket::SSL');
3207              
3208 45         303 my $tls_info = {
3209             server_cert => undef,
3210             client_cert_chain => [],
3211             client_cert_name => undef,
3212             client_cert_error => undef,
3213             tls_version => undef,
3214             cipher_suite => undef,
3215             };
3216              
3217             # Get TLS version - IO::Socket::SSL returns something like 'TLSv1_3'
3218 45 50       282 if (my $version_str = $handle->get_sslversion) {
3219             # Map version string to numeric value per TLS spec
3220 45         1097 my %version_map = (
3221             'SSLv3' => 0x0300,
3222             'TLSv1' => 0x0301,
3223             'TLSv1_1' => 0x0302,
3224             'TLSv1_2' => 0x0303,
3225             'TLSv1_3' => 0x0304,
3226             );
3227 45         138 $tls_info->{tls_version} = $version_map{$version_str};
3228             }
3229              
3230             # Cipher suite (numeric IANA id). Net::SSLeay/IO::Socket::SSL expose only the
3231             # cipher *name*, not the 16-bit id the spec asks for. For TLS 1.3 the OpenSSL
3232             # name IS the IANA name and the registry is frozen at five suites, so we map
3233             # those exactly. For TLS 1.2 the names are OpenSSL-specific (a large, shifting
3234             # set), so we leave cipher_suite undef -- the spec permits undef when the
3235             # server cannot determine the value.
3236 45 50       161 if (my $cipher_name = $handle->get_cipher) {
3237 45         845 my %tls13_cipher_suites = (
3238             'TLS_AES_128_GCM_SHA256' => 0x1301,
3239             'TLS_AES_256_GCM_SHA384' => 0x1302,
3240             'TLS_CHACHA20_POLY1305_SHA256' => 0x1303,
3241             'TLS_AES_128_CCM_SHA256' => 0x1304,
3242             'TLS_AES_128_CCM_8_SHA256' => 0x1305,
3243             );
3244             $tls_info->{cipher_suite} = $tls13_cipher_suites{$cipher_name}
3245 45 50       229 if exists $tls13_cipher_suites{$cipher_name};
3246             }
3247              
3248             # Get server certificate (our certificate)
3249             # IO::Socket::SSL uses sock_certificate() for the server's own cert
3250 45         84 eval {
3251 45         195 my $cert = $handle->sock_certificate;
3252 45 50       638 if ($cert) {
3253 45         408 require Net::SSLeay;
3254 45         1147 $tls_info->{server_cert} = Net::SSLeay::PEM_get_string_X509($cert);
3255             }
3256             };
3257 45 50       209 if ($@) {
3258 0         0 warn "TLS server certificate extraction error: $@\n";
3259             }
3260              
3261             # Get client certificate if provided
3262 45         72 eval {
3263 45         197 my $client_cert = $handle->peer_certificate;
3264 45 100       2027 if ($client_cert) {
3265 8         116 require Net::SSLeay;
3266              
3267             # Get client cert chain
3268 8         20 my @chain;
3269 8         108 push @chain, Net::SSLeay::PEM_get_string_X509($client_cert);
3270              
3271             # Try to get additional certs in chain
3272 8 50       24 if (my $ssl = $handle->_get_ssl_object) {
3273 8         136 my $chain_obj = Net::SSLeay::get_peer_cert_chain($ssl);
3274 8 50       28 if ($chain_obj) {
3275 0         0 for my $i (0 .. Net::SSLeay::sk_X509_num($chain_obj) - 1) {
3276 0         0 my $cert = Net::SSLeay::sk_X509_value($chain_obj, $i);
3277 0 0       0 push @chain, Net::SSLeay::PEM_get_string_X509($cert) if $cert;
3278             }
3279             }
3280             }
3281 8         20 $tls_info->{client_cert_chain} = \@chain;
3282              
3283             # Get client cert DN (Subject)
3284 8         160 my $subject = Net::SSLeay::X509_NAME_oneline(
3285             Net::SSLeay::X509_get_subject_name($client_cert)
3286             );
3287 8 50       68 $tls_info->{client_cert_name} = $subject if $subject;
3288              
3289             # Check for verification errors
3290 8         52 my $verify_result = $handle->get_sslversion_int;
3291             # Actually, use verify_result
3292 8 50       80 if (my $ssl = $handle->_get_ssl_object) {
3293 8         96 my $result = Net::SSLeay::get_verify_result($ssl);
3294 8 50       52 if ($result != 0) { # X509_V_OK = 0
3295 0         0 $tls_info->{client_cert_error} = Net::SSLeay::X509_verify_cert_error_string($result);
3296             }
3297             }
3298             }
3299             };
3300 45 50       108 if ($@) {
3301 0         0 warn "TLS client certificate extraction error: $@\n";
3302             }
3303              
3304 45         102 $self->{tls_info} = $tls_info;
3305             }
3306              
3307             sub _get_scheme {
3308 193     193   441 my ($self) = @_;
3309              
3310 193 100       1785 return $self->{tls_enabled} ? 'https' : 'http';
3311             }
3312              
3313             sub _get_ws_scheme {
3314 39     39   119 my ($self) = @_;
3315              
3316 39 50       479 return $self->{tls_enabled} ? 'wss' : 'ws';
3317             }
3318              
3319             sub _get_extensions_for_scope {
3320 287     287   718 my ($self) = @_;
3321              
3322 287         509 my %extensions = %{$self->{extensions}};
  287         1010  
3323              
3324             # Add TLS info to extensions if this is a TLS connection
3325 287 100 66     1628 if ($self->{tls_enabled} && $self->{tls_info}) {
    50          
3326 45         82 $extensions{tls} = $self->{tls_info};
3327             }
3328             # Remove tls extension if not a TLS connection (per spec)
3329             elsif (!$self->{tls_enabled}) {
3330 242         505 delete $extensions{tls};
3331             }
3332              
3333 287         1847 return \%extensions;
3334             }
3335              
3336             #
3337             # SSE (Server-Sent Events) Support Methods
3338             #
3339              
3340 21     21   34 async sub _handle_sse_request {
3341 21         49 my ($self, $request) = @_;
3342              
3343 21         47 $self->{sse_mode} = 1;
3344 21         83 $self->_stop_idle_timer; # SSE connections are long-lived
3345 21         103 $self->_start_sse_idle_timer; # Start SSE-specific idle timer if configured
3346              
3347 21         79 my $scope = $self->_create_sse_scope($request);
3348 21         67 my $receive = $self->_create_sse_receive($request);
3349 21         76 my $send = $self->_create_sse_send($request);
3350              
3351 21         40 eval {
3352 21         94 await $self->{app}->($scope, $receive, $send);
3353             };
3354              
3355 21 50       1855 if (my $error = $@) {
3356             # If SSE not yet started, send HTTP error
3357 0 0       0 if (!$self->{sse_started}) {
3358 0         0 $self->_send_error_response(500, "Internal Server Error");
3359             }
3360 0         0 warn "PAGI application error (SSE): $error\n";
3361             }
3362              
3363             # End the stream (no-op if an explicit sse.close already finished it).
3364 21         74 $self->_finish_sse_stream('stream_complete');
3365             }
3366              
3367             # Idempotently end an SSE stream: write the chunked terminator (HTTP/1.1), write
3368             # the access log, and close the connection. Called both by the on-return path
3369             # above and by an explicit sse.close event, so it must run exactly once.
3370             sub _finish_sse_stream {
3371 22     22   53 my ($self, $reason) = @_;
3372 22 100       72 return if $self->{sse_finished};
3373 21         44 $self->{sse_finished} = 1;
3374              
3375             # Send chunked terminator if SSE was started and the stream is still writable
3376 21 100 66     202 if ($self->{sse_started} && !$self->{closed} &&
      66        
      66        
3377             $self->{stream} && $self->{stream}->write_handle) {
3378 17         162 $self->{stream}->write("0\r\n\r\n");
3379             }
3380              
3381             # Write access log entry (logs at connection close with total duration)
3382 21         1572 $self->_write_access_log;
3383              
3384             # Close connection after SSE stream ends
3385 21         105 $self->_handle_disconnect_and_close($reason);
3386             }
3387              
3388             sub _create_sse_scope {
3389 21     21   47 my ($self, $request) = @_;
3390              
3391             my $scope = {
3392             type => 'sse',
3393             pagi => {
3394             version => '0.3',
3395             spec_version => '0.3',
3396             },
3397             http_version => $request->{http_version},
3398             method => $request->{method},
3399             scheme => $self->_get_scheme,
3400             path => $request->{path},
3401             raw_path => $request->{raw_path},
3402             query_string => $request->{query_string},
3403             root_path => '',
3404             headers => $request->{headers},
3405             (defined $self->{client_host}
3406             ? (client => [$self->{client_host}, $self->{client_port}])
3407             : ()
3408             ),
3409             server => [$self->{server_host}, $self->{server_port}],
3410             # Optimized: avoid hash copy when state is empty (common case)
3411 21         114 state => keys %{$self->{state}} ? { %{$self->{state}} } : {},
  0         0  
3412             extensions => $self->_get_extensions_for_scope,
3413             # Outbound flow-control introspection (buffered_amount, watermarks,
3414             # on_high_water/on_drain). Stashed on the connection too, so the send
3415             # path can poke _check_watermarks after each write.
3416 21 50       154 'pagi.transport' => ($self->{current_transport_state} = $self->_h1_transport_state),
    50          
3417             };
3418              
3419 21         78 return $scope;
3420             }
3421              
3422             sub _create_sse_receive {
3423 21     21   67 my ($self, $request) = @_;
3424              
3425 21         39 my $content_length = $request->{content_length};
3426 21   66     65 my $has_body = defined($content_length) && $content_length > 0;
3427 21         34 my $body_complete = 0;
3428 21         28 my $bytes_read = 0;
3429              
3430 21         36 weaken(my $weak_self = $self);
3431              
3432             # Helper to create SSE disconnect event with reason
3433             my $sse_disconnect = sub {
3434             return {
3435             type => 'sse.disconnect',
3436 0 0 0 0   0 reason => ($weak_self ? $weak_self->{sse_disconnect_reason} : undef) // 'client_closed',
3437             };
3438 21         58 };
3439              
3440             return sub {
3441 7 50   7   169 return Future->done($sse_disconnect->())
3442             unless $weak_self;
3443             return Future->done($sse_disconnect->())
3444 7 50       20 if $weak_self->{closed};
3445              
3446 7         12 my $future = (async sub {
3447 7 50       16 return $sse_disconnect->()
3448             unless $weak_self;
3449             return $sse_disconnect->()
3450 7 50       17 if $weak_self->{closed};
3451              
3452             # Check queue first
3453 7 50       21 if (@{$weak_self->{receive_queue}}) {
  7         22  
3454 0         0 return shift @{$weak_self->{receive_queue}};
  0         0  
3455             }
3456              
3457             # Handle request body for POST/PUT SSE requests
3458 7 100 66     32 if ($has_body && !$body_complete) {
3459 1         3 my $remaining = $content_length - $bytes_read;
3460              
3461             # Wait for data if buffer is empty
3462 1   33     7 while (length($weak_self->{buffer}) == 0 && !$weak_self->{closed} && $remaining > 0) {
      33        
3463 0 0       0 if (!$weak_self->{receive_pending}) {
3464 0         0 $weak_self->{receive_pending} = Future->new;
3465             }
3466 0         0 await $weak_self->{receive_pending};
3467 0         0 $weak_self->{receive_pending} = undef;
3468              
3469 0 0       0 if (@{$weak_self->{receive_queue}}) {
  0         0  
3470 0         0 return shift @{$weak_self->{receive_queue}};
  0         0  
3471             }
3472             }
3473              
3474 1 50       5 return $sse_disconnect->() if $weak_self->{closed};
3475              
3476             # Read available data up to remaining
3477             my $to_read = $remaining < length($weak_self->{buffer})
3478             ? $remaining
3479 1 50       5 : length($weak_self->{buffer});
3480              
3481 1         6 my $chunk = substr($weak_self->{buffer}, 0, $to_read, '');
3482 1         63 $bytes_read += length($chunk);
3483              
3484 1 50       6 my $more = ($bytes_read < $content_length) ? 1 : 0;
3485 1 50       5 $body_complete = 1 if !$more;
3486              
3487             return {
3488 1         15 type => 'sse.request',
3489             body => $chunk,
3490             more => $more,
3491             };
3492             }
3493              
3494             # No body or body complete - return empty body if not yet returned
3495 6 100       16 if (!$body_complete) {
3496 3         5 $body_complete = 1;
3497             return {
3498 3         28 type => 'sse.request',
3499             body => '',
3500             more => 0,
3501             };
3502             }
3503              
3504             # Wait for disconnect
3505 3         15 while (1) {
3506 5 100       8 if (@{$weak_self->{receive_queue}}) {
  5         14  
3507 2         3 return shift @{$weak_self->{receive_queue}};
  2         13  
3508             }
3509              
3510             return $sse_disconnect->()
3511 3 50       9 if $weak_self->{closed};
3512              
3513 3 50       9 if (!$weak_self->{receive_pending}) {
3514 3         8 $weak_self->{receive_pending} = Future->new;
3515             }
3516 3         24 await $weak_self->{receive_pending};
3517 2         165 $weak_self->{receive_pending} = undef;
3518             }
3519 7         50 })->();
3520              
3521             # Track this Future
3522 7         400 push @{$weak_self->{receive_futures}}, $future;
  7         20  
3523 7         11 @{$weak_self->{receive_futures}} = grep { !$_->is_ready } @{$weak_self->{receive_futures}};
  7         42  
  7         19  
  7         18  
3524              
3525 7         31 return $future;
3526 21         228 };
3527             }
3528              
3529             sub _format_sse_event {
3530 52     52   208493 my ($event) = @_;
3531 52         140 my $sse_data = '';
3532              
3533 52 100 66     232 if (defined $event->{event} && length $event->{event}) {
3534             die "Invalid SSE event name: contains newline\n"
3535 20 100       103 if $event->{event} =~ /[\r\n]/;
3536 19         62 $sse_data .= "event: $event->{event}\n";
3537             }
3538              
3539 51   50     154 my $data = $event->{data} // '';
3540 51         21212 for my $line (split /\r?\n|\r/, $data, -1) {
3541 66         355 $sse_data .= "data: $line\n";
3542             }
3543              
3544 51 100 66     233 if (defined $event->{id} && length $event->{id}) {
3545             die "Invalid SSE id: contains newline\n"
3546 7 100       73 if $event->{id} =~ /[\r\n]/;
3547 6         15 $sse_data .= "id: $event->{id}\n";
3548             }
3549              
3550 50 100       360 if (defined $event->{retry}) {
3551             die "Invalid SSE retry: must be a non-negative integer\n"
3552 7 100       47 unless $event->{retry} =~ /\A[0-9]+\z/;
3553 5         13 $sse_data .= "retry: $event->{retry}\n";
3554             }
3555              
3556 48         116 $sse_data .= "\n";
3557 48         129 return $sse_data;
3558             }
3559              
3560             sub _format_sse_comment {
3561 5     5   4251 my ($event) = @_;
3562 5   50     16 my $text = $event->{comment} // '';
3563 5         22 my $formatted = '';
3564 5         44 for my $line (split /\r?\n|\r/, $text, -1) {
3565 9 100       26 $line = ":$line" unless $line =~ /^:/;
3566 9         15 $formatted .= "$line\n";
3567             }
3568 5         10 $formatted .= "\n";
3569 5         12 return $formatted;
3570             }
3571              
3572             sub _create_sse_send {
3573 21     21   42 my ($self, $request) = @_;
3574              
3575 21         49 weaken(my $weak_self = $self);
3576              
3577 49     49   1531 return async sub {
3578 49         98 my ($event) = @_;
3579 49 50       125 return Future->done unless $weak_self;
3580              
3581 49   50     134 my $type = $event->{type} // '';
3582              
3583             # After an application-initiated sse.close the stream is closed: a second
3584             # sse.close is a no-op, but any other send is a programming error and
3585             # raises (a failed Future the application's `await $send` will throw).
3586 49 100       131 if ($weak_self->{sse_close_sent}) {
3587 1 50       3 return Future->done if $type eq 'sse.close';
3588 1         7 die "cannot send '$type' after sse.close\n";
3589             }
3590              
3591             # Transport already gone (client disconnect): sends are a silent no-op.
3592 48 50       117 return Future->done if $weak_self->{closed};
3593              
3594             # After an sse.http.response.start (decline), only the decline body may
3595             # follow; a stream event is a programming error (first-send-wins).
3596 48 100 100     143 if ($weak_self->{sse_decline_started} && $type !~ /^sse\.http\.response\./) {
3597 1         14 die "cannot send '$type' after sse.http.response.start\n";
3598             }
3599              
3600             # Reset SSE idle timer on send activity
3601 47         185 $weak_self->_reset_sse_idle_timer;
3602              
3603             # Dev-mode event validation (PAGI spec compliance)
3604 47 50       115 if ($weak_self->{validate_events}) {
3605 0         0 require PAGI::Server::EventValidator;
3606 0         0 PAGI::Server::EventValidator::validate_sse_send($event);
3607             }
3608              
3609 47 100       209 if ($type eq 'sse.start') {
    100          
    100          
    50          
    100          
    100          
    100          
    50          
3610 18 50       50 return if $weak_self->{sse_started};
3611 18         33 $weak_self->{sse_started} = 1;
3612 18         65 $weak_self->{response_started} = 1;
3613              
3614 18   100     66 my $status = $event->{status} // 200;
3615 18         37 $weak_self->{response_status} = $status; # Track for access logging
3616 18   100     54 my $headers = $event->{headers} // [];
3617              
3618             # Ensure Content-Type is text/event-stream
3619 18         29 my $has_content_type = 0;
3620 18         40 for my $h (@$headers) {
3621 9 50       32 if (lc($h->[0]) eq 'content-type') {
3622 9         15 $has_content_type = 1;
3623 9         21 last;
3624             }
3625             }
3626              
3627 18         41 my @final_headers = @$headers;
3628 18 100       38 if (!$has_content_type) {
3629 9         21 push @final_headers, ['content-type', 'text/event-stream'];
3630             }
3631              
3632             # Add Cache-Control and Connection headers for SSE
3633 18         59 push @final_headers, ['cache-control', 'no-cache'];
3634 18         47 push @final_headers, ['connection', 'keep-alive'];
3635 18         117 push @final_headers, ['date', $weak_self->{protocol}->format_date];
3636              
3637             # SSE uses chunked encoding implicitly (no Content-Length)
3638             my $response = $weak_self->{protocol}->serialize_response_start(
3639 18         107 $status, \@final_headers, 1 # chunked = 1
3640             );
3641              
3642 18         103 $weak_self->{stream}->write($response);
3643              
3644             # Set protocol-specific keepalive writer (HTTP/1.1 chunked)
3645             $weak_self->{sse_keepalive_writer} = sub {
3646 0         0 my ($text) = @_;
3647 0 0       0 return unless $weak_self;
3648 0 0       0 return if $weak_self->{closed};
3649 0         0 my $len = sprintf("%x", length($text));
3650 0         0 $weak_self->{stream}->write("$len\r\n$text\r\n");
3651 18         4852 };
3652             }
3653             elsif ($type eq 'sse.send') {
3654 18 50       68 return unless $weak_self->{sse_started};
3655              
3656             # --- BACKPRESSURE CHECK ---
3657 18 50       85 if ($weak_self->_get_write_buffer_size >= $weak_self->{write_high_watermark}) {
3658 0         0 await $weak_self->_wait_for_drain;
3659 0 0       0 return Future->done unless $weak_self;
3660 0 0       0 return Future->done if $weak_self->{closed};
3661             }
3662             # --- END BACKPRESSURE CHECK ---
3663              
3664 18         72 my $sse_data = _format_sse_event($event);
3665              
3666             # Send as chunked data
3667 18         99 my $len = sprintf("%x", length($sse_data));
3668 18         123 $weak_self->{stream}->write("$len\r\n$sse_data\r\n");
3669 18         2649 $weak_self->_notify_transport_write;
3670             }
3671             elsif ($type eq 'sse.comment') {
3672 2 50       6 return unless $weak_self->{sse_started};
3673              
3674 2         5 my $comment = _format_sse_comment($event);
3675              
3676 2         7 my $len = sprintf("%x", length($comment));
3677 2         6 $weak_self->{stream}->write("$len\r\n$comment\r\n");
3678             }
3679             elsif ($type eq 'sse.keepalive') {
3680             # SSE keepalive - starts/stops periodic comment timer
3681 0   0     0 my $interval = $event->{interval} // 0;
3682 0         0 my $comment = $event->{comment};
3683              
3684 0 0       0 if ($interval > 0) {
3685 0         0 $weak_self->_start_sse_keepalive($interval, $comment);
3686             }
3687             else {
3688 0         0 $weak_self->_stop_sse_keepalive;
3689             }
3690             }
3691             elsif ($type eq 'sse.close') {
3692             # Explicit application-initiated end of the SSE stream. End it now,
3693             # decoupled from the application returning. `reason` is server-side
3694             # metadata only and is never written to the wire. Idempotency for a
3695             # repeat sse.close is handled by the sse_close_sent guard above.
3696 1         2 $weak_self->{sse_close_sent} = 1;
3697             $weak_self->{sse_disconnect_reason} = $event->{reason}
3698 1 50       4 if defined $event->{reason};
3699 1   50     13 $weak_self->_finish_sse_stream($event->{reason} // 'app_closed');
3700             }
3701             elsif ($type eq 'sse.http.response.start') {
3702             # Decline the SSE stream and return a normal HTTP response. Valid only
3703             # before sse.start; namespaced under sse. (mirrors websocket.http.response.*).
3704             die "cannot decline with sse.http.response.start after sse.start\n"
3705 4 100       22 if $weak_self->{sse_started};
3706 3 50       6 return if $weak_self->{sse_decline_started}; # idempotent
3707 3         6 $weak_self->{sse_decline_started} = 1;
3708 3   50     9 $weak_self->{sse_decline_status} = $event->{status} // 200;
3709             $weak_self->{sse_decline_headers} = [
3710 1         4 map { [_validate_header_name($_->[0]), _validate_header_value($_->[1])] }
3711 3   50     4 @{$event->{headers} // []}
  3         10  
3712             ];
3713 3         6 $weak_self->{sse_decline_body} = '';
3714             }
3715             elsif ($type eq 'sse.http.response.body') {
3716 3 50       6 return unless $weak_self->{sse_decline_started};
3717 3 50       6 return if $weak_self->{response_started};
3718 3   50     9 $weak_self->{sse_decline_body} .= $event->{body} // '';
3719 3 50       8 return if $event->{more}; # more chunks coming — keep buffering
3720              
3721 3         4 my $status = $weak_self->{sse_decline_status};
3722 3         4 my $body = $weak_self->{sse_decline_body};
3723             my @headers = (
3724 3         18 @{$weak_self->{sse_decline_headers}},
3725             ['content-length', length $body],
3726 3         6 ['date', $weak_self->{protocol}->format_date],
3727             );
3728 3         13 my $response = $weak_self->{protocol}->serialize_response_start($status, \@headers, 0);
3729 3         7 $response .= $body;
3730 3         16 $weak_self->{stream}->write($response);
3731 3         744 $weak_self->{response_started} = 1;
3732 3         6 $weak_self->{response_status} = $status; # access log
3733             # Declined: close the connection (no event stream was started).
3734 3         11 $weak_self->_handle_disconnect_and_close('client_closed');
3735             }
3736             elsif ($type eq 'http.fullflush') {
3737             # Fullflush extension - force immediate TCP buffer flush
3738             # Per spec: servers that don't advertise the extension must reject
3739 1 50       16 unless (exists $weak_self->{extensions}{fullflush}) {
3740 0         0 warn "PAGI: http.fullflush event rejected - extension not enabled\n";
3741 0         0 die "Extension not enabled: fullflush\n";
3742             }
3743              
3744             # Force flush by ensuring TCP_NODELAY
3745 1         4 my $handle = $weak_self->{stream}->write_handle;
3746 1 50 33     10 if ($handle && $handle->can('setsockopt')) {
3747 1         7 require Socket;
3748 1         4 $handle->setsockopt(Socket::IPPROTO_TCP(), Socket::TCP_NODELAY(), 1);
3749             }
3750             }
3751             else {
3752             # Per PAGI spec: servers must raise exceptions for unrecognized event types
3753 0         0 _unrecognized_event_type($type, 'sse');
3754             }
3755              
3756 46         530 return;
3757 21         413 };
3758             }
3759              
3760             #
3761             # WebSocket Support Methods
3762             #
3763              
3764             # WebSocket handshake magic GUID per RFC 6455
3765 113     113   1575 use constant WS_GUID => '258EAFA5-E914-47DA-95CA-C5AB0DC85B11';
  113         253  
  113         627458  
3766              
3767 22     22   53 async sub _handle_websocket_request {
3768 22         48 my ($self, $request) = @_;
3769              
3770 22         83 $self->_stop_idle_timer; # WebSocket connections are long-lived
3771 22         114 $self->_start_ws_idle_timer; # Start WebSocket-specific idle timer if configured
3772              
3773 22         100 my $scope = $self->_create_websocket_scope($request);
3774 22         80 my $receive = $self->_create_websocket_receive($request);
3775 22         79 my $send = $self->_create_websocket_send($request);
3776              
3777 22         50 eval {
3778 22         105 await $self->{app}->($scope, $receive, $send);
3779             };
3780              
3781 21 100       2385 if (my $error = $@) {
3782             # If handshake not yet done, send HTTP error
3783 2 50       9 if (!$self->{websocket_accepted}) {
3784 2         12 $self->_send_error_response(500, "Internal Server Error");
3785             }
3786 2         94 warn "PAGI application error (WebSocket): $error\n";
3787             }
3788              
3789             # Write access log entry (logs at connection close with total duration)
3790 21         171 $self->_write_access_log;
3791              
3792             # Close connection after WebSocket session ends
3793 21         127 $self->_handle_disconnect_and_close('session_complete');
3794             }
3795              
3796             sub _create_websocket_scope {
3797 22     22   50 my ($self, $request) = @_;
3798              
3799             # Extract WebSocket key and subprotocols from headers
3800 22         43 my $ws_key;
3801             my @subprotocols;
3802              
3803 22         40 for my $header (@{$request->{headers}}) {
  22         68  
3804 123         219 my ($name, $value) = @$header;
3805 123 100       367 if ($name eq 'sec-websocket-key') {
    100          
3806 22         42 $ws_key = $value;
3807             }
3808             elsif ($name eq 'sec-websocket-protocol') {
3809             # Parse comma-separated list of subprotocols
3810 2         12 push @subprotocols, map { s/^\s+|\s+$//gr } split /,/, $value;
  3         51  
3811             }
3812             }
3813              
3814             # Store ws_key for handshake response
3815 22         65 $self->{ws_key} = $ws_key;
3816              
3817             my $scope = {
3818             type => 'websocket',
3819             pagi => {
3820             version => '0.3',
3821             spec_version => '0.3',
3822             },
3823             http_version => $request->{http_version},
3824             scheme => $self->_get_ws_scheme,
3825             path => $request->{path},
3826             raw_path => $request->{raw_path},
3827             query_string => $request->{query_string},
3828             root_path => '',
3829             headers => $request->{headers},
3830             (defined $self->{client_host}
3831             ? (client => [$self->{client_host}, $self->{client_port}])
3832             : ()
3833             ),
3834             server => [$self->{server_host}, $self->{server_port}],
3835             subprotocols => \@subprotocols,
3836             # Optimized: avoid hash copy when state is empty (common case)
3837 22         97 state => keys %{$self->{state}} ? { %{$self->{state}} } : {},
  0         0  
3838 22         80 extensions => { %{$self->_get_extensions_for_scope}, 'websocket.http.response' => {} },
3839             # Outbound flow-control introspection (buffered_amount, watermarks,
3840             # on_high_water/on_drain). Stashed on the connection too, so the send
3841             # path can poke _check_watermarks after each write.
3842 22 50       177 'pagi.transport' => ($self->{current_transport_state} = $self->_h1_transport_state),
    50          
3843             };
3844              
3845 22         134 return $scope;
3846             }
3847              
3848             sub _create_websocket_receive {
3849 22     22   45 my ($self, $request) = @_;
3850              
3851 22         36 my $connect_sent = 0;
3852 22         40 weaken(my $weak_self = $self);
3853              
3854             return sub {
3855 155 50   155   668921 return Future->done({ type => 'websocket.disconnect', code => 1006, reason => '' })
3856             unless $weak_self;
3857              
3858             # Check queue first - drain queued messages even if closed
3859 155 100       265 if (@{$weak_self->{receive_queue}}) {
  155         375  
3860 113         204 return Future->done(shift @{$weak_self->{receive_queue}});
  113         370  
3861             }
3862              
3863             return Future->done({ type => 'websocket.disconnect', code => 1006, reason => '' })
3864 42 50       107 if $weak_self->{closed};
3865              
3866 42         73 my $future = (async sub {
3867 42 50       79 return { type => 'websocket.disconnect', code => 1006, reason => '' }
3868             unless $weak_self;
3869              
3870             # Check queue first - drain queued messages even if closed
3871 42 50       52 if (@{$weak_self->{receive_queue}}) {
  42         106  
3872 0         0 return shift @{$weak_self->{receive_queue}};
  0         0  
3873             }
3874              
3875             return { type => 'websocket.disconnect', code => 1006, reason => '' }
3876 42 50       114 if $weak_self->{closed};
3877              
3878             # First call returns websocket.connect
3879 42 100       103 if (!$connect_sent) {
3880 19         33 $connect_sent = 1;
3881 19         180 return { type => 'websocket.connect' };
3882             }
3883              
3884             # If not in WebSocket mode yet (waiting for accept), wait
3885 23   33     57 while (!$weak_self->{websocket_mode} && !$weak_self->{closed}) {
3886 0 0       0 if (!$weak_self->{receive_pending}) {
3887 0         0 $weak_self->{receive_pending} = Future->new;
3888             }
3889 0         0 await $weak_self->{receive_pending};
3890 0         0 $weak_self->{receive_pending} = undef;
3891              
3892 0 0       0 if (@{$weak_self->{receive_queue}}) {
  0         0  
3893 0         0 return shift @{$weak_self->{receive_queue}};
  0         0  
3894             }
3895             }
3896              
3897             return { type => 'websocket.disconnect', code => 1006, reason => '' }
3898 23 50       42 if $weak_self->{closed};
3899              
3900             # Wait for events from frame processing
3901 23         24 while (1) {
3902 46 100       59 if (@{$weak_self->{receive_queue}}) {
  46         137  
3903 23         29 return shift @{$weak_self->{receive_queue}};
  23         119  
3904             }
3905              
3906             return { type => 'websocket.disconnect', code => 1006, reason => '' }
3907 23 50       50 if $weak_self->{closed};
3908              
3909 23 50       54 if (!$weak_self->{receive_pending}) {
3910 23         76 $weak_self->{receive_pending} = Future->new;
3911             }
3912 23         172 await $weak_self->{receive_pending};
3913 23         1905 $weak_self->{receive_pending} = undef;
3914             }
3915 42         245 })->();
3916              
3917             # Track this Future
3918 42         1890 push @{$weak_self->{receive_futures}}, $future;
  42         91  
3919 42         60 @{$weak_self->{receive_futures}} = grep { !$_->is_ready } @{$weak_self->{receive_futures}};
  42         242  
  52         156  
  42         86  
3920              
3921 42         158 return $future;
3922 22         108 };
3923             }
3924              
3925             sub _create_websocket_send {
3926 22     22   46 my ($self, $request) = @_;
3927              
3928 22         41 weaken(my $weak_self = $self);
3929              
3930 42     42   1199 return async sub {
3931 42         75 my ($event) = @_;
3932 42 50       105 return Future->done unless $weak_self;
3933 42 50       97 return Future->done if $weak_self->{closed};
3934              
3935             # Reset WebSocket idle timer on send activity
3936 42         138 $weak_self->_reset_ws_idle_timer;
3937              
3938 42   50     142 my $type = $event->{type} // '';
3939              
3940             # Dev-mode event validation (PAGI spec compliance)
3941 42 50       121 if ($weak_self->{validate_events}) {
3942 0         0 require PAGI::Server::EventValidator;
3943 0         0 PAGI::Server::EventValidator::validate_websocket_send($event);
3944             }
3945              
3946 42 100       152 if ($type eq 'websocket.accept') {
    100          
    100          
    100          
    50          
    0          
3947 20 50       61 return if $weak_self->{websocket_accepted};
3948              
3949             # Complete the WebSocket handshake
3950 20         51 my $ws_key = $weak_self->{ws_key};
3951 20         243 my $accept_key = sha1_base64($ws_key . WS_GUID);
3952             # sha1_base64 doesn't add padding, but WebSocket requires it
3953 20         88 $accept_key .= '=' while length($accept_key) % 4;
3954              
3955 20         80 my @headers = (
3956             "HTTP/1.1 101 Switching Protocols\r\n",
3957             "Upgrade: websocket\r\n",
3958             "Connection: Upgrade\r\n",
3959             "Sec-WebSocket-Accept: $accept_key\r\n",
3960             );
3961              
3962             # Add subprotocol if specified (with validation)
3963 20 100       76 if (my $subprotocol = $event->{subprotocol}) {
3964 1         5 $subprotocol = _validate_subprotocol($subprotocol);
3965 0         0 push @headers, "Sec-WebSocket-Protocol: $subprotocol\r\n";
3966             }
3967              
3968             # Add custom headers if specified (with CRLF injection validation)
3969 19 100       48 if (my $extra_headers = $event->{headers}) {
3970 1         4 for my $h (@$extra_headers) {
3971 1         3 my ($name, $value) = @$h;
3972 1         6 $name = _validate_header_name($name);
3973 1         28 $value = _validate_header_value($value);
3974 0         0 push @headers, "$name: $value\r\n";
3975             }
3976             }
3977              
3978 18         45 push @headers, "\r\n";
3979              
3980 18         130 $weak_self->{stream}->write(join('', @headers));
3981              
3982             # Switch to WebSocket mode
3983 18         4250 $weak_self->{websocket_mode} = 1;
3984 18         36 $weak_self->{websocket_accepted} = 1;
3985             $weak_self->{websocket_frame} = Protocol::WebSocket::Frame->new(
3986             max_payload_size => $weak_self->{max_ws_frame_size},
3987 18         160 );
3988 18         632 $weak_self->{response_status} = 101; # Track for access logging
3989              
3990             # Notify any waiting receive
3991 18 50 33     61 if ($weak_self->{receive_pending} && !$weak_self->{receive_pending}->is_ready) {
3992 0         0 my $f = $weak_self->{receive_pending};
3993 0         0 $weak_self->{receive_pending} = undef;
3994 0         0 $f->done;
3995             }
3996              
3997             # Process any data that arrived before accept
3998 18 50       73 if (length($weak_self->{buffer}) > 0) {
3999 0         0 $weak_self->_process_websocket_frames;
4000             }
4001             }
4002             elsif ($type eq 'websocket.send') {
4003 17 50       36 return unless $weak_self->{websocket_mode};
4004              
4005             # --- BACKPRESSURE CHECK ---
4006 17 50       40 if ($weak_self->_get_write_buffer_size >= $weak_self->{write_high_watermark}) {
4007 0         0 await $weak_self->_wait_for_drain;
4008 0 0       0 return Future->done unless $weak_self;
4009 0 0       0 return Future->done if $weak_self->{closed};
4010             }
4011             # --- END BACKPRESSURE CHECK ---
4012              
4013 17         23 my $frame;
4014 17 100       31 if (defined $event->{text}) {
    50          
4015             $frame = Protocol::WebSocket::Frame->new(
4016             buffer => $event->{text},
4017 15         49 type => 'text',
4018             );
4019             }
4020             elsif (defined $event->{bytes}) {
4021             $frame = Protocol::WebSocket::Frame->new(
4022             buffer => $event->{bytes},
4023 2         46 type => 'binary',
4024             );
4025             }
4026             else {
4027 0         0 return; # Nothing to send
4028             }
4029              
4030 17         724 my $bytes = $frame->to_bytes;
4031 17         708 $weak_self->{stream}->write($bytes);
4032 17         2336 $weak_self->_notify_transport_write;
4033             }
4034             elsif ($type eq 'websocket.http.response.start') {
4035             # Custom handshake denial (websocket.http.response extension). Only
4036             # valid before accept; buffer the response head until the body
4037             # arrives.
4038 1 50       3 return if $weak_self->{websocket_accepted};
4039 1 50       9 return if $weak_self->{ws_denial_started};
4040 1         3 $weak_self->{ws_denial_started} = 1;
4041 1   50     4 $weak_self->{ws_denial_status} = $event->{status} // 403;
4042             $weak_self->{ws_denial_headers} = [
4043 2         6 map { [_validate_header_name($_->[0]), _validate_header_value($_->[1])] }
4044 1   50     1 @{$event->{headers} // []}
  1         3  
4045             ];
4046 1         2 $weak_self->{ws_denial_body} = '';
4047             }
4048             elsif ($type eq 'websocket.http.response.body') {
4049 1 50       4 return unless $weak_self->{ws_denial_started};
4050 1 50       3 return if $weak_self->{response_started};
4051 1   50     13 $weak_self->{ws_denial_body} .= $event->{body} // '';
4052 1 50       6 return if $event->{more}; # more chunks coming — keep buffering
4053              
4054 1         3 my $status = $weak_self->{ws_denial_status};
4055 1         2 my $body = $weak_self->{ws_denial_body};
4056             my @headers = (
4057 1         10 @{$weak_self->{ws_denial_headers}},
4058             ['content-length', length $body],
4059 1         2 ['date', $weak_self->{protocol}->format_date],
4060             );
4061 1         6 my $response = $weak_self->{protocol}->serialize_response_start($status, \@headers, 0);
4062 1         2 $response .= $body;
4063 1         7 $weak_self->{stream}->write($response);
4064 1         242 $weak_self->{response_started} = 1;
4065 1         2 $weak_self->{response_status} = $status; # access log
4066             # Handshake rejected: close like the bare-403 path (no upgrade).
4067 1         4 $weak_self->_handle_disconnect_and_close('client_closed');
4068             }
4069             elsif ($type eq 'websocket.close') {
4070             # If not accepted yet, send 403 Forbidden
4071 3 100       13 if (!$weak_self->{websocket_accepted}) {
4072 1         10 $weak_self->_send_error_response(403, 'Forbidden');
4073 1         7 return;
4074             }
4075              
4076             # Send close frame
4077 2   50     14 my $code = $event->{code} // 1000;
4078 2   50     31 my $reason = $event->{reason} // '';
4079              
4080 2         23 my $frame = Protocol::WebSocket::Frame->new(
4081             type => 'close',
4082             buffer => pack('n', $code) . $reason,
4083             );
4084              
4085 2         125 $weak_self->{stream}->write($frame->to_bytes);
4086 2         444 $weak_self->{close_sent} = 1;
4087              
4088             # If we received a close frame, close immediately
4089             # Otherwise wait for close from client (handled in frame processing)
4090 2 50       16 if ($weak_self->{close_received}) {
4091 0         0 $weak_self->_handle_disconnect_and_close('client_closed');
4092             }
4093             }
4094             elsif ($type eq 'websocket.keepalive') {
4095 0 0       0 return unless $weak_self->{websocket_mode};
4096              
4097 0   0     0 my $interval = $event->{interval} // 0;
4098 0         0 my $timeout = $event->{timeout};
4099              
4100 0 0       0 if ($interval > 0) {
4101 0         0 $weak_self->_start_ws_keepalive($interval, $timeout);
4102             }
4103             else {
4104 0         0 $weak_self->_stop_ws_keepalive;
4105             }
4106             }
4107             else {
4108             # Per PAGI spec: servers must raise exceptions for unrecognized event types
4109 0         0 _unrecognized_event_type($type, 'websocket');
4110             }
4111              
4112 39         327 return;
4113 22         420 };
4114             }
4115              
4116             sub _process_websocket_frames {
4117 57     57   92 my ($self) = @_;
4118              
4119 57 50       133 return unless $self->{websocket_mode};
4120 57 50       133 return if $self->{closed};
4121              
4122             # Reset WebSocket idle timer on receive activity
4123 57         151 $self->_reset_ws_idle_timer;
4124              
4125 57         83 my $frame = $self->{websocket_frame};
4126              
4127             # Append buffer to frame parser
4128 57         235 $frame->append($self->{buffer});
4129 57         542 $self->{buffer} = '';
4130              
4131             # Process all complete frames - use next_bytes to get raw bytes
4132             # Protocol::WebSocket::Frame->next() decodes as UTF-8, which corrupts binary data
4133 57         164 while (defined(my $bytes = $frame->next_bytes)) {
4134 130         10692 my $opcode = $frame->opcode;
4135              
4136             # RFC 6455 Section 5.2: RSV1-3 MUST be 0 unless extension defines meaning
4137             # PAGI doesn't support compression extensions, so RSV must always be 0
4138 130         478 my $rsv = $frame->rsv;
4139 130 50 33     763 if ($rsv && ref($rsv) eq 'ARRAY') {
4140 130 50       240 if (grep { $_ } @$rsv) {
  390         662  
4141 0         0 $self->_send_close_frame(1002, 'RSV bits must be 0');
4142 0         0 $self->_handle_disconnect_and_close('protocol_error');
4143 0         0 return;
4144             }
4145             }
4146              
4147             # RFC 6455 Section 5.2: Opcodes 3-7 and 11-15 (0xB-0xF) are reserved
4148             # Must fail connection with 1002 Protocol Error
4149 130 50 33     485 if (($opcode >= 3 && $opcode <= 7) || ($opcode >= 11 && $opcode <= 15)) {
      33        
      33        
4150 0         0 $self->_send_close_frame(1002, 'Reserved opcode');
4151 0         0 $self->_handle_disconnect_and_close('protocol_error');
4152 0         0 return;
4153             }
4154              
4155             # RFC 6455 Section 5.5: Control frames (close/ping/pong) MUST have
4156             # payload length <= 125 bytes
4157 130 50 33     482 if (($opcode == 8 || $opcode == 9 || $opcode == 10) && length($bytes) > 125) {
      33        
4158 0         0 $self->_send_close_frame(1002, 'Control frame too large');
4159 0         0 $self->_handle_disconnect_and_close('protocol_error');
4160 0         0 return;
4161             }
4162              
4163 130 100       259 if ($opcode == 1) {
    50          
    0          
    0          
    0          
4164             # Text frame - decode as UTF-8
4165 129         162 my $text = eval { Encode::decode('UTF-8', $bytes, Encode::FB_CROAK) };
  129         740  
4166 129 100       4568 unless (defined $text) {
4167             # Invalid UTF-8 - close with 1007 per RFC 6455
4168 2         9 $self->_send_close_frame(1007, 'Invalid UTF-8');
4169 2         9 $self->_handle_disconnect_and_close('protocol_error');
4170 2         67 return;
4171             }
4172             # Check queue limit before adding (DoS protection)
4173 127 100       197 if (@{$self->{receive_queue}} >= $self->{max_receive_queue}) {
  127         325  
4174 2         14 $self->_send_close_frame(1008, 'Message queue overflow');
4175 2         10 $self->_handle_disconnect_and_close('queue_overflow');
4176 2         32 return;
4177             }
4178 125         164 push @{$self->{receive_queue}}, {
  125         717  
4179             type => 'websocket.receive',
4180             text => $text,
4181             };
4182             }
4183             elsif ($opcode == 2) {
4184             # Binary frame - keep as raw bytes
4185             # Check queue limit before adding (DoS protection)
4186 1 50       1 if (@{$self->{receive_queue}} >= $self->{max_receive_queue}) {
  1         5  
4187 0         0 $self->_send_close_frame(1008, 'Message queue overflow');
4188 0         0 $self->_handle_disconnect_and_close('queue_overflow');
4189 0         0 return;
4190             }
4191 1         1 push @{$self->{receive_queue}}, {
  1         7  
4192             type => 'websocket.receive',
4193             bytes => $bytes,
4194             };
4195             }
4196             elsif ($opcode == 8) {
4197             # Close frame
4198 0         0 $self->{close_received} = 1;
4199 0         0 my ($code, $reason) = (1005, '');
4200              
4201             # RFC 6455 Section 5.5.1: Close frame payload is 0 or >=2 bytes
4202             # 1 byte is invalid
4203 0 0       0 if (length($bytes) == 1) {
4204 0         0 $self->_send_close_frame(1002, 'Invalid close frame');
4205 0         0 $self->_handle_disconnect_and_close('protocol_error');
4206 0         0 return;
4207             }
4208              
4209 0 0       0 if (length($bytes) >= 2) {
4210 0         0 $code = unpack('n', substr($bytes, 0, 2));
4211 0   0     0 $reason = substr($bytes, 2) // '';
4212              
4213             # RFC 6455 Section 7.4.1: Validate close code
4214             # Valid codes: 1000-1003, 1007-1011, 3000-4999
4215             # Invalid: 0-999, 1004-1006, 1012-2999, 5000+
4216 0         0 my $valid_code = 0;
4217 0 0 0     0 if ($code == 1000 || $code == 1001 || $code == 1002 || $code == 1003) {
    0 0        
    0 0        
      0        
      0        
4218 0         0 $valid_code = 1;
4219             }
4220             elsif ($code >= 1007 && $code <= 1011) {
4221 0         0 $valid_code = 1;
4222             }
4223             elsif ($code >= 3000 && $code <= 4999) {
4224 0         0 $valid_code = 1;
4225             }
4226 0 0       0 unless ($valid_code) {
4227 0         0 $self->_send_close_frame(1002, 'Invalid close code');
4228 0         0 $self->_handle_disconnect_and_close('protocol_error');
4229 0         0 return;
4230             }
4231              
4232             # RFC 6455: Close reason must be valid UTF-8
4233 0 0       0 if (length($reason) > 0) {
4234 0         0 my $reason_copy = $reason;
4235 0         0 my $decoded = eval { Encode::decode('UTF-8', $reason_copy, Encode::FB_CROAK) };
  0         0  
4236 0 0       0 unless (defined $decoded) {
4237 0         0 $self->_send_close_frame(1007, 'Invalid UTF-8 in close reason');
4238 0         0 $self->_handle_disconnect_and_close('protocol_error');
4239 0         0 return;
4240             }
4241             }
4242             }
4243              
4244             # If we haven't sent close yet, send it now
4245 0 0       0 if (!$self->{close_sent}) {
4246 0         0 my $close_frame = Protocol::WebSocket::Frame->new(
4247             type => 'close',
4248             buffer => pack('n', $code) . $reason,
4249             );
4250 0         0 $self->{stream}->write($close_frame->to_bytes);
4251 0         0 $self->{close_sent} = 1;
4252             }
4253              
4254 0         0 push @{$self->{receive_queue}}, {
  0         0  
4255             type => 'websocket.disconnect',
4256             code => $code,
4257             reason => $reason,
4258             };
4259             }
4260             elsif ($opcode == 9) {
4261             # Ping - respond with pong (transparent to app)
4262 0         0 my $pong = Protocol::WebSocket::Frame->new(
4263             type => 'pong',
4264             buffer => $bytes,
4265             );
4266 0         0 $self->{stream}->write($pong->to_bytes);
4267             }
4268             elsif ($opcode == 10) {
4269             # Pong - cancel any pending timeout (response to our ping)
4270 0         0 $self->_cancel_ws_pong_timeout;
4271             }
4272             }
4273              
4274             # Notify any waiting receive
4275 52 50 66     615 if ($self->{receive_pending} && !$self->{receive_pending}->is_ready && @{$self->{receive_queue}}) {
  10   66     111  
4276 10         15 my $f = $self->{receive_pending};
4277 10         20 $self->{receive_pending} = undef;
4278 10         24 $f->done;
4279             }
4280             }
4281              
4282             # Async file response - prioritizes speed based on file size:
4283             # 1. Small files (<=64KB): direct in-process read (fastest for small files)
4284             # 2. Large files: async chunked reads via worker pool (non-blocking)
4285 10     10   16 async sub _send_file_response {
4286 10         25 my ($self, $file, $offset, $length, $chunked) = @_;
4287              
4288             # Get file size if length not specified
4289 10         231 my $file_size = -s $file;
4290 10 50       28 die "Cannot stat file $file: $!" unless defined $file_size;
4291 10   66     43 $length //= $file_size - $offset;
4292              
4293 10         17 $self->{_response_size} += $length;
4294              
4295 10         17 my $stream = $self->{stream};
4296              
4297 10 100 66     44 if ($self->{sync_file_threshold} > 0 && $length <= $self->{sync_file_threshold}) {
4298             # Small file fast path: read directly in-process
4299             # For files <= 64KB, a simple read() is fast and avoids async overhead
4300 7 50       246 open my $fh, '<:raw', $file or die "Cannot open file $file: $!";
4301 7 100       27 seek($fh, $offset, 0) if $offset;
4302 7         189 my $bytes_read = read($fh, my $data, $length);
4303 7         59 close $fh;
4304              
4305 7 50       18 die "Failed to read file $file: $!" unless defined $bytes_read;
4306              
4307 7 100       14 if ($chunked) {
4308 1         6 my $len = sprintf("%x", length($data));
4309 1         6 $stream->write("$len\r\n$data\r\n");
4310 1         90 $stream->write("0\r\n\r\n");
4311             }
4312             else {
4313 6         20 $stream->write($data);
4314             }
4315             }
4316             else {
4317             # Large file path: async chunked reads via worker pool
4318 3 50       15 my $loop = $self->{server} ? $self->{server}->loop : undef;
4319 3 50       36 die "No event loop available for async file I/O" unless $loop;
4320              
4321             await PAGI::Server::AsyncFile->read_file_chunked(
4322             $loop, $file,
4323             sub {
4324 6     6   29 my ($chunk) = @_;
4325 6 100       22 if ($chunked) {
4326 2         8 my $len = sprintf("%x", length($chunk));
4327 2         42 $stream->write("$len\r\n$chunk\r\n");
4328             }
4329             else {
4330 4         29 $stream->write($chunk);
4331             }
4332 6         942 return; # Sync callback
4333             },
4334 3         54 offset => $offset,
4335             length => $length,
4336             chunk_size => FILE_CHUNK_SIZE,
4337             );
4338              
4339             # Send final chunk terminator if chunked
4340 3 100       248 if ($chunked) {
4341 1         16 $stream->write("0\r\n\r\n");
4342             }
4343             }
4344             }
4345              
4346             # Async filehandle response - uses worker pool for non-blocking reads
4347             # Note: Can't easily use sendfile for arbitrary filehandles (may not have fd,
4348             # may be pipes, may be in-memory). Falls back to chunked reads.
4349 6     6   9 async sub _send_fh_response {
4350 6         13 my ($self, $fh, $offset, $length, $chunked) = @_;
4351              
4352             # Seek to offset if specified
4353 6 100 66     18 if ($offset && $offset > 0) {
4354 1 50       10 seek($fh, $offset, 0) or die "Cannot seek: $!";
4355             }
4356              
4357             # For filehandles, we can't easily use the worker pool (can't pass fh across fork).
4358             # Use blocking reads in small chunks - not ideal but practical.
4359             # TODO: Consider IO::Async::FileStream for better event loop integration.
4360              
4361 6         9 my $remaining = $length; # undef means read to EOF
4362 6         13 my $stream = $self->{stream};
4363              
4364 6         7 while (1) {
4365 12         14 my $to_read = FILE_CHUNK_SIZE;
4366 12 100       22 if (defined $remaining) {
4367 4 50       8 $to_read = $remaining if $remaining < $to_read;
4368 4 100       10 last if $to_read <= 0;
4369             }
4370              
4371 10         173 my $bytes_read = read($fh, my $chunk, $to_read);
4372              
4373 10 50       45 last if !defined $bytes_read; # Error
4374 10 100       20 last if $bytes_read == 0; # EOF
4375              
4376 6         11 $self->{_response_size} += $bytes_read;
4377              
4378 6 100       11 if ($chunked) {
4379 3         10 my $len = sprintf("%x", length($chunk));
4380 3         29 $stream->write("$len\r\n$chunk\r\n");
4381             }
4382             else {
4383 3         9 $stream->write($chunk);
4384             }
4385              
4386 6 100       507 if (defined $remaining) {
4387 2         4 $remaining -= $bytes_read;
4388             }
4389             }
4390              
4391             # Send final chunk if chunked encoding
4392 6 100       51 if ($chunked) {
4393 3         6 $stream->write("0\r\n\r\n");
4394             }
4395             }
4396              
4397             1;
4398              
4399             __END__