File Coverage

blib/lib/PAGI/Server.pm
Criterion Covered Total %
statement 825 1120 73.6
branch 322 606 53.1
condition 200 364 54.9
subroutine 84 108 77.7
pod 10 12 83.3
total 1441 2210 65.2


line stmt bran cond sub pod time code
1             package PAGI::Server;
2 87     87   17860185 use strict;
  87         180  
  87         3032  
3 87     87   365 use warnings;
  87         137  
  87         15968  
4              
5             our $VERSION = '0.001012';
6              
7             # Future::XS support - opt-in via PAGI_FUTURE_XS=1 environment variable
8             # Must be loaded before Future to take effect, so we check env var in BEGIN
9             # Note: We declare these without initialization so BEGIN block values persist
10             our ($FUTURE_XS_AVAILABLE, $FUTURE_XS_ENABLED);
11             BEGIN {
12 87 50   87   238 $FUTURE_XS_AVAILABLE = eval { require Future::XS; 1 } ? 1 : 0;
  87         8345  
  0         0  
13 87         170 $FUTURE_XS_ENABLED = 0; # Default to disabled
14              
15 87 50       2881 if ($ENV{PAGI_FUTURE_XS}) {
    50          
16 0 0       0 if ($FUTURE_XS_AVAILABLE) {
17             # Future::XS is already loaded from the availability check
18 0         0 $FUTURE_XS_ENABLED = 1;
19             } else {
20 0         0 die <<"END_FUTURE_XS_ERROR";
21             PAGI_FUTURE_XS=1 set but Future::XS is not installed.
22              
23             To install Future::XS:
24             cpanm Future::XS
25              
26             Or unset the PAGI_FUTURE_XS environment variable.
27             END_FUTURE_XS_ERROR
28             }
29             } elsif ($FUTURE_XS_AVAILABLE) {
30             # Available but not requested - unload it
31 0         0 delete $INC{'Future/XS.pm'};
32             }
33             }
34              
35 87     87   5428 use parent 'IO::Async::Notifier';
  87         4270  
  87         660  
36 87     87   210070 use IO::Async::Listener;
  86         455071  
  86         4517  
37 86     86   15111 use IO::Async::Stream;
  86         891417  
  86         2204  
38 86     86   2359 use IO::Async::Loop;
  86         23874  
  86         1702  
39 86     86   30159 use IO::Async::Timer::Periodic;
  86         115226  
  86         3569  
40 86     86   26442 use IO::Socket::INET;
  86         576710  
  86         705  
41 86     86   31741 use Future;
  86         168  
  86         1614  
42 86     86   3139 use Future::AsyncAwait;
  86         18997  
  86         603  
43              
44 86     86   4825 use Scalar::Util qw(weaken refaddr);
  86         108  
  86         4522  
45 86     86   321 use Socket qw(sockaddr_family unpack_sockaddr_in unpack_sockaddr_un AF_UNIX AF_INET);
  86         117  
  86         4852  
46 86     86   403 use POSIX ();
  86         167  
  86         1378  
47              
48 86     86   61288 use PAGI::Server::Connection;
  86         307  
  86         33617  
49 86     86   46259 use PAGI::Server::Protocol::HTTP1;
  86         652  
  86         7954  
50              
51              
52             # Check TLS module availability (cached at load time for banner display)
53             our $TLS_AVAILABLE;
54             BEGIN {
55 86 50   86   606 $TLS_AVAILABLE = eval {
56 86         38830 require IO::Async::SSL;
57 86         7514805 require IO::Socket::SSL;
58 86         10345 1;
59             } ? 1 : 0;
60             }
61              
62 8     8 0 214555 sub has_tls { return $TLS_AVAILABLE }
63              
64             # Check HTTP/2 module availability (cached at load time)
65             our $HTTP2_AVAILABLE;
66             BEGIN {
67 86 50   86   259 $HTTP2_AVAILABLE = eval {
68 86         38395 require PAGI::Server::Protocol::HTTP2;
69 86         751 PAGI::Server::Protocol::HTTP2->available;
70             } ? 1 : 0;
71             }
72              
73 15     15 0 432482 sub has_http2 { return $HTTP2_AVAILABLE }
74              
75             # Windows doesn't support Unix signals - signal handling is conditional
76 86     86   550 use constant WIN32 => $^O eq 'MSWin32';
  86         119  
  86         1660719  
77              
78             =encoding utf8
79              
80             =head1 NAME
81              
82             PAGI::Server - PAGI Reference Server Implementation
83              
84             =head1 SYNOPSIS
85              
86             use IO::Async::Loop;
87             use PAGI::Server;
88              
89             # If using Future::IO libraries (Async::Redis, SSE->every, etc.)
90             # load the IO::Async implementation BEFORE loading them:
91             use Future::IO::Impl::IOAsync;
92              
93             my $loop = IO::Async::Loop->new;
94              
95             my $server = PAGI::Server->new(
96             app => \&my_pagi_app,
97             host => '127.0.0.1',
98             port => 5000,
99             );
100              
101             $loop->add($server);
102             $server->listen->get; # Start accepting connections
103              
104             See L for details on Future::IO configuration.
105              
106             =head1 DESCRIPTION
107              
108             PAGI::Server is a reference implementation of a PAGI-compliant HTTP server.
109             It supports HTTP/1.1, WebSocket, and Server-Sent Events (SSE) as defined
110             in the PAGI specification. It prioritizes spec compliance and code clarity
111             over performance optimization. It serves as the canonical reference for how
112             PAGI servers should behave.
113              
114             =head1 PROTOCOL SUPPORT
115              
116             B
117              
118             =over 4
119              
120             =item * HTTP/1.1 (full support including chunked encoding, trailers, keepalive)
121              
122             =item * HTTP/2 (B - via nghttp2, h2 over TLS and h2c cleartext)
123              
124             =item * WebSocket (RFC 6455, including over HTTP/2 via RFC 8441)
125              
126             =item * Server-Sent Events (SSE, including over HTTP/2)
127              
128             =back
129              
130             B
131              
132             =over 4
133              
134             =item * HTTP/3 (QUIC) - Under consideration
135              
136             =back
137              
138             For HTTP/2, see L.
139              
140             =head1 UNIX DOMAIN SOCKET SUPPORT (EXPERIMENTAL)
141              
142             B The API is subject to change in future
143             releases. Please report issues at L.
144              
145             Unix domain sockets provide efficient local communication between a reverse
146             proxy (nginx, HAProxy, etc.) and PAGI::Server running on the same machine.
147             They bypass the TCP/IP stack entirely, reducing latency and overhead compared
148             to connecting over C<127.0.0.1>.
149              
150             =head2 When to Use Unix Sockets
151              
152             =over 4
153              
154             =item * B — nginx or HAProxy on the same host handles
155             TLS termination, HTTP/2 negotiation, and static files, forwarding dynamic
156             requests to PAGI over a Unix socket.
157              
158             =item * B — frameworks like TechEmpower FrameworkBenchmarks use
159             Unix sockets to eliminate network variability from application benchmarks.
160              
161             =item * B — services on the same host communicate without
162             network overhead.
163              
164             =back
165              
166             B If clients connect over the network (remote browsers,
167             API consumers), use TCP. Unix sockets only accept connections from processes
168             on the same machine.
169              
170             =head2 Basic Usage
171              
172             B
173              
174             my $server = PAGI::Server->new(
175             app => $app,
176             socket => '/tmp/pagi.sock',
177             );
178              
179             B
180              
181             pagi-server --socket /tmp/pagi.sock ./app.pl
182              
183             B
184              
185             pagi-server --socket /tmp/pagi.sock --workers 4 ./app.pl
186              
187             In multi-worker mode, the parent process creates the Unix socket and all
188             worker processes inherit the file descriptor via C<fork()>. The kernel
189             distributes incoming connections across workers.
190              
191             =head2 How It Works
192              
193             =over 4
194              
195             =item 1. On startup, any existing file at the socket path is removed (stale
196             socket cleanup).
197              
198             =item 2. The server creates and binds a C Unix domain socket
199             at the specified path using C (multi-worker) or
200             C with C<< family => 'unix' >> (single-worker).
201              
202             =item 3. If C is set, C is called immediately after
203             binding to set the file permissions.
204              
205             =item 4. In multi-worker mode, the parent creates the socket before forking.
206             Workers inherit the listening fd and each runs its own C
207             wrapping the inherited handle.
208              
209             =item 5. On graceful shutdown (SIGTERM/SIGINT), the socket file is unlinked
210             by both the single-worker C path and the multi-worker
211             C<_initiate_multiworker_shutdown()> path.
212              
213             =back
214              
215             =head2 nginx Configuration
216              
217             B
218              
219             upstream pagi_backend {
220             server unix:/var/run/myapp/pagi.sock;
221             keepalive 32;
222             }
223              
224             server {
225             listen 80;
226             server_name myapp.example.com;
227              
228             location / {
229             proxy_pass http://pagi_backend;
230              
231             # Required for upstream keepalive:
232             proxy_http_version 1.1;
233             proxy_set_header Connection "";
234              
235             # Forward client info (since PAGI can't see it over Unix socket):
236             proxy_set_header Host $host;
237             proxy_set_header X-Real-IP $remote_addr;
238             proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
239             proxy_set_header X-Forwarded-Proto $scheme;
240             }
241             }
242              
243             B The C directive is critical for performance. Without
244             it, nginx opens a new Unix socket connection for every request.
245             C and C are required
246             for keepalive to work.
247              
248             B
249              
250             server {
251             listen 443 ssl http2;
252             ssl_certificate /etc/ssl/myapp.crt;
253             ssl_certificate_key /etc/ssl/myapp.key;
254              
255             location / {
256             proxy_pass http://pagi_backend;
257             # ... same proxy headers as above
258             }
259             }
260              
261             nginx handles TLS and HTTP/2 with clients, then speaks plain HTTP/1.1 to
262             PAGI over the Unix socket. This is the recommended production pattern.
263              
264             =head2 Socket Permissions and Security
265              
266             By default, the socket file inherits permissions from the process umask.
267             Use C<socket_mode> to set explicit permissions:
268              
269             # CLI
270             pagi-server --socket /var/run/myapp/pagi.sock --socket-mode 0660 ./app.pl
271              
272             # Programmatic
273             PAGI::Server->new(
274             app => $app,
275             socket => '/var/run/myapp/pagi.sock',
276             socket_mode => 0660,
277             );
278              
279             B
280              
281             =over 4
282              
283             =item * B, not C. Directories like
284             C or C prevent symlink attacks and provide
285             an additional permission layer.
286              
287             =item * B with a shared group.> Create a group (e.g., C)
288             that both the application user and the nginx user belong to:
289              
290             sudo groupadd myapp
291             sudo usermod -aG myapp www-data # nginx user
292             sudo usermod -aG myapp myappuser # app user
293             sudo mkdir -p /var/run/myapp
294             sudo chown myappuser:myapp /var/run/myapp
295             sudo chmod 0750 /var/run/myapp
296              
297             =item * B> for automatic directory management:
298              
299             # /etc/systemd/system/myapp.service
300             [Service]
301             User=myappuser
302             Group=myapp
303             RuntimeDirectory=myapp
304             RuntimeDirectoryMode=0750
305             ExecStart=/usr/local/bin/pagi-server \
306             --socket /run/myapp/pagi.sock \
307             --socket-mode 0660 \
308             --workers 4 \
309             /opt/myapp/app.pl
310              
311             systemd creates C on service start and cleans it up on stop.
312              
313             =back
314              
315             =head2 TLS Over Unix Sockets
316              
317             TLS can be used over Unix sockets, though this is unusual — normally the
318             reverse proxy handles TLS termination. When TLS is configured on a Unix
319             socket listener, the server logs an info-level note suggesting reverse proxy
320             TLS termination instead.
321              
322             The combination is allowed because it has legitimate uses (encrypted
323             inter-container communication, compliance requirements). All major ASGI
324             servers (Uvicorn, Hypercorn, Granian) also allow it.
325              
326             =head2 HTTP/2 Over Unix Sockets
327              
328             h2c (HTTP/2 cleartext) works over Unix sockets. This is useful for gRPC
329             backends or reverse proxies that support HTTP/2 to upstreams (e.g., Envoy).
330             Note that nginx does B<not> currently support HTTP/2 to upstream backends
331             (except for gRPC via C).
332              
333             =head2 Scope Differences
334              
335             For Unix socket connections, the PAGI scope differs from TCP connections:
336              
337             =over 4
338              
339             =item * B is absent> — Unix sockets have no peer IP address or
340             port. The C key is omitted entirely from the scope hashref (not set
341             to C). This is spec-compliant: the PAGI specification marks C
342             as optional.
343              
344             =item * B is C<[$socket_path, undef]>>> — instead of C<[$host, $port]>.
345              
346             =back
347              
348             B Any middleware that accesses C<< $scope->{client} >>
349             must check C<< exists $scope->{client} >> first. For client IP identification
350             behind a reverse proxy, use C or C headers
351             instead of C<< $scope->{client} >>. The C
352             middleware (if available) handles this automatically.
353              
354             B Unix socket connections log C as the client IP in the
355             access log instead of an IP address.
356              
357             =head2 Stale Socket Cleanup
358              
359             If a socket file already exists at the configured path (e.g., from a previous
360             crash), it is automatically removed before binding. This matches the behavior
361             of Starman, Gunicorn, Uvicorn, and other production servers. The socket file
362             is also removed during graceful shutdown (SIGTERM/SIGINT).
363              
364             If the server is killed with SIGKILL (C), the socket file will
365             B<not> be cleaned up. It will be removed on the next startup.
366              
367             =head1 MULTI-LISTENER SUPPORT (EXPERIMENTAL)
368              
369             B The API is subject to change in future
370             releases.
371              
372             A single PAGI::Server instance can listen on multiple endpoints
373             simultaneously. This is useful for:
374              
375             =over 4
376              
377             =item * B — load
378             balancers probe a TCP port while nginx uses the Unix socket.
379              
380             =item * B — serve different interfaces on different ports.
381              
382             =item * B — listen on both old and new ports during
383             a transition.
384              
385             =back
386              
387             =head2 Programmatic API
388              
389             my $server = PAGI::Server->new(
390             app => $app,
391             listen => [
392             { host => '0.0.0.0', port => 8080 },
393             { socket => '/tmp/pagi.sock', socket_mode => 0660 },
394             ],
395             );
396              
397             Each spec in the C array is a hashref with either C<< { host, port } >>
398             for TCP or C<< { socket } >> (with optional C) for Unix sockets.
399              
400             B Per-listener TLS configuration is not yet supported. TLS is configured
401             server-wide via the C constructor option and applies to all TCP listeners.
402             Unix socket listeners behind a reverse proxy do not need TLS — the proxy handles
403             TLS termination.
404              
405             =head2 CLI
406              
407             The C<--listen> flag is repeatable. The server auto-detects TCP vs Unix
408             socket: values containing C<:> are parsed as C, everything else
409             is treated as a Unix socket path.
410              
411             # TCP + Unix socket
412             pagi-server --listen 0.0.0.0:8080 --listen /tmp/pagi.sock ./app.pl
413              
414             # Multiple TCP ports
415             pagi-server --listen 0.0.0.0:8080 --listen 0.0.0.0:8443 ./app.pl
416              
417             # With workers
418             pagi-server --listen 0.0.0.0:8080 --listen /tmp/pagi.sock -w 4 ./app.pl
419              
420             # IPv6
421             pagi-server --listen [::1]:5000 ./app.pl
422              
423             C<--listen> is B<mutually exclusive> with C<--host>, C<--port>, and
424             C<--socket>. C<--socket-mode> applies to all Unix socket listeners when
425             using C<--listen>.
426              
427             =head2 How It Works
428              
429             =over 4
430              
431             =item * In B, one C is created per
432             endpoint. All listeners share the same event loop and connection handler.
433              
434             =item * In B, the parent process creates all listening
435             sockets (Unix and TCP) before forking. Workers inherit all file descriptors
436             and create their own C for each inherited socket.
437              
438             =item * The C option applies only to TCP listeners. Unix socket
439             listeners always use the shared-socket model (parent creates, workers inherit).
440              
441             =item * On shutdown, all listeners are stopped and all Unix socket files are
442             cleaned up.
443              
444             =back
445              
446             =head2 Accessors
447              
448             $server->port; # Bound port of first TCP listener, or undef
449             $server->socket_path; # Path of first Unix socket listener, or undef
450             $server->listeners; # Arrayref of all listener specs
451              
452             =head2 Backward Compatibility
453              
454             The existing C/C constructor options continue to work exactly
455             as before. They are internally normalized to a single-element listener
456             array. The C option is similarly sugar for a single Unix socket
457             listener. Only C enables true multi-listener mode.
458              
459             =head1 SYSTEMD SOCKET ACTIVATION (EXPERIMENTAL)
460              
461             B The API is subject to change in future
462             releases.
463              
464             PAGI::Server supports systemd socket activation, which allows systemd to
465             create and hold listening sockets on behalf of the server. This enables
466             zero-downtime restarts: when the server is restarted, the kernel continues
467             to queue incoming connections on the socket without refusing or dropping
468             them, even during the gap between the old process exiting and the new one
469             starting.
470              
471             Benefits of systemd socket activation:
472              
473             =over 4
474              
475             =item * B — the kernel queues connections during restarts
476              
477             =item * B — systemd creates the socket as root, then drops privileges before exec
478              
479             =item * B — the server is started automatically when the first connection arrives
480              
481             =back
482              
483             =head2 Basic Setup
484              
485             Create two systemd unit files: a C<.socket> unit that describes the socket,
486             and a C<.service> unit for the server itself.
487              
488             B):>
489              
490             [Unit]
491             Description=PAGI Application Socket
492              
493             [Socket]
494             ListenStream=0.0.0.0:8080
495             Accept=no
496              
497             [Install]
498             WantedBy=sockets.target
499              
500             B):>
501              
502             [Unit]
503             Description=PAGI Application Socket
504              
505             [Socket]
506             ListenStream=/run/pagi/app.sock
507             SocketMode=0660
508             SocketUser=www-data
509             SocketGroup=www-data
510             Accept=no
511              
512             [Install]
513             WantedBy=sockets.target
514              
515             B):>
516              
517             [Unit]
518             Description=PAGI Application Server
519             Requires=pagi.socket
520              
521             [Service]
522             User=www-data
523             ExecStart=/usr/local/bin/pagi-server -E production ./app.pl
524             Restart=on-failure
525              
526             [Install]
527             WantedBy=multi-user.target
528              
529             C<Accept=no> is B<required>. PAGI::Server accepts connections itself via
530             C<accept()>; systemd must not accept on its behalf.
531              
532             =head2 How Auto-Detection Works
533              
534             When PAGI::Server starts, it checks the C<LISTEN_FDS> and C<LISTEN_PID>
535             environment variables set by systemd. If C<LISTEN_PID> matches the current
536             process PID, the server inspects each inherited file descriptor (starting at
537             fd 3) with C<getsockname()> to determine its address.
538              
539             Each inherited socket is then matched against the configured listeners. For
540             example, if you configure C<< port => 8080 >> and systemd has a socket bound
541             to C<0.0.0.0:8080>, PAGI::Server will use the inherited fd instead of
542             creating a new socket.
543              
544             The same application code works identically with or without systemd:
545              
546             # Without systemd: PAGI::Server binds the socket itself
547             # With systemd: PAGI::Server inherits the socket from systemd
548             my $server = PAGI::Server->new(
549             app => $app,
550             host => '0.0.0.0',
551             port => 8080,
552             );
553              
554             After reading the inherited fds, PAGI::Server removes C<LISTEN_FDS>,
555             C<LISTEN_PID>, and C<LISTEN_FDNAMES> from the environment (per the
556             C<sd_listen_fds(3)> specification), so child processes do not re-inherit them.
557              
558             =head2 Unix Socket Cleanup
559              
560             Normally, PAGI::Server unlinks its Unix socket file on shutdown. For
561             systemd-activated Unix sockets, the socket file is B<not> unlinked because
562             systemd owns the socket and will recreate it for the next activation.
563              
564             =head1 FD REUSE INTERNALS (EXPERIMENTAL)
565              
566             B The API is subject to change in future
567             releases.
568              
569             PAGI::Server uses a C<PAGI_REUSE> environment variable to pass inherited
570             listening socket file descriptors to re-exec'd processes during hot restart
571             (see L). This mechanism also supports systemd
572             socket activation (see L).
573              
574             =head2 PAGI_REUSE Format
575              
576             The variable is a comma-separated list of C entries:
577              
578             # TCP listeners
579             PAGI_REUSE=127.0.0.1:8080:3,0.0.0.0:8443:4
580              
581             # Unix socket listeners
582             PAGI_REUSE=unix:/run/pagi/app.sock:5
583              
584             # Mixed
585             PAGI_REUSE=0.0.0.0:8080:3,unix:/run/pagi/app.sock:4
586              
587             Each entry encodes the address the socket is bound to and the file descriptor
588             number to use.
589              
590             =head2 Fd Matching
591              
592             When starting, C<_collect_inherited_fds()> parses C<PAGI_REUSE> and/or
593             C<LISTEN_FDS>, building a table of C<< address => fd >> pairs. During
594             C, each configured listener looks up its own address in the table.
595             If a match is found, the existing fd is used instead of calling C and
596             C. This allows the kernel's accept queue to be preserved across
597             restarts.
598              
599             =head2 File Descriptor Inheritance
600              
601             To ensure that listening socket fds are inherited across C,
602             PAGI::Server sets C<$^F = 1023> before creating sockets. Perl uses C<$^F>
603             (the maximum system file descriptor, equivalent to C in
604             spirit) to decide which fds receive the C close-on-exec flag:
605             fds with numbers greater than C<$^F> get C set automatically.
606             By raising C<$^F> to 1023, listen socket fds remain open across C
607             without requiring explicit C calls.
608              
609             =head1 HOT RESTART (EXPERIMENTAL)
610              
611             B The API and signal behaviour are subject
612             to change in future releases.
613              
614             Deploying new code normally requires a server restart. During that restart
615             there is a gap — however brief — where the listening socket is closed and
616             incoming connections are dropped. Hot restart eliminates this gap by having
617             the old master fork and exec a brand-new master process that B the
618             already-open listening sockets. Both the old master and the new master serve
619             requests during the transition; clients never see a refused connection.
620              
621             =head2 How It Works
622              
623             =over 4
624              
625             =item 1.
626              
627             An admin sends C<SIGUSR2> to the running master: C<kill -USR2 E<lt>master_pidE<gt>>
628              
629             =item 2.
630              
631             The old master sets C (encoding each listening socket fd) and
632             C (its own PID) in the environment, then calls C
633             followed immediately by C of the original C command
634             (reconstructed from C).
635              
636             =item 3.
637              
638             The new master starts, finds the inherited file descriptors via C,
639             and reuses the existing listening sockets rather than calling C/C
640             again. The kernel's accept queue is preserved — no connections are dropped.
641              
642             =item 4.
643              
644             The new master spawns its worker pool and waits for each worker to complete
645             the lifespan startup handshake (heartbeat).
646              
647             =item 5.
648              
649             Once all workers are healthy, the new master sends C to the old
650             master (read from C).
651              
652             =item 6.
653              
654             The old master receives C, finishes in-flight requests within its
655             shutdown timeout, and exits cleanly.
656              
657             =back
658              
659             =head2 HUP vs USR2
660              
661             HUP — Rolling worker restart. Workers are replaced one by one.
662             Code loaded at master startup (middleware, startup modules)
663             is NOT reloaded. Use for: config changes picked up per-worker.
664              
665             USR2 — Full master re-exec. Everything reloaded from disk including
666             the perl binary, all modules, middleware stack. Use for:
667             code deploys, Perl upgrades, PAGI::Server upgrades.
668              
669             =head2 Deploy Workflow
670              
671             # Deploy new code
672             rsync -a ./lib/ /opt/myapp/lib/
673              
674             # Hot restart (zero downtime)
675             kill -USR2 $(cat /var/run/myapp/pagi.pid)
676              
677             # Verify (optional)
678             curl http://localhost:8080/health
679              
680             =head2 systemd Unit File
681              
682             [Service]
683             Type=forking
684             PIDFile=/var/run/myapp/pagi.pid
685             ExecStart=/usr/local/bin/pagi-server \
686             --host 0.0.0.0 --port 8080 \
687             --workers 4 --pid /var/run/myapp/pagi.pid \
688             --daemonize /opt/myapp/app.pl
689             ExecReload=/bin/kill -USR2 $MAINPID
690             KillMode=process
691              
692             =head2 Failure Handling
693              
694             The design ensures the old master never stops until the new master explicitly
695             sends C:
696              
697             =over 4
698              
699             =item * B — The old master logs the error and continues serving.
700             No new master is started.
701              
702             =item * B — The forked child exits before loading any code.
703             The old master notices (via C) and continues serving.
704              
705             =item * B — The old master never receives
706             C and continues serving indefinitely.
707              
708             =item * B — The new master exits without
709             sending C. The old master is unaffected.
710              
711             =back
712              
713             =head2 PERL5LIB and Module Paths
714              
715             When the new master is C'd it inherits the process environment, but
716             B<not> any C<-I> flags that were on the original command line. If your
717             application uses C<-Ilib>, ensure C is set in the environment or
718             in the systemd unit file so the re-exec'd process can find your modules.
719             Alternatively, use the C<--lib> flag (C),
720             which is captured in C and replayed on re-exec.
721              
722             =head1 WINDOWS SUPPORT
723              
724             B
725              
726             The server relies on Unix-specific features that are not available on Windows:
727              
728             =over 4
729              
730             =item * B<Signal handling> - SIGTERM, SIGINT, SIGHUP for graceful shutdown and worker management
731              
732             =item * B<fork()> - Multi-worker mode requires real process forking, not thread emulation
733              
734             =item * B<IO::Async> - The event loop has Unix-specific optimizations
735              
736             =back
737              
738             For Windows development, consider using WSL (Windows Subsystem for Linux) to
739             run PAGI::Server in a Linux environment. The PAGI specification and middleware
740             components can still be developed and unit-tested on Windows, but the reference
741             server implementation requires a Unix-like operating system.
742              
743              
744             =head1 CONSTRUCTOR
745              
746             =head2 new
747              
748             my $server = PAGI::Server->new(%options);
749              
750             Creates a new PAGI::Server instance. Options:
751              
752             =over 4
753              
754             =item app => \&coderef (required)
755              
756             The PAGI application coderef with signature: async sub ($scope, $receive, $send)
757              
758             =item host => $host
759              
760             Bind address (IP address or hostname). Default: C<'127.0.0.1'>
761              
762             The default binds only to the loopback interface, accepting connections only
763             from localhost. This is B<secure by default> - development
764             servers won't accidentally be exposed to the network.
765              
766             B
767              
768             '127.0.0.1' - Localhost only (default, secure for development)
769             '0.0.0.0' - All IPv4 interfaces (required for remote access)
770             '::' - All IPv6 interfaces (may also accept IPv4)
771             '192.168.1.100' - Specific interface only
772              
773             B where remote clients need
774             to connect, bind to all interfaces:
775              
776             my $server = PAGI::Server->new(
777             app => $app,
778             host => '0.0.0.0',
779             port => 8080,
780             );
781              
782             B<Security note:> When binding to C<0.0.0.0>, ensure appropriate firewall
783             rules are in place. For production, consider a reverse proxy (nginx, etc.).
784              
785             =item port => $port
786              
787             Bind port. Default: 5000
788              
789             =item socket => $path
790              
791             B Unix domain socket path for listening instead of TCP host:port.
792             B<Mutually exclusive> with C<host>, C<port>, and C<listen>.
793             See L for details.
794              
795             my $server = PAGI::Server->new(
796             app => $app,
797             socket => '/tmp/pagi.sock',
798             );
799              
800             =item socket_mode => $mode
801              
802             Set file permissions on the Unix domain socket after creation. The value
803             should be a numeric mode (e.g., C<0660>). If not specified, the socket
804             inherits the default permissions from the process umask. Silently ignored
805             if C<socket> is not set.
806              
807             my $server = PAGI::Server->new(
808             app => $app,
809             socket => '/tmp/pagi.sock',
810             socket_mode => 0660,
811             );
812              
813             =item listen => \@specs
814              
815             B Array of listener specifications for multi-endpoint listening.
816             Each spec is a hashref with either C<< { host, port } >> for TCP or
817             C<< { socket, socket_mode } >> for Unix domain sockets.
818             B<Mutually exclusive> with C<host>, C<port>, C<socket>, and C<socket_mode>.
819             See L for details.
820              
821             my $server = PAGI::Server->new(
822             app => $app,
823             listen => [
824             { host => '0.0.0.0', port => 8080 },
825             { socket => '/tmp/pagi.sock', socket_mode => 0660 },
826             ],
827             );
828              
829             =item ssl => \%config
830              
831             Optional TLS/HTTPS configuration. B - see
832             L below.
833              
834             Configuration keys:
835              
836             =over 4
837              
838             =item cert_file => $path
839              
840             Path to the SSL certificate file (PEM format).
841              
842             =item key_file => $path
843              
844             Path to the SSL private key file (PEM format).
845              
846             =item ca_file => $path
847              
848             Optional path to CA certificate for client verification.
849              
850             =item verify_client => $bool
851              
852             If true, require and verify client certificates.
853              
854             =item min_version => $version
855              
856             Minimum TLS version. Default: C<'TLSv1_2'>. Options: C<'TLSv1_2'>, C<'TLSv1_3'>.
857              
858             =item cipher_list => $string
859              
860             OpenSSL cipher list. Default uses modern secure ciphers.
861              
862             =back
863              
864             Example:
865              
866             my $server = PAGI::Server->new(
867             app => $app,
868             ssl => {
869             cert_file => '/path/to/server.crt',
870             key_file => '/path/to/server.key',
871             },
872             );
873              
874             =item disable_tls => $bool
875              
876             Force-disable TLS even if ssl config is provided. Useful for testing
877             TLS configuration parsing without actually enabling TLS. Default: false.
878              
879             =item extensions => \%extensions
880              
881             Extensions to advertise (e.g., { fullflush => {} })
882              
883             =item on_error => \&callback
884              
885             Error callback receiving ($error)
886              
887             =item access_log => $filehandle | undef
888              
889             Access log filehandle. Default: STDERR
890              
891             Set to C<undef> to disable access logging entirely. This eliminates
892             per-request I/O overhead, improving throughput by 5-15% depending on
893             workload. Useful for benchmarking or when access logs are handled
894             externally (e.g., by a reverse proxy).
895              
896             # Disable access logging
897             my $server = PAGI::Server->new(
898             app => $app,
899             access_log => undef,
900             );
901              
902             =item access_log_format => $format_or_preset
903              
904             Access log format string or preset name. Default: C<'clf'>
905              
906             Named presets:
907              
908             clf - PAGI default: IP, timestamp, method/path, status, duration
909             combined - Apache combined: adds Referer and User-Agent
910             common - Apache common: adds response size
911             tiny - Minimal: method, path, status, duration
912              
913             Custom format strings use Apache-style atoms. See L.
914              
915             my $server = PAGI::Server->new(
916             app => $app,
917             access_log_format => 'combined',
918             );
919              
920             =item log_level => $level
921              
922             Controls the verbosity of server log messages. Default: 'info'
923              
924             Valid levels (from least to most verbose):
925              
926             =over 4
927              
928             =item * B<error> - Only errors (application errors, fatal conditions)
929              
930             =item * B<warn> - Warnings and errors (connection issues, timeouts)
931              
932             =item * B<info> - Informational messages and above (startup, shutdown, worker spawning)
933              
934             =item * B<debug> - Everything (verbose diagnostics, frame-level details)
935              
936             =back
937              
938             my $server = PAGI::Server->new(
939             app => $app,
940             log_level => 'debug', # Very verbose
941             );
942              
943             B C<--log-level debug>
944              
945             =item workers => $count
946              
947             Number of worker processes for multi-worker mode. Default: 0 (single process mode).
948              
949             When set to a value greater than 0, the server uses a pre-fork model (its steps are listed after C<max_body_size> below).
950              
951             =item listener_backlog => $number
952              
953             Value for the listener queue size. Default: 2048
954              
955             When in multi worker mode, the queue size for those workers inherits
956             from this value.
957              
958             =item reuseport => $bool
959              
960             Enable SO_REUSEPORT mode for multi-worker servers. Default: 0 (disabled).
961              
962             When enabled, each worker process creates its own listening socket with
963             SO_REUSEPORT, allowing the kernel to load-balance incoming connections
964             across workers. This can reduce accept() contention and improve p99
965             latency under high concurrency.
966              
967             B<Without reuseport (default):> Parent creates one socket before forking,
968             all workers inherit and share that socket. Workers compete on a single
969             accept queue (potential thundering herd).
970              
971             B<With reuseport:> Each worker creates its own socket with
972             SO_REUSEPORT. The kernel distributes connections across sockets, each
973             worker has its own accept queue (reduced contention).
974              
975             B
976              
977             =over 4
978              
979             =item * B: Full kernel-level load balancing. Recommended for high
980             concurrency workloads.
981              
982             =item * B: SO_REUSEPORT allows multiple binds but does NOT provide
983             kernel load balancing. May actually decrease performance compared to shared
984             socket mode. Use with caution - benchmark before deploying.
985              
986             =back
987              
988             =item max_receive_queue => $count
989              
990             Maximum number of messages that can be queued in the WebSocket receive queue
991             before the connection is closed. This is a DoS protection mechanism.
992              
993             B Message count (not bytes). Each WebSocket text or binary frame counts
994             as one message regardless of size.
995              
996             B<Default:> 1000 messages
997              
998             B The server sends a WebSocket close frame with code 1008
999             (Policy Violation) and reason "Message queue overflow", then closes the
1000             connection.
1001              
1002             B
1003              
1004             =over 4
1005              
1006             =item * B Each queued message holds the full message payload.
1007             With default of 1000 messages and average 1KB messages, worst case is ~1MB
1008             per slow connection.
1009              
1010             =item * B Total memory risk = workers × max_connections × max_receive_queue × avg_message_size.
1011             For 4 workers, 100 connections each, 1000 queue, 1KB average = 400MB worst case.
1012              
1013             =item * B If your app processes messages quickly, the queue
1014             rarely grows. Default of 1000 is generous for most applications.
1015              
1016             =item * B If your app does expensive processing per message,
1017             consider lowering to 100-500 to limit memory exposure.
1018              
1019             =item * B If you have trusted clients sending rapid bursts,
1020             you may increase to 5000-10000, but monitor memory usage.
1021              
1022             =back
1023              
1024             B C<--max-receive-queue 500>
1025              
1026             =item max_ws_frame_size => $bytes
1027              
1028             Maximum size in bytes for a single WebSocket frame payload. When a client
1029             sends a frame larger than this limit, the connection is closed with a
1030             protocol error.
1031              
1032             B Bytes
1033              
1034             B<Default:> 65536 (64KB) - matches Protocol::WebSocket default
1035              
1036             B The server closes the connection. The error is logged as
1037             "PAGI connection error: Payload is too big."
1038              
1039             B
1040              
1041             =over 4
1042              
1043             =item * B For chat apps or control messages, default 64KB is plenty.
1044              
1045             =item * B For binary data transfer via WebSocket, increase to 1MB-16MB
1046             depending on expected file sizes.
1047              
1048             =item * B Each connection can buffer up to max_ws_frame_size bytes
1049             during frame parsing. High values increase memory per connection.
1050              
1051             =item * B Lower values limit memory exhaustion from malicious clients
1052             sending oversized frames.
1053              
1054             =back
1055              
1056             B C<--max-ws-frame-size 1048576>
1057              
1058             =item max_connections => $count
1059              
1060             Maximum number of concurrent connections before returning HTTP 503.
1061             B (same as Mojolicious).
1062              
1063             When at capacity, new connections receive a 503 Service Unavailable
1064             response with a Retry-After header. This prevents resource exhaustion
1065             under heavy load.
1066              
1067             B
1068              
1069             my $server = PAGI::Server->new(
1070             app => $app,
1071             max_connections => 5000, # Higher limit for production
1072             );
1073              
1074             B C<--max-connections 5000>
1075              
1076             B Use C<< $server->connection_count >> and
1077             C<< $server->effective_max_connections >> to monitor usage.
1078              
1079             B
1080              
1081             For high-traffic production deployments, you'll likely want to increase
1082             this value. The optimal setting depends on your workload, available
1083             memory, and system file descriptor limits.
1084              
1085             I<Linux:>
1086              
1087             # Check current limits
1088             ulimit -n # Soft limit (per-process)
1089             cat /proc/sys/fs/file-max # System-wide limit
1090              
1091             # Increase for current session
1092             ulimit -n 65536
1093              
1094             # Permanent: add to /etc/security/limits.conf
1095             * soft nofile 65536
1096             * hard nofile 65536
1097              
1098             # Or for systemd services, in your unit file:
1099             [Service]
1100             LimitNOFILE=65536
1101              
1102             I<macOS:>
1103              
1104             # Check current limits
1105             ulimit -n # Soft limit
1106             sysctl kern.maxfilesperproc # Per-process max
1107              
1108             # Increase for current session
1109             ulimit -n 65536
1110              
1111             # Permanent: add to /etc/launchd.conf or use launchctl
1112             sudo launchctl limit maxfiles 65536 200000
1113              
1114             I<Recommendation:> Set C<max_connections> to roughly 80% of your file
1115             descriptor limit to leave headroom for database connections, log files,
1116             and other resources.
1117              
1118             =item write_high_watermark => $bytes
1119              
1120             B Maximum bytes to buffer in the socket write queue
1121             before applying backpressure. When exceeded, C<< $send->() >> calls will
1122             pause until the buffer drains below C<write_low_watermark>.
1123             Default: 65536 (64KB).
1124              
1125             This prevents unbounded memory growth when the server writes data faster
1126             than the client can receive it. The default matches Python's asyncio
1127             transport defaults, providing a good balance between throughput and
1128             memory efficiency.
1129              
1130             B
1131              
1132             =over 4
1133              
1134             =item * B (e.g., 256KB-1MB) for high-throughput bulk transfers
1135             where you want fewer context switches and higher throughput at the cost
1136             of more per-connection memory.
1137              
1138             =item * B (e.g., 16KB-32KB) if supporting many concurrent
1139             connections where memory efficiency is critical.
1140              
1141             =back
1142              
1143             B
1144              
1145             # High-throughput file server - larger buffers
1146             my $server = PAGI::Server->new(
1147             app => $app,
1148             write_high_watermark => 262144, # 256KB
1149             write_low_watermark => 65536, # 64KB
1150             );
1151              
1152             =item write_low_watermark => $bytes
1153              
1154             B Threshold below which sending resumes after
1155             backpressure was applied. Must be less than or equal to
1156             C<write_high_watermark>. Default: 16384 (16KB, which is high/4).
1157              
1158             A larger gap between high and low watermarks reduces oscillation
1159             (frequent pause/resume cycles). A smaller gap provides more responsive
1160             backpressure but may increase context switching.
1161              
1162             B
1163              
1164             # Minimize oscillation with wider gap
1165             my $server = PAGI::Server->new(
1166             app => $app,
1167             write_high_watermark => 131072, # 128KB
1168             write_low_watermark => 16384, # 16KB (8:1 ratio)
1169             );
1170              
1171             =item max_body_size => $bytes
1172              
1173             Maximum request body size in bytes. Default: 10,000,000 (10MB).
1174             Set to 0 for unlimited (not recommended for public-facing servers).
1175              
1176             Requests with Content-Length exceeding this limit receive HTTP 413
1177             (Payload Too Large). Chunked requests are also checked as data arrives.
1178              
1179             B
1180              
1181             my $server = PAGI::Server->new(
1182             app => $app,
1183             max_body_size => 50_000_000, # 50MB for file uploads
1184             );
1185              
1186             # Unlimited (use with caution)
1187             my $server = PAGI::Server->new(
1188             app => $app,
1189             max_body_size => 0,
1190             );
1191              
1192             B C<--max-body-size 50000000>
1193              
1194             B Without a body size limit, attackers can exhaust server
1195             memory with large requests. The 10MB default balances security with common
1196             use cases (file uploads, JSON payloads). Increase for specific needs, or
1197             use 0 only behind a reverse proxy that enforces its own limit.
1198              
1199             =over 4
1200              
1201             =item * A listening socket is created before forking
1202              
1203             =item * Worker processes are spawned using C<< $loop->fork() >> which properly
1204             handles IO::Async's C<$ONE_TRUE_LOOP> singleton
1205              
1206             =item * Each worker gets a fresh event loop and runs lifespan startup independently
1207              
1208             =item * Workers that exit are automatically respawned via C<< $loop->watch_process() >>
1209              
1210             =item * SIGTERM/SIGINT triggers graceful shutdown of all workers
1211              
1212             =back
1213              
1214             =item sync_file_threshold => $bytes
1215              
1216             Threshold in bytes for synchronous file reads. Files smaller than this value
1217             are read synchronously in the event loop; larger files use async I/O via
1218             a worker pool.
1219              
1220             B<Default:> 65536 (64KB)
1221              
1222             Set to 0 for fully async file reads. This is recommended for:
1223              
1224             =over 4
1225              
1226             =item * Network filesystems (NFS, SMB, cloud storage)
1227              
1228             =item * High-latency storage (spinning disks under load)
1229              
1230             =item * Docker volumes with overlay filesystem
1231              
1232             =back
1233              
1234             The default (64KB) is optimized for local SSDs where small synchronous reads
1235             are faster than the overhead of async I/O.
1236              
1237             B C<--sync-file-threshold NUM>
1238              
1239             =item max_requests => $count
1240              
1241             Maximum number of requests a worker process will handle before restarting.
1242             After serving this many requests, the worker gracefully shuts down and the
1243             parent spawns a replacement.
1244              
1245             B<Default:> 0 (disabled - workers run indefinitely)
1246              
1247             B
1248              
1249             =over 4
1250              
1251             =item * Long-running deployments where gradual memory growth is a concern
1252              
1253             =item * Applications with known memory leaks that can't be easily fixed
1254              
1255             =item * Defense against slow memory growth (~6.5 bytes/request observed in PAGI)
1256              
1257             =back
1258              
1259             B Only applies in multi-worker mode (C<< workers > 0 >>). In single-worker
1260             mode, this setting is ignored.
1261              
1262             B C<--max-requests 10000>
1263              
1264             Example: With 4 workers and max_requests=10000, total capacity before any
1265             restart is 40,000 requests. Workers restart individually without downtime.
1266              
1267             =item timeout => $seconds
1268              
1269             Connection idle timeout in seconds. Closes connections that are idle between
1270             requests (applies to keep-alive connections waiting for the next request).
1271              
1272             B<Default:> 60
1273              
1274             B Each connection with a non-zero timeout creates a timer
1275             that is reset on every read event. For maximum throughput in high-performance
1276             scenarios, set C<< timeout => 0 >> to disable the idle timer entirely. This
1277             eliminates timer management overhead but means idle connections will never
1278             be automatically closed.
1279              
1280             B
1281              
1282             # Disable idle timeout for maximum performance
1283             my $server = PAGI::Server->new(
1284             app => $app,
1285             timeout => 0,
1286             );
1287              
1288             # Short timeout to reclaim connections quickly
1289             my $server = PAGI::Server->new(
1290             app => $app,
1291             timeout => 30,
1292             );
1293              
1294             B C<--timeout 0> or C<--timeout 30>
1295              
1296             B<Note:> This differs from C<request_timeout> (stall timeout during active
1297             request processing). The C<timeout> applies between requests; the
1298             C<request_timeout> applies during a request. For WebSocket and SSE, use
1299             C<ws_idle_timeout> and C<sse_idle_timeout> respectively.
1300              
1301             =item request_timeout => $seconds
1302              
1303             Maximum time in seconds a request can stall without any I/O activity before
1304             being terminated. This is a "stall timeout" - the timer resets whenever data
1305             is read from the client or written to the client.
1306              
1307             B<Default:> 0 (disabled)
1308              
1309             B Creating per-request timers adds overhead that
1310             impacts throughput on high-performance workloads. For maximum performance,
1311             this is disabled by default. Most production deployments run behind a reverse
1312             proxy (nginx, haproxy) which provides its own timeout protection.
1313              
1314             B
1315              
1316             =over 4
1317              
1318             =item * Running PAGI directly without a reverse proxy
1319              
1320             =item * Small/internal apps where simplicity matters more than max throughput
1321              
1322             =item * Untrusted clients that might send data slowly or hang
1323              
1324             =item * Defense against application bugs that cause requests to hang indefinitely
1325              
1326             =back
1327              
1328             B
1329              
1330             =over 4
1331              
1332             =item * Timer starts when request processing begins
1333              
1334             =item * Timer resets on any read activity (receiving request body)
1335              
1336             =item * Timer resets on any write activity (sending response)
1337              
1338             =item * If timer expires (no I/O for N seconds), connection is closed
1339              
1340             =item * Not used for WebSocket/SSE (they have C<ws_idle_timeout>/C<sse_idle_timeout>)
1341              
1342             =back
1343              
1344             B
1345              
1346             # Enable 30 second stall timeout (recommended when not behind proxy)
1347             my $server = PAGI::Server->new(
1348             app => $app,
1349             request_timeout => 30,
1350             );
1351              
1352             B C<--request-timeout 30>
1353              
1354             B<Note:> This differs from C<timeout> (idle connection timeout). The
1355             C<timeout> applies between requests on keep-alive connections. The
1356             C<request_timeout> applies during active request processing.
1357              
1358             =item ws_idle_timeout => $seconds
1359              
1360             Maximum time in seconds a WebSocket connection can be idle without any
1361             activity (no messages sent or received) before being closed.
1362              
1363             B<Default:> 0 (disabled - WebSocket connections can be idle indefinitely)
1364              
1365             When enabled, the timer resets on:
1366              
1367             =over 4
1368              
1369             =item * Sending any WebSocket frame (accept, send, ping, close)
1370              
1371             =item * Receiving any WebSocket frame from client
1372              
1373             =back
1374              
1375             B
1376              
1377             # Close idle WebSocket connections after 5 minutes
1378             my $server = PAGI::Server->new(
1379             app => $app,
1380             ws_idle_timeout => 300,
1381             );
1382              
1383             B C<--ws-idle-timeout 300>
1384              
1385             B For more sophisticated keep-alive behavior with ping/pong, use
1386             C<< $ws->keepalive($interval, $timeout) >> for protocol-level ping/pong.
1387              
1388             =item sse_idle_timeout => $seconds
1389              
1390             Maximum time in seconds an SSE connection can be idle without any events
1391             being sent before being closed.
1392              
1393             B<Default:> 0 (disabled - SSE connections can be idle indefinitely)
1394              
1395             The timer resets each time an event is sent to the client (including
1396             comments and the initial headers).
1397              
1398             B
1399              
1400             # Close idle SSE connections after 2 minutes
1401             my $server = PAGI::Server->new(
1402             app => $app,
1403             sse_idle_timeout => 120,
1404             );
1405              
1406             B C<--sse-idle-timeout 120>
1407              
1408             B For SSE connections that may be legitimately idle, use
1409             C<< $sse->keepalive($interval) >> to send periodic comment keepalives.
1410              
1411             B Over HTTP/2 this timeout applies at the connection level,
1412             not per-stream. See L
1413             for details and recommendations.
1414              
1415             =item heartbeat_timeout => $seconds
1416              
1417             Worker liveness timeout in seconds. Only active in multi-worker mode
1418             (C<< workers >= 2 >>). Has no effect in single-worker mode — use
1419             C<timeout> for idle connection management there.
1420              
1421             Each worker sends a heartbeat to the parent process via a Unix pipe at
1422             an interval of C. The parent checks for missed
1423             heartbeats every C. If a worker has not sent a
1424             heartbeat within C<heartbeat_timeout> seconds, the parent kills it with
1425             SIGKILL and respawns a replacement.
1426              
1427             B Event loop starvation — when the worker's event
1428             loop is completely blocked and cannot process any events. This happens
1429             with blocking syscalls (C, synchronous DNS, blocking database
1430             drivers), deadlocks, runaway CPU-bound computation, or any code that
1431             does not yield to the event loop.
1432              
1433             B Slow async operations. A request handler
1434             that does C<< await $db->query(...) >> for 5 minutes is fine — the
1435             C<await> returns control to the event loop, so heartbeats continue
1436             normally. This value should be larger than the maximum time you expect
1437             any single operation to block the event loop without yielding.
1438              
1439             B<Default:> 50 (seconds). Set to 0 to disable.
1440              
1441             B
1442              
1443             # Tighter heartbeat for latency-sensitive service
1444             my $server = PAGI::Server->new(
1445             app => $app,
1446             workers => 4,
1447             heartbeat_timeout => 20,
1448             );
1449              
1450             # Disable heartbeat monitoring
1451             my $server = PAGI::Server->new(
1452             app => $app,
1453             workers => 4,
1454             heartbeat_timeout => 0,
1455             );
1456              
1457             B C<--heartbeat-timeout 20>
1458              
1459             =item loop_type => $backend
1460              
1461             Specifies the IO::Async::Loop subclass to use when calling C<run>.
1462             This option is ignored when embedding the server in an existing loop.
1463              
1464             B<Default:> Auto-detect (IO::Async chooses the best available backend)
1465              
1466             B
1467              
1468             'EPoll' - Linux epoll (recommended for Linux)
1469             'EV' - libev-based (cross-platform, requires EV module)
1470             'Poll' - POSIX poll() (portable fallback)
1471             'Select' - select() (most portable, least scalable)
1472              
1473             B
1474              
1475             my $server = PAGI::Server->new(
1476             app => $app,
1477             loop_type => 'EPoll',
1478             );
1479             $server->run;
1480              
1481             B<CLI:> C<--loop EPoll> (via pagi-server)
1482              
1483             B<Note:> The specified backend module must be installed. For example,
1484             C<< loop_type => 'EPoll' >> requires L<IO::Async::Loop::EPoll>.
1485              
1486             =item h2_max_concurrent_streams => $count
1487              
1488             B<(Experimental - HTTP/2 support may change in future releases.)>
1489              
1490             Maximum number of concurrent HTTP/2 streams per connection. Each stream
1491             represents an in-flight request/response exchange. Limits resource consumption
1492             and provides protection against rapid-reset attacks.
1493              
1494             B<Default:> 100
1495              
1496             This matches Apache httpd, H2O, and Hypercorn defaults. The RFC 7540 default
1497             is unlimited, but 100 is the industry consensus for a safe maximum.
1498              
1499             B Increase for API gateways handling many small concurrent requests.
1500             Decrease for memory-constrained environments or when each request is expensive.
1501              
1502             =item h2_initial_window_size => $bytes
1503              
1504             B<(Experimental)>
1505              
1506             Initial HTTP/2 flow control window size per stream, in bytes. Controls how
1507             much data a client can send before the server must acknowledge receipt. Also
1508             affects how much response data the server can buffer per stream before the
1509             client acknowledges.
1510              
1511             B<Default:> 65535 (64KB minus 1, the RFC 7540 default)
1512              
1513             B Increase to 131072-262144 for high-throughput file upload/download
1514             workloads where the default window causes flow control stalls on high-latency
1515             connections. The tradeoff is higher per-stream memory usage.
1516              
1517             =item h2_max_frame_size => $bytes
1518              
1519             B<(Experimental)>
1520              
1521             Maximum size of a single HTTP/2 frame payload, in bytes. Must be between
1522             16384 (16KB, the RFC minimum) and 16777215 (16MB, the RFC maximum).
1523              
1524             B<Default:> 16384 (16KB, the RFC 7540 default)
1525              
1526             Most servers use the RFC default. Larger frames reduce framing overhead but
1527             increase head-of-line blocking within a stream.
1528              
1529             =item h2_enable_push => $bool
1530              
1531             B<(Experimental)>
1532              
1533             Enable HTTP/2 server push (SETTINGS_ENABLE_PUSH). When enabled, the server
1534             can proactively push resources to the client before they are requested.
1535              
1536             B<Default:> 0 (disabled)
1537              
1538             Server push is effectively deprecated. Chrome removed support in 2022,
1539             and nginx deprecated it in version 1.25.1. Unless you have a specific use
1540             case requiring server push, leave this disabled.
1541              
1542             =item h2_enable_connect_protocol => $bool
1543              
1544             B<(Experimental)>
1545              
1546             Enable the Extended CONNECT protocol (RFC 8441, SETTINGS_ENABLE_CONNECT_PROTOCOL).
1547             Required for WebSocket-over-HTTP/2 tunneling.
1548              
1549             B<Default:> 1 (enabled)
1550              
1551             When enabled, clients can use the Extended CONNECT method with a C<:protocol>
1552             pseudo-header to establish WebSocket connections over HTTP/2 streams. Disable
1553             this only if you do not need WebSocket support over HTTP/2.
1554              
1555             =item h2_max_header_list_size => $bytes
1556              
1557             B<(Experimental)>
1558              
1559             Maximum total size of the header block that the server will accept, in bytes.
1560             This is the sum of all header name lengths, value lengths, and 32-byte per-entry
1561             overhead as defined by RFC 7540 Section 6.5.2.
1562              
1563             B<Default:> 65536 (64KB)
1564              
1565             Matches Hypercorn and Node.js defaults. Provides a guard against header-based
1566             memory exhaustion attacks while being generous enough for normal use including
1567             large cookies and authorization tokens.
1568              
1569             =back
1570              
1571             =head1 METHODS
1572              
1573             =head2 listen
1574              
1575             my $future = $server->listen;
1576              
1577             Starts listening for connections. Returns a Future that completes when
1578             the server is ready to accept connections.
1579              
1580             =head2 shutdown
1581              
1582             my $future = $server->shutdown;
1583              
1584             Initiates graceful shutdown. Returns a Future that completes when
1585             shutdown is complete.
1586              
1587             =head2 port
1588              
1589             my $port = $server->port;
1590              
1591             Returns the bound port number. Useful when port => 0 is used.
1592              
1593             =head2 socket_path
1594              
1595             my $path = $server->socket_path;
1596              
1597             Returns the Unix socket path of the first Unix socket listener,
1598             or C<undef> if no Unix socket listeners are configured.
1599              
1600             =head2 listeners
1601              
1602             my $listeners = $server->listeners;
1603              
1604             Returns an arrayref of all normalized listener specifications.
1605             Each entry is a hashref with C<type> (C<'tcp'> or C<'unix'>)
1606             and type-specific keys (C<host>/C<port> for TCP, C<path> for Unix).
1607              
1608             =head2 is_running
1609              
1610             my $bool = $server->is_running;
1611              
1612             Returns true if the server is accepting connections.
1613              
1614             =head2 connection_count
1615              
1616             my $count = $server->connection_count;
1617              
1618             Returns the current number of active connections.
1619              
1620             =head2 effective_max_connections
1621              
1622             my $max = $server->effective_max_connections;
1623              
1624             Returns the effective maximum connections limit. If C<max_connections>
1625             was set explicitly, returns that value. Otherwise returns the default
1626             of 1000.
1627              
1628             =head1 FILE RESPONSE STREAMING
1629              
1630             PAGI::Server supports efficient file streaming via the C<file> and C<fh>
1631             keys in C<http.response.body> events:
1632              
1633             # Stream entire file
1634             await $send->({
1635             type => 'http.response.body',
1636             file => '/path/to/file.mp4',
1637             more => 0,
1638             });
1639              
1640             # Stream partial file (for Range requests)
1641             await $send->({
1642             type => 'http.response.body',
1643             file => '/path/to/file.mp4',
1644             offset => 1000,
1645             length => 5000,
1646             more => 0,
1647             });
1648              
1649             # Stream from filehandle
1650             open my $fh, '<:raw', $file;
1651             await $send->({
1652             type => 'http.response.body',
1653             fh => $fh,
1654             length => $size,
1655             more => 0,
1656             });
1657             close $fh;
1658              
1659             The server streams files in 64KB chunks to avoid memory bloat. Small files
1660             (under 64KB) are read synchronously for speed; larger files use async I/O
1661             via a worker pool to avoid blocking the event loop.
1662              
1663             =head2 Production Recommendations for Static Files
1664              
1665             B
1666             serving to a reverse proxy:>
1667              
1668             =over 4
1669              
1670             =item 1. B
1671              
1672             Place a reverse proxy in front of PAGI::Server and let it handle static
1673             files directly. This provides:
1674              
1675             =over 4
1676              
1677             =item * Optimized file serving with kernel sendfile
1678              
1679             =item * Efficient caching and compression
1680              
1681             =item * Protection from slow client attacks
1682              
1683             =item * HTTP/2 and HTTP/3 support
1684              
1685             =back
1686              
1687             =item 2. B>
1688              
1689             For files that require authentication or authorization, use the XSendfile
1690             middleware to delegate file serving to the reverse proxy:
1691              
1692             use PAGI::Middleware::Builder;
1693              
1694             my $app = builder {
1695             enable 'XSendfile',
1696             type => 'X-Accel-Redirect', # For Nginx
1697             mapping => { '/var/www/protected/' => '/internal/' };
1698             $my_app;
1699             };
1700              
1701             See L<PAGI::Middleware::XSendfile> for details.
1702              
1703             =back
1704              
1705             =head1 ENABLING TLS SUPPORT
1706              
1707             PAGI::Server supports HTTPS/TLS connections, but requires additional modules
1708             that are not installed by default. This keeps the base installation minimal
1709             for users who don't need TLS.
1710              
1711             =head2 When You Need TLS
1712              
1713             You need TLS if you want to:
1714              
1715             =over 4
1716              
1717             =item * Serve HTTPS traffic directly from PAGI::Server
1718              
1719             =item * Test TLS locally during development
1720              
1721             =item * Use client certificate authentication
1722              
1723             =back
1724              
1725             You B<do not> need TLS if you:
1726              
1727             =over 4
1728              
1729             =item * Use a reverse proxy (nginx, Apache) that handles TLS termination
1730              
1731             =item * Only serve HTTP traffic on localhost for development
1732              
1733             =item * Deploy behind a load balancer that provides TLS
1734              
1735             =back
1736              
1737             B Use a reverse proxy (nginx, HAProxy, etc.) for
1738             TLS termination. They offer better performance, easier certificate management,
1739             and battle-tested security. PAGI::Server's TLS support is primarily for
1740             development and testing.
1741              
1742             =head2 Installing TLS Modules
1743              
1744             To enable TLS support, install the required modules:
1745              
1746             B
1747              
1748             cpanm IO::Async::SSL IO::Socket::SSL
1749              
1750             B
1751              
1752             apt-get install libio-socket-ssl-perl
1753              
1754             B
1755              
1756             yum install perl-IO-Socket-SSL
1757              
1758             B
1759              
1760             perl -MIO::Async::SSL -MIO::Socket::SSL -e 'print "TLS modules installed\n"'
1761              
1762             =head2 Basic TLS Configuration
1763              
1764             Once the modules are installed, configure TLS with certificate and key files:
1765              
1766             my $server = PAGI::Server->new(
1767             app => $app,
1768             host => '0.0.0.0',
1769             port => 5000,
1770             ssl => {
1771             cert_file => '/path/to/server.crt',
1772             key_file => '/path/to/server.key',
1773             },
1774             );
1775              
1776             =head2 Generating Self-Signed Certificates (Development)
1777              
1778             For local development and testing, you can generate a self-signed certificate:
1779              
1780             B
1781              
1782             openssl req -x509 -newkey rsa:4096 -nodes \
1783             -keyout server.key -out server.crt -days 365 \
1784             -subj "/CN=localhost"
1785              
1786             B
1787              
1788             # Create config file
1789             cat > ssl.conf <<EOF
1790             [req]
1791             distinguished_name = req_distinguished_name
1792             x509_extensions = v3_req
1793             prompt = no
1794              
1795             [req_distinguished_name]
1796             CN = localhost
1797              
1798             [v3_req]
1799             subjectAltName = @alt_names
1800              
1801             [alt_names]
1802             DNS.1 = localhost
1803             DNS.2 = *.localhost
1804             IP.1 = 127.0.0.1
1805             EOF
1806              
1807             # Generate certificate
1808             openssl req -x509 -newkey rsa:4096 -nodes \
1809             -keyout server.key -out server.crt -days 365 \
1810             -config ssl.conf -extensions v3_req
1811              
1812             B
1813              
1814             # Start server
1815             pagi-server --app myapp.pl --ssl-cert server.crt --ssl-key server.key
1816              
1817             # Test with curl (ignore self-signed cert warning)
1818             curl -k https://localhost:5000/
1819              
1820             B
1821              
1822             For production, use certificates from a trusted CA (Let's Encrypt, etc.):
1823              
1824             # Let's Encrypt with certbot
1825             certbot certonly --standalone -d yourdomain.com
1826              
1827             # Then configure PAGI::Server
1828             my $server = PAGI::Server->new(
1829             app => $app,
1830             ssl => {
1831             cert_file => '/etc/letsencrypt/live/yourdomain.com/fullchain.pem',
1832             key_file => '/etc/letsencrypt/live/yourdomain.com/privkey.pem',
1833             },
1834             );
1835              
1836             =head2 Advanced TLS Configuration
1837              
1838             See the C option in L for details on:
1839              
1840             =over 4
1841              
1842             =item * Client certificate verification (C, C)
1843              
1844             =item * TLS version requirements (C)
1845              
1846             =item * Custom cipher suites (C)
1847              
1848             =back
1849              
1850             =head1 ENABLING HTTP/2 SUPPORT (EXPERIMENTAL)
1851              
1852             B The API and behavior may change in future
1853             releases. Please report issues and provide feedback.
1854              
1855             PAGI::Server provides native HTTP/2 support via the nghttp2 C library
1856             (L<Net::HTTP2::nghttp2>). When enabled, the server supports both TLS-based
1857             HTTP/2 (h2 via ALPN negotiation) and cleartext HTTP/2 (h2c).
1858              
1859             =head2 Requirements
1860              
1861             L<Net::HTTP2::nghttp2> must be installed, which requires the nghttp2 C library:
1862              
1863             # Install the C library
1864             brew install nghttp2 # macOS
1865             apt-get install libnghttp2-dev # Debian/Ubuntu
1866              
1867             # Install the Perl bindings
1868             cpanm Net::HTTP2::nghttp2
1869              
1870             =head2 Enabling HTTP/2
1871              
1872             B
1873              
1874             # HTTP/2 over TLS (recommended for production)
1875             pagi-server --http2 --ssl-cert cert.pem --ssl-key key.pem --app myapp.pl
1876              
1877             # HTTP/2 cleartext (h2c, for development/testing)
1878             pagi-server --http2 --app myapp.pl
1879              
1880             B
1881              
1882             my $server = PAGI::Server->new(
1883             app => $app,
1884             http2 => 1,
1885             ssl => { cert_file => 'cert.pem', key_file => 'key.pem' },
1886             );
1887              
1888             =head2 How It Works
1889              
1890             With TLS, the server advertises C<h2> and C<http/1.1> via ALPN during the
1891             TLS handshake. Clients that support HTTP/2 will negotiate C<h2> automatically;
1892             others fall back to HTTP/1.1 transparently.
1893              
1894             Without TLS, the server detects HTTP/2 via the client connection preface
1895             (h2c mode). HTTP/1.1 clients are handled normally.
1896              
1897             =head2 HTTP/2 Features
1898              
1899             =over 4
1900              
1901             =item * Stream multiplexing (100 concurrent streams per connection by default)
1902              
1903             =item * HPACK header compression
1904              
1905             =item * Per-stream and connection-level flow control
1906              
1907             =item * GOAWAY graceful session shutdown
1908              
1909             =item * Stream state validation (RST_STREAM on protocol violations)
1910              
1911             =item * WebSocket over HTTP/2 via Extended CONNECT (RFC 8441)
1912              
1913             =back
1914              
1915             =head2 Conformance
1916              
1917             Tested against h2spec (the HTTP/2 conformance test suite): B<137/146 (93.8%)>.
1918             All 9 remaining failures are shared with the bare nghttp2 C library and cannot
1919             be fixed at the application level.
1920              
1921             Load tested with h2load: 60,000 requests across 50 concurrent connections with
1922             zero failures.
1923              
1924             See L for full compliance details.
1925              
1926             =head2 Configuration
1927              
1928             HTTP/2 protocol settings are tuned via constructor options prefixed with
1929             C. See L for details on:
1930              
1931             =over 4
1932              
1933             =item * C<h2_max_concurrent_streams> - Max streams per connection (default: 100)
1934              
1935             =item * C<h2_initial_window_size> - Flow control window (default: 65535)
1936              
1937             =item * C<h2_max_frame_size> - Max frame payload (default: 16384)
1938              
1939             =item * C<h2_enable_push> - Server push (default: disabled)
1940              
1941             =item * C<h2_enable_connect_protocol> - WebSocket over HTTP/2 (default: enabled)
1942              
1943             =item * C<h2_max_header_list_size> - Max header block size (default: 65536)
1944              
1945             =back
1946              
1947             =head1 SIGNAL HANDLING
1948              
1949             PAGI::Server responds to Unix signals for process management. Signal behavior
1950             differs between single-worker and multi-worker modes.
1951              
1952             =head2 Supported Signals
1953              
1954             =over 4
1955              
1956             =item B<SIGTERM> - Graceful shutdown
1957              
1958             Initiates graceful shutdown. The server stops accepting new connections,
1959             waits for active requests to complete (up to C<shutdown_timeout> seconds),
1960             then exits. In multi-worker mode, SIGTERM is forwarded to all workers.
1961              
1962             kill -TERM <pid>
1963              
1964             =item B<SIGINT> - Graceful shutdown (Ctrl-C)
1965              
1966             Same behavior as SIGTERM. Triggered by Ctrl-C in the terminal. In multi-worker
1967             mode, the parent process catches SIGINT and coordinates shutdown of all workers
1968             to ensure proper lifespan.shutdown handling.
1969              
1970             kill -INT <pid>
1971             # or press Ctrl-C in terminal
1972              
1973             =item B<SIGHUP> - Graceful worker restart (multi-worker only)
1974              
1975             Performs a zero-downtime worker restart by spawning new workers before
1976             terminating old ones. Useful for recycling workers to reclaim leaked memory
1977             or reset per-worker state without dropping active connections.
1978              
1979             B<Note:> This does NOT reload application code. New workers fork from the
1980             existing parent process and inherit the same loaded code. For code deploys,
1981             perform a full server restart (SIGTERM + start).
1982              
1983             kill -HUP <pid>
1984              
1985             In single-worker mode, SIGHUP is logged but ignored (no graceful restart
1986             possible without multiple workers).
1987              
1988             =item B<SIGTTIN> - Increase worker count (multi-worker only)
1989              
1990             Spawns an additional worker process. Use this to scale up capacity dynamically.
1991              
1992             kill -TTIN <pid>
1993              
1994             =item B<SIGTTOU> - Decrease worker count (multi-worker only)
1995              
1996             Gracefully terminates one worker process. The minimum worker count is 1;
1997             sending SIGTTOU when only one worker remains has no effect.
1998              
1999             kill -TTOU <pid>
2000              
2001             =back
2002              
2003             =head2 Signal Handling in Multi-Worker Mode
2004              
2005             When running with C<< workers => N >> (where N > 1):
2006              
2007             =over 4
2008              
2009             =item * Parent process manages the worker pool
2010              
2011             =item * Workers handle requests; parent handles signals
2012              
2013             =item * SIGTERM/SIGINT to parent triggers coordinated shutdown of all workers
2014              
2015             =item * Each worker runs lifespan.shutdown before exiting
2016              
2017             =item * Workers that crash are automatically respawned
2018              
2019             =item * Heartbeat monitoring detects workers with blocked event loops and replaces them automatically (see C<heartbeat_timeout>)
2020              
2021             =back
2022              
2023             =head2 Examples
2024              
2025             B
2026              
2027             # Recycle workers without dropping connections (does NOT reload application code)
2028             kill -HUP $(cat /var/run/pagi.pid)
2029              
2030             B
2031              
2032             # Add workers during peak hours
2033             kill -TTIN $(cat /var/run/pagi.pid)
2034             kill -TTIN $(cat /var/run/pagi.pid)
2035              
2036             # Remove workers during quiet periods
2037             kill -TTOU $(cat /var/run/pagi.pid)
2038              
2039             B
2040              
2041             # Stop accepting new connections, drain existing ones
2042             kill -TERM $(cat /var/run/pagi.pid)
2043              
2044             =cut
2045              
2046             sub _init {
2047 427     427   47011770 my ($self, $params) = @_;
2048              
2049 427 100       3857 $self->{app} = delete $params->{app} or die "app is required";
2050              
2051             # Extract listener-related params
2052 426         1581 my $listen = delete $params->{listen};
2053 426         1496 my $socket = delete $params->{socket};
2054 426         1483 my $socket_mode = delete $params->{socket_mode};
2055 426         1399 my $host = delete $params->{host};
2056 426         1823 my $port = delete $params->{port};
2057 426         1366 $self->{ssl} = delete $params->{ssl};
2058 426   100     4055 $self->{disable_tls} = delete $params->{disable_tls} // 0; # Extract early for validation
2059              
2060             # Validate SSL certificate files at startup (fail fast)
2061             # Skip validation if TLS is explicitly disabled
2062 426 100       1817 if (my $ssl = $self->{ssl}) {
2063 56 100       234 if ($self->{disable_tls}) {
2064             # Skip TLS setup and cert validation — ssl config is stored but not applied
2065 2         40 warn "PAGI::Server: TLS disabled via disable_tls option, ssl config ignored\n";
2066             } else {
2067 54 100       498 if (my $cert = $ssl->{cert_file}) {
2068 52 100       3169 die "SSL certificate file not found: $cert\n" unless -e $cert;
2069 50 50       744 die "SSL certificate file not readable: $cert\n" unless -r $cert;
2070             }
2071 52 100       503 if (my $key = $ssl->{key_file}) {
2072 51 100       667 die "SSL key file not found: $key\n" unless -e $key;
2073 50 50       552 die "SSL key file not readable: $key\n" unless -r $key;
2074             }
2075 51 100       354 if (my $ca = $ssl->{ca_file}) {
2076 9 100       277 die "SSL CA file not found: $ca\n" unless -e $ca;
2077 8 50       124 die "SSL CA file not readable: $ca\n" unless -r $ca;
2078             }
2079             }
2080             }
2081              
2082             # Normalize all listener forms to $self->{listeners}
2083 422 100       2464 if ($listen) {
    100          
2084             # Explicit listen array
2085 25 100       284 die "Cannot specify both 'listen' and 'host' options\n" if defined $host;
2086 23 50       255 die "Cannot specify both 'listen' and 'port' options\n" if defined $port;
2087 23 50       281 die "Cannot specify both 'listen' and 'socket' options\n" if defined $socket;
2088 23 50       259 die "Cannot specify both 'listen' and 'socket_mode' options\n" if defined $socket_mode;
2089 23 100 66     665 die "'listen' must be a non-empty arrayref\n"
2090             unless ref $listen eq 'ARRAY' && @$listen;
2091              
2092 21         254 $self->{listeners} = [];
2093 21         354 for my $spec (@$listen) {
2094 29 50       284 die "Each listen spec must be a hashref\n" unless ref $spec eq 'HASH';
2095 29 100       254 if ($spec->{socket}) {
2096 10 100       44 die "Cannot specify both 'socket' and 'host' in a listen spec\n" if $spec->{host};
2097 8 50       14 die "Cannot specify both 'socket' and 'port' in a listen spec\n" if $spec->{port};
2098 8         41 push @{$self->{listeners}}, {
2099             type => 'unix',
2100             path => $spec->{socket},
2101             socket_mode => $spec->{socket_mode},
2102 8         11 };
2103             } else {
2104             die "TCP listen spec requires both 'host' and 'port'\n"
2105 19 100 66     494 unless defined $spec->{host} && defined $spec->{port};
2106 17         397 push @{$self->{listeners}}, {
2107             type => 'tcp',
2108             host => $spec->{host},
2109             port => $spec->{port},
2110 17         94 };
2111             }
2112             }
2113 17         100 $self->{host} = undef;
2114 17         95 $self->{port} = undef;
2115             } elsif (defined $socket) {
2116             # Socket sugar
2117 20 100       57 die "Cannot specify both 'socket' and 'host' options\n" if defined $host;
2118 18 100       48 die "Cannot specify both 'socket' and 'port' options\n" if defined $port;
2119             $self->{listeners} = [{
2120 16         117 type => 'unix',
2121             path => $socket,
2122             socket_mode => $socket_mode,
2123             }];
2124 16         24 $self->{host} = undef;
2125 16         38 $self->{port} = undef;
2126             } else {
2127             # Host/port sugar (backward compatible default)
2128 377   100     1406 $host //= '127.0.0.1';
2129 377   100     1448 $port //= 5000;
2130             $self->{listeners} = [{
2131 377         3365 type => 'tcp',
2132             host => $host,
2133             port => $port,
2134             }];
2135 377         1010 $self->{host} = $host;
2136 377         1475 $self->{port} = $port;
2137             }
2138              
2139             # Apply server-wide SSL to all TCP listeners
2140 410 100       1640 if ($self->{ssl}) {
2141 52         95 for my $listener (@{$self->{listeners}}) {
  52         300  
2142 52 50       227 if ($listener->{type} eq 'tcp') {
2143 52         234 $listener->{ssl} = $self->{ssl};
2144             }
2145             }
2146             }
2147              
2148 410   100     9320 $self->{extensions} = delete $params->{extensions} // {};
2149 410   66 0   5056 $self->{on_error} = delete $params->{on_error} // sub { warn @_ };
  0         0  
2150 410 100       2417 $self->{access_log} = exists $params->{access_log} ? delete $params->{access_log} : \*STDERR;
2151 410   100     3457 $self->{access_log_format} = delete $params->{access_log_format} // 'clf';
2152             $self->{_access_log_formatter} = $self->_compile_access_log_format(
2153             $self->{access_log_format}
2154 410         3168 );
2155 410   100     1867 $self->{quiet} = delete $params->{quiet} // 0;
2156 410   100     2798 $self->{log_level} = delete $params->{log_level} // 'info';
2157             # Validate log level
2158 410         3563 my %valid_levels = (debug => 1, info => 2, warn => 3, error => 4);
2159             die "Invalid log_level '$self->{log_level}' - must be one of: debug, info, warn, error\n"
2160 410 50       1859 unless $valid_levels{$self->{log_level}};
2161 410         1503 $self->{_log_level_num} = $valid_levels{$self->{log_level}};
2162 410   100     2791 $self->{timeout} = delete $params->{timeout} // 60; # Connection idle timeout (seconds)
2163 410   100     2920 $self->{max_header_size} = delete $params->{max_header_size} // 8192; # Max header size in bytes
2164 410   100     5158 $self->{max_header_count} = delete $params->{max_header_count} // 100; # Max number of headers
2165 410   100     2543 $self->{max_body_size} = delete $params->{max_body_size} // 10_000_000; # Max body size in bytes (10MB default, 0 = unlimited)
2166 410   100     2238 $self->{workers} = delete $params->{workers} // 0; # Number of worker processes (0 = single process)
2167 410   100     2182 $self->{max_requests} = delete $params->{max_requests} // 0; # 0 = unlimited
2168 410   50     1836 $self->{listener_backlog} = delete $params->{listener_backlog} // 2048; # Listener queue size
2169 410   100     2175 $self->{shutdown_timeout} = delete $params->{shutdown_timeout} // 30; # Graceful shutdown timeout (seconds)
2170 410   50     2023 $self->{reuseport} = delete $params->{reuseport} // 0; # SO_REUSEPORT mode for multi-worker
2171 410   100     2017 $self->{max_receive_queue} = delete $params->{max_receive_queue} // 1000; # Max WebSocket receive queue size (messages)
2172 410   100     2085 $self->{max_ws_frame_size} = delete $params->{max_ws_frame_size} // 65536; # Max WebSocket frame size in bytes (64KB default)
2173 410   100     2025 $self->{max_connections} = delete $params->{max_connections} // 0; # 0 = use default (1000)
2174 410   100     1798 $self->{sync_file_threshold} = delete $params->{sync_file_threshold} // 65536; # Threshold for sync file reads (0=always async)
2175 410   100     2062 $self->{request_timeout} = delete $params->{request_timeout} // 0; # Request stall timeout in seconds (0 = disabled, default for performance)
2176 410   100     1919 $self->{ws_idle_timeout} = delete $params->{ws_idle_timeout} // 0; # WebSocket idle timeout (0 = disabled)
2177 410   100     1952 $self->{sse_idle_timeout} = delete $params->{sse_idle_timeout} // 0; # SSE idle timeout (0 = disabled)
2178 410   50     1762 $self->{heartbeat_timeout} = delete $params->{heartbeat_timeout} // 50; # Worker heartbeat timeout (0 = disabled)
2179 410   100     1950 $self->{write_high_watermark} = delete $params->{write_high_watermark} // 65536; # 64KB - pause sending above this
2180 410   100     1918 $self->{write_low_watermark} = delete $params->{write_low_watermark} // 16384; # 16KB - resume sending below this
2181 410         1041 $self->{loop_type} = delete $params->{loop_type}; # Optional loop backend (EPoll, EV, Poll, etc.)
2182 410 100       1384 if (my $lt = $self->{loop_type}) {
2183 2 50       53 die "Invalid loop_type '$lt': must contain only letters, digits, and ::\n"
2184             unless $lt =~ /\A[A-Za-z][A-Za-z0-9_]*(?:::[A-Za-z][A-Za-z0-9_]*)*\z/;
2185             }
2186             # Dev-mode event validation: explicit flag, or auto-enable in development mode
2187             $self->{validate_events} = delete $params->{validate_events}
2188 408 50 100     3946 // (($ENV{PAGI_ENV} // '') eq 'development' ? 1 : 0);
      33        
2189              
2190             # HTTP/2 support (opt-in, experimental)
2191 408   100     2663 $self->{http2} = delete $params->{http2} // $ENV{_PAGI_SERVER_HTTP2} // 0;
      100        
2192              
2193             # HTTP/2 protocol settings (only used when http2 is enabled)
2194 408   100     1929 my $h2_max_concurrent_streams = delete $params->{h2_max_concurrent_streams} // 100;
2195 408   100     1539 my $h2_initial_window_size = delete $params->{h2_initial_window_size} // 65535;
2196 408   100     1452 my $h2_max_frame_size = delete $params->{h2_max_frame_size} // 16384;
2197 408   100     1585 my $h2_enable_push = delete $params->{h2_enable_push} // 0;
2198 408   100     1616 my $h2_enable_connect_protocol = delete $params->{h2_enable_connect_protocol} // 1;
2199 408   100     1614 my $h2_max_header_list_size = delete $params->{h2_max_header_list_size} // 65536;
2200              
2201 408         1120 $self->{running} = 0;
2202 408         993 $self->{bound_port} = undef;
2203 408         1162 $self->{listener} = undef;
2204 408         1230 $self->{connections} = {}; # Hash keyed by refaddr for O(1) add/remove
2205             $self->{protocol} = PAGI::Server::Protocol::HTTP1->new(
2206             max_header_size => $self->{max_header_size},
2207             max_header_count => $self->{max_header_count},
2208 408         6692 );
2209 408         1250 $self->{state} = {}; # Shared state from lifespan
2210 408         2212 $self->{worker_pids} = {}; # Track worker PIDs in multi-worker mode
2211 408         14144 $self->{is_worker} = 0; # True if this is a worker process
2212              
2213             # Initialize HTTP/2 protocol handler if enabled and available
2214 408 100       1288 if ($self->{http2}) {
2215 88 50       239 if ($HTTP2_AVAILABLE) {
2216 88         851 $self->{http2_protocol} = PAGI::Server::Protocol::HTTP2->new(
2217             max_concurrent_streams => $h2_max_concurrent_streams,
2218             initial_window_size => $h2_initial_window_size,
2219             max_frame_size => $h2_max_frame_size,
2220             enable_push => $h2_enable_push,
2221             enable_connect_protocol => $h2_enable_connect_protocol,
2222             max_header_list_size => $h2_max_header_list_size,
2223             );
2224 88         284 $self->{http2_enabled} = 1;
2225              
2226             # h2c mode: HTTP/2 over cleartext (no TLS)
2227 88 100       224 if (!$self->{ssl}) {
2228 80         196 $self->{h2c_enabled} = 1;
2229             }
2230             } else {
2231 0         0 die <<"END_HTTP2_ERROR";
2232             HTTP/2 support requested but Net::HTTP2::nghttp2 is not installed.
2233              
2234             To install:
2235             cpanm Net::HTTP2::nghttp2
2236              
2237             Or disable HTTP/2:
2238             http2 => 0
2239             END_HTTP2_ERROR
2240             }
2241             }
2242              
2243 408         2851 $self->SUPER::_init($params);
2244             }
2245              
2246             sub configure {
2247 409     409 1 5034 my ($self, %params) = @_;
2248              
2249 409 50       1264 if (exists $params{app}) {
2250 0         0 $self->{app} = delete $params{app};
2251             }
2252 409 50       1348 if (exists $params{host}) {
2253 0         0 $self->{host} = delete $params{host};
2254             }
2255 409 50       1128 if (exists $params{port}) {
2256 0         0 $self->{port} = delete $params{port};
2257             }
2258 409 50       1426 if (exists $params{socket}) {
2259 0         0 delete $params{socket};
2260             }
2261 409 50       1027 if (exists $params{socket_mode}) {
2262 0         0 delete $params{socket_mode};
2263             }
2264 409 50       1146 if (exists $params{listen}) {
2265 0         0 delete $params{listen};
2266             }
2267 409 50       1055 if (exists $params{ssl}) {
2268 0         0 $self->{ssl} = delete $params{ssl};
2269             }
2270 409 50       1082 if (exists $params{extensions}) {
2271 0         0 $self->{extensions} = delete $params{extensions};
2272             }
2273 409 50       959 if (exists $params{on_error}) {
2274 0         0 $self->{on_error} = delete $params{on_error};
2275             }
2276 409 50       1138 if (exists $params{access_log}) {
2277 0         0 $self->{access_log} = delete $params{access_log};
2278             }
2279 409 100       1326 if (exists $params{access_log_format}) {
2280 1         4 $self->{access_log_format} = delete $params{access_log_format};
2281             $self->{_access_log_formatter} = $self->_compile_access_log_format(
2282             $self->{access_log_format}
2283 1         4 );
2284             }
2285 409 50       1173 if (exists $params{quiet}) {
2286 0         0 $self->{quiet} = delete $params{quiet};
2287             }
2288 409 50       1117 if (exists $params{log_level}) {
2289 0         0 my $level = delete $params{log_level};
2290 0         0 my %valid_levels = (debug => 1, info => 2, warn => 3, error => 4);
2291             die "Invalid log_level '$level' - must be one of: debug, info, warn, error\n"
2292 0 0       0 unless $valid_levels{$level};
2293 0         0 $self->{log_level} = $level;
2294 0         0 $self->{_log_level_num} = $valid_levels{$level};
2295             }
2296 409 50       1256 if (exists $params{timeout}) {
2297 0         0 $self->{timeout} = delete $params{timeout};
2298             }
2299 409 50       1103 if (exists $params{max_header_size}) {
2300 0         0 $self->{max_header_size} = delete $params{max_header_size};
2301             }
2302 409 50       1031 if (exists $params{max_header_count}) {
2303 0         0 $self->{max_header_count} = delete $params{max_header_count};
2304             }
2305 409 50       1002 if (exists $params{max_body_size}) {
2306 0         0 $self->{max_body_size} = delete $params{max_body_size};
2307             }
2308 409 50       1131 if (exists $params{workers}) {
2309 0         0 $self->{workers} = delete $params{workers};
2310             }
2311 409 50       1567 if (exists $params{max_requests}) {
2312 0         0 $self->{max_requests} = delete $params{max_requests};
2313             }
2314 409 50       1414 if (exists $params{listener_backlog}) {
2315 0         0 $self->{listener_backlog} = delete $params{listener_backlog};
2316             }
2317 409 50       1209 if (exists $params{shutdown_timeout}) {
2318 0         0 $self->{shutdown_timeout} = delete $params{shutdown_timeout};
2319             }
2320 409 50       1431 if (exists $params{max_receive_queue}) {
2321 0         0 $self->{max_receive_queue} = delete $params{max_receive_queue};
2322             }
2323 409 50       1011 if (exists $params{max_ws_frame_size}) {
2324 0         0 $self->{max_ws_frame_size} = delete $params{max_ws_frame_size};
2325             }
2326 409 50       1051 if (exists $params{max_connections}) {
2327 0         0 $self->{max_connections} = delete $params{max_connections};
2328             }
2329 409 50       1077 if (exists $params{request_timeout}) {
2330 0         0 $self->{request_timeout} = delete $params{request_timeout};
2331             }
2332 409 50       968 if (exists $params{ws_idle_timeout}) {
2333 0         0 $self->{ws_idle_timeout} = delete $params{ws_idle_timeout};
2334             }
2335 409 50       915 if (exists $params{sse_idle_timeout}) {
2336 0         0 $self->{sse_idle_timeout} = delete $params{sse_idle_timeout};
2337             }
2338 409 50       999 if (exists $params{http2}) {
2339 0         0 $self->{http2} = delete $params{http2};
2340             }
2341              
2342 409         2061 $self->SUPER::configure(%params);
2343             }
2344              
2345             # Log levels: debug=1, info=2, warn=3, error=4
2346             my %_LOG_LEVELS = (debug => 1, info => 2, warn => 3, error => 4);
2347              
2348             sub _log {
2349 592     592   1603 my ($self, $level, $msg) = @_;
2350              
2351 592   50     1955 my $level_num = $_LOG_LEVELS{$level} // 2;
2352 592 100       1878 return if $level_num < $self->{_log_level_num};
2353 344 100 100     2096 return if $self->{quiet} && $level ne 'error';
2354 5         155 warn "$msg\n";
2355             }
2356              
2357             # Returns a human-readable TLS status string for the startup banner
2358             sub _tls_status_string {
2359 258     258   541 my ($self) = @_;
2360              
2361 258 50       792 if ($self->{disable_tls}) {
2362 0 0       0 return $TLS_AVAILABLE ? 'disabled' : 'n/a (disabled)';
2363             }
2364 258 100       795 if ($self->{tls_enabled}) {
2365 39         156 return 'on';
2366             }
2367 219 50       939 return $TLS_AVAILABLE ? 'available' : 'not installed';
2368             }
2369              
2370             # Returns a human-readable HTTP/2 status string for the startup banner
2371             sub _http2_status_string {
2372 258     258   507 my ($self) = @_;
2373              
2374 258 100       793 if ($self->{http2_enabled}) {
2375 4 50       29 return $self->{h2c_enabled} ? 'on (h2c)' : 'on';
2376             }
2377 254 50       990 return $HTTP2_AVAILABLE ? 'available' : 'not installed';
2378             }
2379              
2380             # Returns a human-readable Future::XS status string for the startup banner
2381             sub _future_xs_status_string {
2382 258 50   258   654 return 'on' if $FUTURE_XS_ENABLED;
2383 258 50       638 return 'available' if $FUTURE_XS_AVAILABLE;
2384 258         774 return 'not installed';
2385             }
2386              
2387             # Check if TLS modules are available
2388             sub _check_tls_available {
2389 49     49   273 my ($self) = @_;
2390              
2391             # Allow forcing TLS off for testing — return false to skip TLS setup
2392 49 50       162 if ($self->{disable_tls}) {
2393 0         0 return 0;
2394             }
2395              
2396 49 50       295 return 1 if $TLS_AVAILABLE;
2397              
2398 0         0 die <<"END_TLS_ERROR";
2399             TLS support requested but required modules not installed.
2400              
2401             To enable HTTPS/TLS support, install:
2402              
2403             cpanm IO::Async::SSL IO::Socket::SSL
2404              
2405             Or on Debian/Ubuntu:
2406              
2407             apt-get install libio-socket-ssl-perl
2408              
2409             Then restart your application.
2410             END_TLS_ERROR
2411             }
2412              
2413             # Build SSL configuration parameters for use by both single-worker and multi-worker modes.
2414             # Returns a hashref of SSL params (including SSL_reuse_ctx) or undef if no SSL configured.
2415             sub _build_ssl_config {
2416 252     252   3082 my ($self) = @_;
2417 252 100       1159 my $ssl = $self->{ssl} or return;
2418              
2419 43 50       266 return unless $self->_check_tls_available;
2420              
2421 43         104 my %ssl_params;
2422 43         249 $ssl_params{SSL_server} = 1;
2423 43 50       292 $ssl_params{SSL_cert_file} = $ssl->{cert_file} if $ssl->{cert_file};
2424 43 50       220 $ssl_params{SSL_key_file} = $ssl->{key_file} if $ssl->{key_file};
2425             # Trailing colon means "this version or higher" — allows TLS 1.3 negotiation
2426 43   100     425 $ssl_params{SSL_version} = ($ssl->{min_version} // 'TLSv1_2') . ':';
2427             $ssl_params{SSL_cipher_list} = $ssl->{cipher_list}
2428 43   100     309 // 'ECDHE+AESGCM:DHE+AESGCM:ECDHE+CHACHA20:!aNULL:!eNULL:!EXPORT:!DES:!RC4:!MD5:!PSK:!aECDH:!EDH-DSS-DES-CBC3-SHA:!KRB5-DES-CBC3-SHA';
2429              
2430 43 100       121 if ($ssl->{verify_client}) {
2431             # SSL_VERIFY_PEER (0x01) | SSL_VERIFY_FAIL_IF_NO_PEER_CERT (0x02)
2432 8         48 $ssl_params{SSL_verify_mode} = 0x03;
2433 8 50       52 $ssl_params{SSL_ca_file} = $ssl->{ca_file} if $ssl->{ca_file};
2434             } else {
2435 35         165 $ssl_params{SSL_verify_mode} = 0x00; # SSL_VERIFY_NONE
2436             }
2437              
2438             # ALPN negotiation for HTTP/2 support
2439 43 50 66     218 if ($self->{http2} && $HTTP2_AVAILABLE) {
2440 4         28 $ssl_params{SSL_alpn_protocols} = ['h2', 'http/1.1'];
2441 4         8 $self->{http2_enabled} = 1;
2442 4         23 $self->{h2c_enabled} = 0; # TLS mode, not cleartext
2443             }
2444              
2445             # Pre-create shared SSL context to avoid per-connection CA bundle parsing
2446 43         950 my $ssl_ctx = IO::Socket::SSL::SSL_Context->new(\%ssl_params);
2447 43         2825266 $self->{_ssl_ctx} = $ssl_ctx;
2448 43         247 $ssl_params{SSL_reuse_ctx} = $ssl_ctx;
2449              
2450             # Mark TLS enabled and auto-add tls extension
2451 43         221 $self->{tls_enabled} = 1;
2452 43 50       460 $self->{extensions}{tls} = {} unless exists $self->{extensions}{tls};
2453              
2454 43         355 return \%ssl_params;
2455             }
2456              
2457             =head2 run
2458              
2459             $server->run;
2460              
2461             Standalone entry point that creates an event loop, starts the server,
2462             and runs until shutdown. This is the simplest way to run a PAGI server:
2463              
2464             my $server = PAGI::Server->new(
2465             app => $app,
2466             port => 8080,
2467             );
2468             $server->run;
2469              
2470             For embedding in an existing IO::Async application, use the traditional
2471             pattern instead:
2472              
2473             my $loop = IO::Async::Loop->new;
2474             $loop->add($server);
2475             $server->listen->get;
2476             $loop->run;
2477              
2478             The C method handles:
2479              
2480             =over 4
2481              
2482             =item * Creating the event loop (respecting C if set)
2483              
2484             =item * Adding the server to the loop
2485              
2486             =item * Starting the listener
2487              
2488             =item * Setting up signal handlers for graceful shutdown
2489              
2490             =item * Running the event loop until shutdown
2491              
2492             =back
2493              
2494             =cut
2495              
2496             sub run {
2497 1     1 1 1 my ($self) = @_;
2498              
2499 1         2 my $loop = $self->_create_loop;
2500 1         4388 $loop->add($self);
2501              
2502             # Start listening with error handling
2503 1         91 eval { $self->listen->get };
  1         2  
2504 0 0       0 if ($@) {
2505 0         0 my $error = $@;
2506 0         0 my $port = $self->{port};
2507 0 0       0 if ($error =~ /Address already in use/i) {
    0          
2508 0         0 die "Error: Port $port is already in use\n";
2509             }
2510             elsif ($error =~ /Permission denied/i) {
2511 0         0 die "Error: Permission denied to bind to port $port\n";
2512             }
2513 0         0 die "Error starting server: $error\n";
2514             }
2515              
2516             # Run the event loop (signal handlers were set up by listen())
2517 0         0 $loop->run;
2518             }
2519              
2520             # Create an event loop, respecting loop_type config
2521             sub _create_loop {
2522 1     1   1 my ($self) = @_;
2523              
2524 1 50       2 if (my $loop_type = $self->{loop_type}) {
2525 0 0       0 die "Invalid loop_type '$loop_type': must contain only letters, digits, and ::\n"
2526             unless $loop_type =~ /\A[A-Za-z][A-Za-z0-9_]*(?:::[A-Za-z][A-Za-z0-9_]*)*\z/;
2527 0         0 my $loop_class = "IO::Async::Loop::$loop_type";
2528 0         0 (my $loop_file = "$loop_class.pm") =~ s{::}{/}g;
2529 0 0       0 eval { require $loop_file }
  0         0  
2530             or die "Cannot load loop backend '$loop_type': $@\n" .
2531             "Install it with: cpanm $loop_class\n";
2532 0         0 return $loop_class->new;
2533             }
2534              
2535 1         7 require IO::Async::Loop;
2536 1         6 return IO::Async::Loop->new;
2537             }
2538              
2539 261     261 1 37918 async sub listen {
2540 261         689 my ($self) = @_;
2541              
2542 261 50       1101 return if $self->{running};
2543              
2544             # Multi-worker mode uses a completely different code path
2545 261 100 66     1347 if ($self->{workers} && $self->{workers} > 0) {
2546 15         617 return $self->_listen_multiworker;
2547             }
2548              
2549 246         926 return await $self->_listen_singleworker;
2550             }
2551              
2552             # Single-worker mode - uses IO::Async normally
2553 246     246   406 async sub _listen_singleworker {
2554 246         449 my ($self) = @_;
2555              
2556 246         442 weaken(my $weak_self = $self);
2557              
2558             # Run lifespan startup before accepting connections
2559 246         1009 my $startup_result = await $self->_run_lifespan_startup;
2560              
2561 245 100       7420 if (!$startup_result->{success}) {
2562 2   50     34 my $message = $startup_result->{message} // 'Lifespan startup failed';
2563 2         62 $self->_log(error => "PAGI Server startup failed: $message");
2564 2         48 die "Lifespan startup failed: $message\n";
2565             }
2566              
2567             # Collect any inherited fds (from PAGI_REUSE or LISTEN_FDS)
2568 243         1214 my $inherited = $self->_collect_inherited_fds;
2569              
2570             # Iterate over listeners array, creating one IO::Async::Listener per spec
2571 243         469 my @listen_entries;
2572 243         346 for my $spec (@{$self->{listeners}}) {
  243         812  
2573              
2574             # Check for inherited fd matching this spec
2575 246 100       1504 my $match_key = $spec->{type} eq 'unix'
2576             ? "unix:$spec->{path}"
2577             : "$spec->{host}:$spec->{port}";
2578              
2579 246 100       784 if (my $inh = delete $inherited->{$match_key}) {
2580             # Reuse inherited fd — skip bind/listen entirely
2581 3         7 $spec->{_inherited} = 1;
2582              
2583 3         5 my $handle = $inh->{handle};
2584 3 100       5 if (!$handle) {
2585 2 100       6 my $class = $inh->{type} eq 'unix'
2586             ? 'IO::Socket::UNIX' : 'IO::Socket::INET';
2587 2 100       9 require IO::Socket::UNIX if $inh->{type} eq 'unix';
2588 2 50       19 $handle = $class->new_from_fd($inh->{fd}, 'r')
2589             or die "Cannot open inherited fd $inh->{fd}: $!\n";
2590             }
2591              
2592 3 100 100     155 if ($inh->{type} eq 'tcp' && $handle->can('sockport')) {
2593 1         4 $spec->{port} = $handle->sockport;
2594 1   33     24 $self->{bound_port} //= $spec->{port};
2595             }
2596              
2597 3         3 my $spec_ref = $spec;
2598 3         6 weaken(my $weak_inner = $self);
2599             my $listener = IO::Async::Listener->new(
2600             handle => $handle,
2601             on_stream => sub {
2602 2     2   8458 my ($l, $stream) = @_;
2603 2 50       6 return unless $weak_inner;
2604 2         10 $weak_inner->_on_connection($stream, $spec_ref);
2605             },
2606 3         37 );
2607 3         596 $self->add_child($listener);
2608              
2609 3         593 $self->_log(info => "Reusing inherited fd $inh->{fd} for $match_key"
2610             . " (source: $inh->{source})");
2611              
2612 3         11 push @listen_entries, { listener => $listener, spec => $spec };
2613 3         11 next; # Skip normal bind/listen
2614             }
2615              
2616 243         349 my $spec_copy = $spec; # capture for closure
2617             my $listener = IO::Async::Listener->new(
2618             on_stream => sub {
2619 253     253   20299499 my ($listener, $stream) = @_;
2620 253 50       1005 return unless $weak_self;
2621 253         1756 $weak_self->_on_connection($stream, $spec_copy);
2622             },
2623 243         3115 );
2624              
2625 243         17738 $self->add_child($listener);
2626              
2627             # Build listener options
2628             my %listen_opts = (
2629             queuesize => $self->{listener_backlog},
2630 243         21900 );
2631              
2632 243 100       815 if ($spec->{type} eq 'unix') {
2633             # Remove stale socket file if it exists
2634 10 100       801 unlink $spec->{path} if -e $spec->{path};
2635              
2636             $listen_opts{addr} = {
2637             family => 'unix',
2638             socktype => 'stream',
2639             path => $spec->{path},
2640 10         64 };
2641              
2642 10 50       24 if ($self->{tls_enabled}) {
2643 0         0 $self->_log(info => "Note: TLS is configured but does not apply to Unix socket $spec->{path}");
2644             }
2645             } else {
2646             # TCP listener
2647             $listen_opts{addr} = {
2648             family => 'inet',
2649             socktype => 'stream',
2650             ip => $spec->{host},
2651             port => $spec->{port},
2652 233         1992 };
2653              
2654             # Add SSL options if configured (TCP only)
2655 233 100       1031 if (my $ssl_params = $self->_build_ssl_config) {
2656 33         177 $listen_opts{extensions} = ['SSL'];
2657 33         833 %listen_opts = (%listen_opts, %$ssl_params);
2658              
2659             $listen_opts{on_ssl_error} = sub {
2660 4 50   4   423772 return unless $weak_self;
2661 4         36 $weak_self->_log(debug => "SSL handshake failed: $_[0]");
2662 33         442 };
2663             }
2664             }
2665              
2666             # Set restrictive umask for Unix socket bind to prevent brief
2667             # permission window (CVE-2023-45145 pattern in Redis)
2668 243         422 my $old_umask;
2669 243 100       833 if ($spec->{type} eq 'unix') {
2670 10         27 $old_umask = umask(0177); # Owner-only until chmod
2671             }
2672              
2673             # Start listening ($^F raised so fd survives exec for hot restart)
2674             {
2675 243         309 local $^F = 1023;
  243         1168  
2676 243         1381 await $listener->listen(%listen_opts);
2677             }
2678              
2679             # Restore umask after bind
2680 243 100       270917 umask($old_umask) if defined $old_umask;
2681              
2682             # Configure accept error handler after listen() to avoid SSL extension conflicts
2683 243         417 eval {
2684             $listener->configure(
2685             on_accept_error => sub {
2686 0     0   0 my ($listener, $error) = @_;
2687 0 0       0 return unless $weak_self;
2688 0         0 $weak_self->_on_accept_error($error);
2689             },
2690 243         1762 );
2691             };
2692 243 50       60617 if ($@) {
2693 243         1325 $self->_log(debug => "Could not configure on_accept_error (likely SSL listener): $@");
2694             }
2695              
2696             # Post-listen setup
2697 243 100       935 if ($spec->{type} eq 'unix') {
2698             # Apply socket permissions if configured
2699 10 100       29 if (defined $spec->{socket_mode}) {
2700 1         33 chmod $spec->{socket_mode}, $spec->{path};
2701             }
2702             } else {
2703             # Store the actual bound port from the listener's read handle
2704 233         917 my $socket = $listener->read_handle;
2705 233 50 33     2764 if ($socket && $socket->can('sockport')) {
2706 233         807 my $bound = $socket->sockport;
2707 233         11946 $spec->{port} = $bound; # update spec with actual port
2708 233   33     1762 $self->{bound_port} //= $bound; # first TCP port wins
2709             }
2710             }
2711              
2712             # Register in PAGI_REUSE for hot restart fd inheritance
2713 243         641 my $rh = $listener->read_handle;
2714 243 50       1188 if ($rh) {
2715 243         563 my $fd = fileno($rh);
2716 243 50       579 if (defined $fd) {
2717 243 100       1269 my $reuse_key = $spec->{type} eq 'unix'
2718             ? "unix:$spec->{path}:$fd"
2719             : "$spec->{host}:$spec->{port}:$fd";
2720 243         558 $spec->{_reuse_key} = $reuse_key;
2721 243 100 100     3203 $ENV{PAGI_REUSE} = length($ENV{PAGI_REUSE} // '')
2722             ? "$ENV{PAGI_REUSE},$reuse_key"
2723             : $reuse_key;
2724             }
2725             }
2726              
2727 243         1993 push @listen_entries, { listener => $listener, spec => $spec };
2728             }
2729              
2730             # Warn about unmatched inherited fds
2731 243         846 for my $key (sort keys %$inherited) {
2732 0         0 my $inh = $inherited->{$key};
2733 0         0 $self->_log(warn => "Inherited fd $inh->{fd} ($key) does not match "
2734             . "any listener spec — closing");
2735 0 0       0 if ($inh->{handle}) {
2736 0         0 close($inh->{handle});
2737             } else {
2738 0         0 POSIX::close($inh->{fd});
2739             }
2740             }
2741              
2742 243         680 $self->{_listen_entries} = \@listen_entries;
2743             # Backward compat: keep $self->{listener} pointing to first entry
2744 243 50       828 $self->{listener} = $listen_entries[0]{listener} if @listen_entries;
2745 243         527 $self->{running} = 1;
2746              
2747             # Set up signal handlers for graceful shutdown (single-worker mode)
2748             # Note: Windows doesn't support Unix signals, so this is skipped there
2749 243         303 unless (WIN32) {
2750 243         401 my $shutdown_triggered = 0;
2751             my $shutdown_handler = sub {
2752 0 0   0   0 return if $shutdown_triggered;
2753 0         0 $shutdown_triggered = 1;
2754             $self->adopt_future(
2755             $self->shutdown->on_done(sub {
2756 0         0 $self->loop->stop;
2757             })->on_fail(sub {
2758 0         0 my ($error) = @_;
2759 0         0 $self->_log(error => "Shutdown error: $error");
2760 0         0 $self->loop->stop; # Still stop even on error
2761             })
2762 0         0 );
2763 243         1258 };
2764 243         964 $self->loop->watch_signal(TERM => $shutdown_handler);
2765 243         44729 $self->loop->watch_signal(INT => $shutdown_handler);
2766              
2767             # HUP in single-worker mode just warns (graceful restart requires multi-worker)
2768 243         8719 my $weak_self = $self;
2769 243         566 weaken($weak_self);
2770             $self->loop->watch_signal(HUP => sub {
2771             $weak_self->_log(warn => "Received HUP signal (graceful restart only works in multi-worker mode)")
2772 0 0 0 0   0 if $weak_self && !$weak_self->{quiet};
2773 243         644 });
2774              
2775             $self->loop->watch_signal(USR2 => sub {
2776             $weak_self->_log(warn => "Received USR2 signal (hot restart only works in multi-worker mode)")
2777 0 0 0 0   0 if $weak_self && !$weak_self->{quiet};
2778 243         8545 });
2779             }
2780              
2781 243         7978 my $loop_class = ref($self->loop);
2782 243         1707 $loop_class =~ s/^IO::Async::Loop:://; # Shorten for display
2783 243         990 my $max_conn = $self->effective_max_connections;
2784 243         931 my $tls_status = $self->_tls_status_string;
2785 243         783 my $http2_status = $self->_http2_status_string;
2786 243         652 my $future_xs_status = $self->_future_xs_status_string;
2787              
2788             # Warn if access_log is a terminal (slow for benchmarks)
2789 243 50 66     2251 if ($self->{access_log} && -t $self->{access_log}) {
2790 0         0 $self->_log(warn =>
2791             "access_log is a terminal; this may impact performance. " .
2792             "Consider redirecting to a file or setting access_log => undef for benchmarks."
2793             );
2794             }
2795              
2796             # Log listening banner
2797 243 100       824 my $scheme = $self->{tls_enabled} ? 'https' : 'http';
2798 243 100       638 if (@listen_entries == 1) {
2799 240         556 my $spec = $listen_entries[0]{spec};
2800 240 100       608 if ($spec->{type} eq 'unix') {
2801 8         39 $self->_log(info => "PAGI Server listening on unix:$spec->{path} (loop: $loop_class, max_conn: $max_conn, http2: $http2_status, tls: $tls_status, future_xs: $future_xs_status)");
2802             } else {
2803 232         1329 $self->_log(info => "PAGI Server listening on $scheme://$spec->{host}:$spec->{port}/ (loop: $loop_class, max_conn: $max_conn, http2: $http2_status, tls: $tls_status, future_xs: $future_xs_status)");
2804             }
2805             } else {
2806 3         3 my @addrs;
2807 3         6 for my $entry (@listen_entries) {
2808 6         9 my $s = $entry->{spec};
2809 6 100       27 if ($s->{type} eq 'unix') {
2810 3         9 push @addrs, "unix:$s->{path}";
2811             } else {
2812 3         12 push @addrs, "$scheme://$s->{host}:$s->{port}/";
2813             }
2814             }
2815 3         18 $self->_log(info => "PAGI Server listening on: " . join(', ', @addrs) . " (loop: $loop_class, max_conn: $max_conn, http2: $http2_status, tls: $tls_status, future_xs: $future_xs_status)");
2816             }
2817              
2818             # Warn in production if using default max_connections
2819 243 50 50     1359 if (($ENV{PAGI_ENV} // '') eq 'production' && !$self->{max_connections}) {
      33        
2820 0         0 $self->_log(warn =>
2821             "Using default max_connections (1000). For production, consider tuning this value " .
2822             "based on your workload. See 'perldoc PAGI::Server' for guidance."
2823             );
2824             }
2825              
2826 243         2513 return $self;
2827             }
2828              
# Multi-worker mode: the master binds (or inherits) every listening socket,
# forks the worker pool, then installs signal handlers, the heartbeat monitor
# and, after a hot restart, the hand-off timer that retires the old master.
#
# Reads:   $self->{workers}, {reuseport}, {listeners}, {listener_backlog},
#          {ssl}, {access_log}, {max_connections}, {heartbeat_timeout},
#          plus $ENV{PAGI_ENV} and $ENV{PAGI_MASTER_PID}.
# Writes:  $self->{_listen_entries}, {listen_socket}, {bound_port},
#          {running}, {tls_enabled}, {_heartbeat_check_timer};
#          $spec->{_inherited}, {bound_port}, {_reuse_key}; $ENV{PAGI_REUSE}.
# Returns: $self, immediately. The caller is expected to run the loop.
# Dies:    when a socket cannot be created, chmod'ed, or re-opened from an
#          inherited fd.
sub _listen_multiworker {
    my ($self) = @_;

    my $workers   = $self->{workers};
    my $reuseport = $self->{reuseport};

    # Create all listening sockets before forking workers
    my @listen_entries;

    # Collect any inherited fds (from PAGI_REUSE or LISTEN_FDS)
    my $inherited = $self->_collect_inherited_fds;

    # Record a freshly created socket in PAGI_REUSE so that a hot-restarted
    # master can locate its fd. Lexical on purpose: no new package symbol.
    my $register_for_reuse = sub {
        my ($spec, $reuse_key) = @_;
        $spec->{_reuse_key} = $reuse_key;
        $ENV{PAGI_REUSE} = length($ENV{PAGI_REUSE} // '')
            ? "$ENV{PAGI_REUSE},$reuse_key"
            : $reuse_key;
    };

    for my $spec (@{$self->{listeners}}) {
        my $socket;

        # Check for inherited fd matching this spec
        my $match_key = $spec->{type} eq 'unix'
            ? "unix:$spec->{path}"
            : "$spec->{host}:$spec->{port}";

        if (my $inh = delete $inherited->{$match_key}) {
            $spec->{_inherited} = 1;

            if ($inh->{handle}) {
                $socket = $inh->{handle};
            } else {
                my $class = $inh->{type} eq 'unix'
                    ? 'IO::Socket::UNIX' : 'IO::Socket::INET';
                require IO::Socket::UNIX if $inh->{type} eq 'unix';
                $socket = $class->new_from_fd($inh->{fd}, 'r')
                    or die "Cannot open inherited fd $inh->{fd}: $!\n";
            }

            if ($inh->{type} eq 'tcp' && $socket->can('sockport')) {
                $spec->{bound_port} = $socket->sockport;
                $self->{bound_port} //= $spec->{bound_port};
            }

            $self->_log(info => "Reusing inherited fd $inh->{fd} for $match_key"
                . " (source: $inh->{source})");
        }
        elsif ($spec->{type} eq 'unix') {
            # Unix socket: parent creates, workers inherit
            unlink $spec->{path} if -e $spec->{path};

            require IO::Socket::UNIX;

            # Set restrictive umask for bind (CVE-2023-45145 mitigation).
            # The umask is restored BEFORE reporting a bind failure so that an
            # exception caught by the caller cannot leave the process with
            # 0177 in effect.
            my $old_umask = umask(0177);
            my $bind_error;
            {
                # Raise $^F so the new descriptor is not marked close-on-exec
                # and survives the exec() of a hot restart.
                local $^F = 1023;
                $socket = IO::Socket::UNIX->new(
                    Local  => $spec->{path},
                    Type   => Socket::SOCK_STREAM(),
                    Listen => $self->{listener_backlog},
                );
                # Stringify now: umask() below may overwrite $!.
                $bind_error = "$!" unless $socket;
            }
            umask($old_umask);

            die "Cannot create Unix socket $spec->{path}: $bind_error"
                unless $socket;

            if (defined $spec->{socket_mode}) {
                chmod($spec->{socket_mode}, $spec->{path})
                    or die "Cannot chmod $spec->{path}: $!\n";
            }

            # Register in PAGI_REUSE for hot restart fd inheritance
            if ($socket && !$spec->{_inherited}) {
                my $fd = fileno($socket);
                $register_for_reuse->($spec, "unix:$spec->{path}:$fd");
            }
        } elsif ($reuseport) {
            # reuseport TCP: probe to get port, workers create their own
            # Note: reuseport sockets are not registered in PAGI_REUSE because
            # each worker creates its own socket. fd inheritance for reuseport
            # mode is not currently supported — use shared-socket mode for
            # hot restart / systemd socket activation.
            my $probe_socket = IO::Socket::INET->new(
                LocalAddr => $spec->{host},
                LocalPort => $spec->{port},
                Proto     => 'tcp',
                Listen    => 1,
                ReuseAddr => 1,
                ReusePort => 1,
            ) or die "Cannot bind to $spec->{host}:$spec->{port}: $!";
            $spec->{bound_port} = $probe_socket->sockport;
            $self->{bound_port} //= $spec->{bound_port};
            close($probe_socket);
        } else {
            # Shared-socket TCP: parent creates, workers inherit
            {
                local $^F = 1023;
                $socket = IO::Socket::INET->new(
                    LocalAddr => $spec->{host},
                    LocalPort => $spec->{port},
                    Proto     => 'tcp',
                    Listen    => $self->{listener_backlog},
                    ReuseAddr => 1,
                    Blocking  => 0,
                ) or die "Cannot create listening socket on $spec->{host}:$spec->{port}: $!";
            }
            $spec->{bound_port} = $socket->sockport;
            $self->{bound_port} //= $spec->{bound_port};

            # Register in PAGI_REUSE for hot restart fd inheritance
            if ($socket && !$spec->{_inherited}) {
                my $fd = fileno($socket);
                $register_for_reuse->(
                    $spec, "$spec->{host}:" . $socket->sockport . ":$fd");
            }
        }

        push @listen_entries, { socket => $socket, spec => $spec };
    }

    # Warn about unmatched inherited fds
    for my $key (sort keys %$inherited) {
        my $inh = $inherited->{$key};
        $self->_log(warn => "Inherited fd $inh->{fd} ($key) does not match "
            . "any listener spec — closing");
        if ($inh->{handle}) {
            close($inh->{handle});
        } else {
            POSIX::close($inh->{fd});
        }
    }

    $self->{_listen_entries} = \@listen_entries;
    # Backward compat: keep listen_socket pointing to first entry's socket
    $self->{listen_socket} = $listen_entries[0]{socket}
        if @listen_entries && $listen_entries[0]{socket};

    $self->{running} = 1;

    # Validate TLS modules and set tls_enabled before forking workers
    if ($self->{ssl}) {
        $self->_check_tls_available;
        $self->{tls_enabled} = 1;
    }

    my $scheme = $self->{ssl} ? 'https' : 'http';
    my $loop_class = ref($self->loop);
    $loop_class =~ s/^IO::Async::Loop:://;  # Shorten for display
    my $mode = $reuseport ? 'reuseport' : 'shared-socket';
    my $max_conn = $self->effective_max_connections;
    my $tls_status = $self->_tls_status_string;
    my $http2_status = $self->_http2_status_string;
    my $future_xs_status = $self->_future_xs_status_string;

    # Warn if access_log is a terminal (slow for benchmarks)
    if ($self->{access_log} && -t $self->{access_log}) {
        $self->_log(warn =>
            "access_log is a terminal; this may impact performance. " .
            "Consider redirecting to a file or setting access_log => undef for benchmarks."
        );
    }

    # Log listening banner for all listeners
    my @addrs;
    for my $entry (@listen_entries) {
        my $s = $entry->{spec};
        if ($s->{type} eq 'unix') {
            push @addrs, "unix:$s->{path}";
        } else {
            my $port = $s->{bound_port} // $s->{port};
            push @addrs, "$scheme://$s->{host}:$port/";
        }
    }
    my $addr_str = join(', ', @addrs);
    $self->_log(info => "PAGI Server (multi-worker, $mode) listening on $addr_str with $workers workers (loop: $loop_class, max_conn: $max_conn/worker, http2: $http2_status, tls: $tls_status, future_xs: $future_xs_status)");

    # Warn in production if using default max_connections
    if (($ENV{PAGI_ENV} // '') eq 'production' && !$self->{max_connections}) {
        $self->_log(warn =>
            "Using default max_connections (1000). For production, consider tuning this value " .
            "based on your workload. See 'perldoc PAGI::Server' for guidance."
        );
    }

    my $loop = $self->loop;

    # Fork the workers FIRST, before setting up signal handlers.
    # This prevents children from inheriting the parent's sigpipe setup,
    # which can cause issues with Ctrl-C signal delivery on macOS.
    for my $i (1 .. $workers) {
        $self->_spawn_worker(\@listen_entries, $i);
    }

    # Set up signal handlers for parent process AFTER forking
    # Note: Windows doesn't support Unix signals, so this is skipped there
    unless (WIN32) {
        $loop->watch_signal(TERM => sub { $self->_initiate_multiworker_shutdown });
        $loop->watch_signal(INT  => sub { $self->_initiate_multiworker_shutdown });
        $loop->watch_signal(HUP  => sub { $self->_graceful_restart });
        $loop->watch_signal(TTIN => sub { $self->_increase_workers });
        $loop->watch_signal(TTOU => sub { $self->_decrease_workers });
        $loop->watch_signal(USR2 => sub { $self->_hot_restart });
    }

    # Start heartbeat monitor if enabled
    if ($self->{heartbeat_timeout} && $self->{heartbeat_timeout} > 0) {
        my $hb_timeout = $self->{heartbeat_timeout};
        my $check_interval = $hb_timeout / 2;
        weaken(my $weak_self = $self);

        my $hb_check_timer = IO::Async::Timer::Periodic->new(
            interval => $check_interval,
            on_tick  => sub {
                return unless $weak_self;
                return if $weak_self->{shutting_down};

                my $now = time();
                for my $pid (keys %{$weak_self->{worker_pids}}) {
                    my $info = $weak_self->{worker_pids}{$pid};
                    next unless $info->{heartbeat_rd};

                    # Drain all available heartbeat bytes
                    while (sysread($info->{heartbeat_rd}, my $buf, 64)) {
                        $info->{last_heartbeat} = $now;
                    }

                    # Kill if heartbeat expired
                    if ($now - $info->{last_heartbeat} > $hb_timeout) {
                        $weak_self->_log(warn =>
                            "Worker $pid (worker $info->{worker_num}) heartbeat " .
                            "timeout after ${hb_timeout}s, sending SIGKILL");
                        kill 'KILL', $pid;
                    }
                }
            },
        );

        $self->add_child($hb_check_timer);
        $hb_check_timer->start;
        $self->{_heartbeat_check_timer} = $hb_check_timer;
    }

    # Hot restart handoff: if we were spawned by USR2, signal the old master.
    # The value comes from the environment, so it is validated before it is
    # ever handed to kill(): a non-numeric value would become 0 under int(),
    # and kill(SIG, 0) targets our own process group.
    my $old_master_raw = delete $ENV{PAGI_MASTER_PID};
    if ($old_master_raw) {
        if ($old_master_raw =~ /\A[1-9][0-9]*\z/ && $old_master_raw != $$) {
            my $old_master_pid = int($old_master_raw);

            # Wait for workers to be healthy before retiring old master
            # Delay = half the heartbeat timeout + 1 second buffer
            my $handoff_delay = ($self->{heartbeat_timeout} || 10) / 2 + 1;
            weaken(my $weak_self_handoff = $self);

            $self->loop->watch_time(
                after => $handoff_delay,
                code  => sub {
                    return unless $weak_self_handoff;

                    my $worker_count = scalar keys %{$weak_self_handoff->{worker_pids}};
                    if ($worker_count == 0) {
                        $weak_self_handoff->_log(error =>
                            "Hot restart: no workers running, not retiring old master $old_master_pid");
                        return;
                    }

                    if (kill(0, $old_master_pid)) {
                        $weak_self_handoff->_log(info =>
                            "Hot restart: $worker_count workers healthy, "
                            . "sending SIGTERM to old master $old_master_pid");
                        kill('TERM', $old_master_pid);
                    } else {
                        $weak_self_handoff->_log(warn =>
                            "Hot restart: old master $old_master_pid is no longer running");
                    }
                },
            );
        } else {
            $self->_log(warn =>
                "Hot restart: ignoring invalid PAGI_MASTER_PID '$old_master_raw', "
                . "no old master will be signalled");
        }
    }

    # Return immediately - caller (Runner) will call $loop->run()
    # This is consistent with single-worker mode behavior
    return $self;
}
3110              
# Collect inherited file descriptors from PAGI_REUSE and LISTEN_FDS.
#
# Returns a hashref keyed by "host:port" or "unix:path". Each value is a hash
# with fd, type ('tcp' or 'unix'), host/port or path, source ('pagi_reuse' or
# 'systemd') and, for systemd-activated sockets only, an open handle.
#
# PAGI_REUSE entries that do not parse are reported through _log(warn => ...)
# and skipped. Entries found in PAGI_REUSE take precedence over a systemd
# socket with the same key. The systemd variables are removed from %ENV
# whenever LISTEN_FDS holds a positive integer.
sub _collect_inherited_fds {
    my ($self) = @_;
    my %inherited;

    # Source 1: PAGI_REUSE (format: addr:port:fd,unix:path:fd,...)
    if (my $reuse = $ENV{PAGI_REUSE}) {
        for my $entry (split /,/, $reuse) {
            if ($entry =~ /^unix:(.+):(\d+)$/) {
                my ($path, $fd) = ($1, int($2));
                $inherited{"unix:$path"} = {
                    fd => $fd, type => 'unix', path => $path,
                    source => 'pagi_reuse',
                };
            } elsif ($entry =~ /^(\[.+?\]):(\d+):(\d+)$/
                  || $entry =~ /^(.+):(\d+):(\d+)$/) {
                # Bracketed IPv6 literal first, then any other host form;
                # the captures belong to whichever pattern matched.
                my ($host, $port, $fd) = ($1, int($2), int($3));
                $inherited{"$host:$port"} = {
                    fd => $fd, type => 'tcp', host => $host, port => $port,
                    source => 'pagi_reuse',
                };
            } elsif (length $entry) {
                # Still skipped, but no longer silently: a mangled variable
                # means a listener will be re-bound instead of inherited.
                $self->_log(warn =>
                    "Ignoring malformed PAGI_REUSE entry '$entry'");
            }
        }
    }

    # Source 2: LISTEN_FDS (systemd socket activation)
    my $listen_fds = $ENV{LISTEN_FDS};
    if (defined $listen_fds && $listen_fds =~ /^\d+$/ && $listen_fds > 0) {
        my $listen_pid = $ENV{LISTEN_PID};

        # Only honour the fds when they were addressed to this very process.
        # The pattern check keeps a non-numeric value away from ==.
        if (defined $listen_pid && $listen_pid =~ /^\d+$/ && $listen_pid == $$) {
            my $n = int($listen_fds);
            for my $i (0 .. $n - 1) {
                my $fd = 3 + $i;  # SD_LISTEN_FDS_START

                my $fh;
                unless (open($fh, '+<&=', $fd)) {
                    $self->_log(warn => "Cannot fdopen inherited fd $fd: $!");
                    next;
                }

                my $addr = getsockname($fh);
                unless ($addr) {
                    $self->_log(warn => "Cannot getsockname on inherited fd $fd: $!");
                    next;
                }

                my $family = sockaddr_family($addr);

                if ($family == AF_UNIX) {
                    my $path = unpack_sockaddr_un($addr);
                    my $key = "unix:$path";
                    $inherited{$key} //= {
                        fd => $fd, type => 'unix', path => $path,
                        handle => $fh, source => 'systemd',
                    };
                } elsif ($family == AF_INET) {
                    my ($port, $host_packed) = unpack_sockaddr_in($addr);
                    my $host = Socket::inet_ntoa($host_packed);
                    my $key = "$host:$port";
                    $inherited{$key} //= {
                        fd => $fd, type => 'tcp', host => $host, port => $port,
                        handle => $fh, source => 'systemd',
                    };
                } else {
                    $self->_log(warn =>
                        "Inherited fd $fd has unsupported address family $family");
                }
            }
        }

        # Always clean up systemd env vars (per sd_listen_fds spec)
        delete @ENV{qw(LISTEN_FDS LISTEN_PID LISTEN_FDNAMES)};
    }

    return \%inherited;
}
3194              
# Initiate graceful shutdown in multi-worker mode.
#
# A second call while a shutdown is already under way is a no-op. Otherwise:
#   1. the heartbeat monitor is stopped and detached;
#   2. unless a hot restart is in progress, the listen sockets are closed,
#      this master's keys are removed from PAGI_REUSE, and Unix socket files
#      it created itself are unlinked;
#   3. every tracked worker receives SIGTERM;
#   4. with no workers tracked the loop is stopped at once, otherwise a timer
#      escalates to SIGKILL after shutdown_timeout (default 30) seconds.
sub _initiate_multiworker_shutdown {
    my ($self) = @_;

    return if $self->{shutting_down};
    $self->{shutting_down} = 1;
    $self->{running} = 0;

    # Stop heartbeat monitoring — shutdown escalation timer handles stuck workers
    if ($self->{_heartbeat_check_timer}) {
        $self->{_heartbeat_check_timer}->stop;
        $self->remove_child($self->{_heartbeat_check_timer});
        delete $self->{_heartbeat_check_timer};
    }

    # Everything below is skipped during a hot restart: the new master is
    # using these fds, PAGI_REUSE entries and socket files.
    unless ($self->{_hot_restart_in_progress}) {
        my @entries = @{$self->{_listen_entries} // []};

        # Close all listen sockets to stop accepting new connections
        for my $entry (@entries) {
            close($entry->{socket}) if $entry->{socket};
        }
        delete $self->{listen_socket} if $self->{listen_socket};

        # Clean up PAGI_REUSE entries. Keys are compared whole: a pattern
        # without a trailing boundary would let "host:5000:5" eat the front
        # of "host:5000:51" and corrupt the remaining list.
        my %retired = map  { $_ => 1 }
                      grep { $_ }
                      map  { $_->{spec}{_reuse_key} } @entries;
        if (%retired && defined $ENV{PAGI_REUSE}) {
            $ENV{PAGI_REUSE} = join ',',
                grep { length($_) && !$retired{$_} }
                split /,/, $ENV{PAGI_REUSE};
        }

        # Clean up Unix socket files (skip inherited)
        for my $entry (@entries) {
            my $spec = $entry->{spec};
            if ($spec->{type} eq 'unix'
                && !$spec->{_inherited}
                && -e $spec->{path}) {
                unlink $spec->{path};
            }
        }
    }

    # Signal all workers to shutdown
    my @worker_pids = keys %{$self->{worker_pids}};
    for my $pid (@worker_pids) {
        kill 'TERM', $pid;
    }

    # If no workers, stop the loop immediately
    unless (@worker_pids) {
        $self->loop->stop;
        return;
    }

    # Escalate to SIGKILL after shutdown_timeout for workers that ignore SIGTERM
    my $timeout = $self->{shutdown_timeout} // 30;
    weaken(my $weak_self = $self);
    $self->{_shutdown_kill_timer} = $self->loop->watch_time(
        after => $timeout,
        code  => sub {
            return unless $weak_self;
            for my $pid (keys %{$weak_self->{worker_pids}}) {
                $weak_self->_log(warn =>
                    "Worker $pid did not exit after ${timeout}s, sending SIGKILL");
                kill 'KILL', $pid;
            }
        },
    );
}
3271              
# Graceful restart (HUP handler): ask every current worker to exit.
#
# Each tracked worker gets SIGTERM together with a per-worker timer, stored in
# $self->{_restart_kill_timers}{$pid}, that sends SIGKILL if the worker is
# still tracked after shutdown_timeout (default 30) seconds. Replacement
# workers are started by the watch_process callbacks, not here. Does nothing
# while the server is shutting down.
sub _graceful_restart {
    my ($self) = @_;

    return if $self->{shutting_down};

    $self->_log(info => "Received HUP, performing graceful restart");

    my $grace_period = $self->{shutdown_timeout} // 30;

    # Signal all current workers to shutdown
    # watch_process callbacks will respawn them
    for my $worker_pid (keys %{$self->{worker_pids}}) {
        kill 'TERM', $worker_pid;

        # The timer callback must not keep the server object alive.
        weaken(my $server = $self);

        # Escalate to SIGKILL if worker doesn't exit within shutdown_timeout
        my $escalate = sub {
            return unless $server;
            return unless exists $server->{worker_pids}{$worker_pid};

            $server->_log(warn =>
                "Worker $worker_pid did not exit after ${grace_period}s during restart, sending SIGKILL");
            kill 'KILL', $worker_pid;
        };

        $self->{_restart_kill_timers}{$worker_pid} = $self->loop->watch_time(
            after => $grace_period,
            code  => $escalate,
        );
    }
}
3301              
# Hot restart (USR2 handler): fork+exec a new master that inherits the listen
# sockets via PAGI_REUSE.
#
# The request is ignored, with a warning, when a hot restart is already in
# progress or the server is shutting down. Otherwise this process's PID is
# exported as PAGI_MASTER_PID and a child is forked that exec()s
# "$^X $0 <args>", where the args come from the NUL-separated PAGI_ARGV.
#
# The child is watched afterwards. If it exits with a non-zero status while
# this master is still serving, the in-progress flag and PAGI_MASTER_PID are
# cleared again; otherwise a failed exec or startup would block every later
# USR2 and make a later shutdown skip its socket cleanup. A zero exit status
# is deliberately left alone.
sub _hot_restart {
    my ($self) = @_;

    if ($self->{_hot_restart_in_progress}) {
        $self->_log(warn => "Hot restart already in progress, ignoring USR2");
        return;
    }

    if ($self->{shutting_down}) {
        $self->_log(warn => "Server is shutting down, ignoring USR2");
        return;
    }

    $self->{_hot_restart_in_progress} = 1;
    $self->_log(info => "Received USR2, starting hot restart");

    # Store our PID so the new master can signal us when ready
    $ENV{PAGI_MASTER_PID} = $$;

    # Fork and exec a new master process
    my $pid = fork();

    if (!defined $pid) {
        $self->_log(error => "Hot restart fork failed: $!");
        $self->{_hot_restart_in_progress} = 0;
        delete $ENV{PAGI_MASTER_PID};
        return;
    }

    if ($pid == 0) {
        # Child: exec new master
        my @args = defined $ENV{PAGI_ARGV}
            ? split(/\0/, $ENV{PAGI_ARGV})
            : ();
        exec($^X, $0, @args)
            or do {
                warn "Hot restart exec failed: $!\n";
                # _exit: leave without running the parent's END blocks
                # or destructors in this forked copy.
                POSIX::_exit(1);
            };
    }

    # Parent: log and continue running
    $self->_log(info => "Hot restart: new master spawned as PID $pid");

    # Parent: recover if the new master dies instead of taking over.
    weaken(my $weak_self = $self);
    $self->loop->watch_process($pid => sub {
        my ($exit_pid, $wait_status) = @_;
        return unless $weak_self;
        return if $wait_status == 0;
        return if $weak_self->{shutting_down};

        $weak_self->{_hot_restart_in_progress} = 0;
        delete $ENV{PAGI_MASTER_PID};
        $weak_self->_log(error =>
            "Hot restart: new master PID $exit_pid exited abnormally "
            . "(wait status $wait_status), continuing with current master");
    });

    return;
}
3347              
# Increase worker pool by 1 (TTIN handler).
#
# The new worker takes the lowest worker number not used by a tracked worker.
# Plain "count + 1" collides with a live worker once a TTOU has retired a
# lower-numbered one. The logged total is the pool size after the spawn.
# Does nothing while the server is shutting down.
sub _increase_workers {
    my ($self) = @_;

    return if $self->{shutting_down};

    my @tracked = values %{$self->{worker_pids}};
    my $total   = @tracked + 1;

    my %taken = map { ($_->{worker_num} // 0) => 1 } @tracked;
    my $new_worker_num = 1;
    $new_worker_num++ while $taken{$new_worker_num};

    $self->_log(info => "Received TTIN, spawning worker $new_worker_num (total: $total)");
    $self->_spawn_worker($self->{_listen_entries}, $new_worker_num);
}
3360              
# Decrease worker pool by 1 (TTOU handler).
#
# The victim is the most recently started worker that is not already being
# retired. Ties are broken by the higher worker number, then the higher PID,
# so the choice never depends on hash ordering. Workers already flagged in
# $self->{_dont_respawn} are not counted as live, so repeated TTOU signals
# cannot retire the last remaining worker. The victim is flagged "don't
# respawn" before it is sent SIGTERM. Does nothing while the server is
# shutting down.
sub _decrease_workers {
    my ($self) = @_;

    return if $self->{shutting_down};

    my $tracked  = $self->{worker_pids} // {};
    my $retiring = $self->{_dont_respawn} // {};

    my @candidates = grep { !$retiring->{$_} } keys %$tracked;
    return unless @candidates > 1;  # Keep at least 1 worker

    # Kill most recent
    my ($victim_pid) = sort {
           ($tracked->{$b}{started}    // 0) <=> ($tracked->{$a}{started}    // 0)
        || ($tracked->{$b}{worker_num} // 0) <=> ($tracked->{$a}{worker_num} // 0)
        || $b <=> $a
    } @candidates;
    my $remaining = @candidates - 1;

    $self->_log(info => "Received TTOU, killing worker (remaining: $remaining)");

    # Mark as "don't respawn" by setting a flag before killing
    $self->{_dont_respawn}{$victim_pid} = 1;
    kill 'TERM', $victim_pid;
}
3379              
# Fork one worker process and register it with the master.
#
# Arguments: $listen_entries (arrayref handed on to the worker) and
#            $worker_num (number the worker runs under; reused on respawn).
# Returns:   the PID of the new worker.
# Dies:      if the heartbeat pipe cannot be created or no PID comes back
#            from the fork.
#
# The worker is recorded in $self->{worker_pids}{$pid}. An exit watcher
# releases its resources, respawns it when appropriate, and stops the loop
# once the last worker has gone during shutdown.
sub _spawn_worker {
    my ($self, $listen_entries, $worker_num) = @_;

    my $loop = $self->loop;
    weaken(my $weak_self = $self);

    # Create heartbeat pipe if enabled
    my ($beat_reader, $beat_writer);
    my $heartbeat_on = $self->{heartbeat_timeout} && $self->{heartbeat_timeout} > 0;
    if ($heartbeat_on) {
        pipe($beat_reader, $beat_writer) or die "Cannot create heartbeat pipe: $!";
    }

    # Runs in the child only: keep the write end, drop the read end.
    my $child_main = sub {
        close($beat_reader) if $beat_reader;
        $self->_run_as_worker($listen_entries, $worker_num, $beat_writer);
        return 0;
    };

    # Set IGNORE before fork - child inherits it. IO::Async only resets
    # CODE refs, so 'IGNORE' (a string) survives. Child must NOT call
    # watch_signal(INT) or it will overwrite the IGNORE.
    my $saved_int_handler = $SIG{INT};
    $SIG{INT} = 'IGNORE' unless WIN32;

    my $pid = $loop->fork(code => $child_main);

    # Restore parent's SIGINT handler
    $SIG{INT} = $saved_int_handler unless WIN32;

    die "Fork failed" unless defined $pid;

    # Parent — close write end, set read end non-blocking
    if ($beat_writer) {
        close($beat_writer);
        $beat_reader->blocking(0);
    }

    # Parent - track the worker
    $self->{worker_pids}{$pid} = {
        worker_num     => $worker_num,
        started        => time(),
        heartbeat_rd   => $beat_reader,
        last_heartbeat => time(),
    };

    # Runs in the parent when the worker exits.
    my $on_worker_exit = sub {
        my ($exit_pid, $wait_status) = @_;
        return unless $weak_self;

        # Remove from tracking and close the heartbeat pipe read end
        my $gone = delete $weak_self->{worker_pids}{$exit_pid};
        close($gone->{heartbeat_rd}) if $gone && $gone->{heartbeat_rd};

        # Cancel per-worker restart kill timer if one exists
        my $kill_timer = delete $weak_self->{_restart_kill_timers}{$exit_pid};
        $loop->unwatch_time($kill_timer) if $kill_timer;

        # Check exit code: exit(2) = startup failure, don't respawn
        my $startup_failed = ($wait_status >> 8) == 2;
        if ($startup_failed) {
            # Don't respawn - startup failure would just repeat
            $weak_self->_log(warn => "Worker $worker_num startup failed, not respawning");
        }
        elsif ($weak_self->{running} && !$weak_self->{shutting_down}) {
            # Respawn, unless this exit was a TTOU reduction
            my $was_reduction = delete $weak_self->{_dont_respawn}{$exit_pid};
            $weak_self->_spawn_worker($listen_entries, $worker_num)
                unless $was_reduction;
        }

        # Check if all workers have exited (for shutdown)
        return unless $weak_self->{shutting_down};
        return if keys %{$weak_self->{worker_pids}};

        # Cancel the shutdown SIGKILL escalation timer
        if ($weak_self->{_shutdown_kill_timer}) {
            $loop->unwatch_time($weak_self->{_shutdown_kill_timer});
            delete $weak_self->{_shutdown_kill_timer};
        }
        $loop->stop;
    };

    # Use watch_process to handle worker exit (replaces manual SIGCHLD handling)
    $loop->watch_process($pid => $on_worker_exit);

    return $pid;
}
3470              
3471             sub _run_as_worker {
3472 9     9   417 my ($self, $listen_entries, $worker_num, $heartbeat_wr) = @_;
3473              
3474             # Note: $ONE_TRUE_LOOP already cleared by $loop->fork(), so this creates a fresh loop
3475             # Note: $SIG{INT} = 'IGNORE' inherited from parent - do NOT call watch_signal(INT)
3476             # or it will overwrite the IGNORE with a CODE ref!
3477 9         998 my $loop = IO::Async::Loop->new;
3478              
3479             # In reuseport mode, each worker creates its own TCP listening socket
3480 9         13866 my $reuseport = $self->{reuseport};
3481 9         253 for my $entry (@$listen_entries) {
3482 9         225 my $spec = $entry->{spec};
3483 9 0 33     294 if (!$entry->{socket} && $reuseport && $spec->{type} eq 'tcp') {
      33        
3484             $entry->{socket} = IO::Socket::INET->new(
3485             LocalAddr => $spec->{host},
3486             LocalPort => $spec->{bound_port},
3487             Proto => 'tcp',
3488             Listen => $self->{listener_backlog},
3489 0 0       0 ReuseAddr => 1,
3490             ReusePort => 1,
3491             Blocking => 0,
3492             ) or die "Worker $worker_num: Cannot create listening socket: $!";
3493             }
3494             }
3495              
3496             # Build listener specs for the worker server constructor
3497 9         116 my @listen_specs;
3498 9         186 for my $entry (@$listen_entries) {
3499 9         111 my $s = $entry->{spec};
3500 9 50       263 if ($s->{type} eq 'unix') {
3501 0 0       0 push @listen_specs, { socket => $s->{path}, (defined $s->{socket_mode} ? (socket_mode => $s->{socket_mode}) : ()) };
3502             } else {
3503 9 100       659 push @listen_specs, { host => $s->{host}, port => $s->{port}, ($s->{ssl} ? (ssl => $s->{ssl}) : ()) };
3504             }
3505             }
3506              
3507             # Create a fresh server instance for this worker (single-worker mode)
3508             my $worker_server = PAGI::Server->new(
3509             app => $self->{app},
3510             listen => \@listen_specs,
3511             ssl => $self->{ssl},
3512             http2 => $self->{http2},
3513             extensions => $self->{extensions},
3514             on_error => $self->{on_error},
3515             access_log => $self->{access_log},
3516             log_level => $self->{log_level},
3517             quiet => 1, # Workers should be quiet
3518             timeout => $self->{timeout},
3519             max_header_size => $self->{max_header_size},
3520             max_header_count => $self->{max_header_count},
3521             max_body_size => $self->{max_body_size},
3522             max_requests => $self->{max_requests},
3523             shutdown_timeout => $self->{shutdown_timeout},
3524             request_timeout => $self->{request_timeout},
3525             ws_idle_timeout => $self->{ws_idle_timeout},
3526             sse_idle_timeout => $self->{sse_idle_timeout},
3527             sync_file_threshold => $self->{sync_file_threshold},
3528             max_receive_queue => $self->{max_receive_queue},
3529             max_ws_frame_size => $self->{max_ws_frame_size},
3530             write_high_watermark => $self->{write_high_watermark},
3531             write_low_watermark => $self->{write_low_watermark},
3532 9         2051 workers => 0, # Single-worker mode in worker process
3533             );
3534 9         271 $worker_server->{is_worker} = 1;
3535 9         144 $worker_server->{worker_num} = $worker_num; # Store for lifespan scope
3536 9         204 $worker_server->{_request_count} = 0; # Track requests handled
3537              
3538             # Set bound_port from first TCP listener's socket
3539 9         105 for my $entry (@$listen_entries) {
3540 9 50 33     1799 if ($entry->{spec}{type} eq 'tcp' && $entry->{socket} && $entry->{socket}->can('sockport')) {
      33        
3541 9         698 $worker_server->{bound_port} = $entry->{socket}->sockport;
3542 9         1221 last;
3543             }
3544             }
3545              
3546 9         331 $loop->add($worker_server);
3547              
3548             # Build SSL config for this worker (each worker gets its own SSL context post-fork)
3549 9         1380 my $ssl_params = $worker_server->_build_ssl_config;
3550              
3551             # Set up graceful shutdown on SIGTERM using IO::Async's signal watching
3552             # (raw $SIG handlers don't work reliably when the loop is running)
3553             # Note: Windows doesn't support Unix signals, so this is skipped there
3554             # Note: We do NOT set up watch_signal(INT) here - workers inherit $SIG{INT}='IGNORE'
3555             # from parent, so they ignore SIGINT (including Ctrl-C). Parent sends SIGTERM.
3556 9         75 unless (WIN32) {
3557 9         40 my $shutdown_triggered = 0;
3558             $loop->watch_signal(TERM => sub {
3559 9 50   9   9207935 return if $shutdown_triggered;
3560 9         31 $shutdown_triggered = 1;
3561             $worker_server->adopt_future(
3562             $worker_server->shutdown->on_done(sub {
3563 9         693 $loop->stop;
3564             })->on_fail(sub {
3565 0         0 my ($error) = @_;
3566 0         0 $worker_server->_log(error => "Worker shutdown error: $error");
3567 0         0 $loop->stop; # Still stop even on error
3568             })
3569 9         346 );
3570 9         824 });
3571             }
3572              
3573             # Run lifespan startup using a proper async wrapper
3574 9         15480 my $startup_done = 0;
3575 9         65 my $startup_error;
3576              
3577 9     9   61 my $startup_future = (async sub {
3578 9         28 eval {
3579 9         219 my $startup_result = await $worker_server->_run_lifespan_startup;
3580 9 50       351 if (!$startup_result->{success}) {
3581 0   0     0 $startup_error = $startup_result->{message} // 'Lifespan startup failed';
3582             }
3583             };
3584 9 50       43 if ($@) {
3585 0         0 $startup_error = $@;
3586             }
3587 9         16 $startup_done = 1;
3588 9 50       104 $loop->stop if $startup_error; # Stop loop on error
3589 9         433 })->();
3590              
3591             # Use adopt_future instead of retain
3592 9         309 $worker_server->adopt_future($startup_future);
3593              
3594             # Run the loop briefly to let async startup complete
3595 9         802 $loop->loop_once while !$startup_done;
3596              
3597 9 50       86 if ($startup_error) {
3598 0         0 $self->_log(error => "Worker $worker_num ($$): startup failed: $startup_error");
3599 0         0 for my $entry (@$listen_entries) {
3600 0 0       0 close($entry->{socket}) if $entry->{socket};
3601             }
3602 0         0 exit(2); # Exit code 2 = startup failure (don't respawn)
3603             }
3604              
3605             # Create IO::Async::Listener for each inherited socket
3606 9         122 weaken(my $weak_server = $worker_server);
3607              
3608 9         96 for my $entry (@$listen_entries) {
3609 9 50       82 next unless $entry->{socket};
3610 9         99 my $spec = $entry->{spec};
3611              
3612             # Build SSL config for TCP listeners if needed
3613 9   66     130 my $use_ssl = ($ssl_params && $spec->{type} eq 'tcp');
3614              
3615             my $listener = IO::Async::Listener->new(
3616             handle => $entry->{socket},
3617             on_stream => sub {
3618 5     5   4539167 my ($listener, $stream) = @_;
3619 5 50       38 return unless $weak_server;
3620              
3621 5 100       124 if ($use_ssl) {
3622             $loop->SSL_upgrade(
3623             handle => $stream,
3624             SSL_server => 1,
3625             SSL_reuse_ctx => $worker_server->{_ssl_ctx},
3626             )->on_done(sub {
3627 3 50       289461 $weak_server->_on_connection($stream, $spec) if $weak_server;
3628             })->on_fail(sub {
3629 0         0 my ($failure) = @_;
3630 0 0       0 $weak_server->_log(debug => "SSL handshake failed: $failure")
3631             if $weak_server;
3632 3         172 });
3633             } else {
3634 2         53 $weak_server->_on_connection($stream, $spec);
3635             }
3636             },
3637 9         597 );
3638              
3639 9         3459 $worker_server->add_child($listener);
3640              
3641             # Configure accept error handler - try but ignore if it fails
3642 9         2550 eval {
3643             $listener->configure(
3644             on_accept_error => sub {
3645 0     0   0 my ($listener, $error) = @_;
3646 0 0       0 return unless $weak_server;
3647 0         0 $weak_server->_on_accept_error($error);
3648             },
3649 9         216 );
3650             };
3651             # Silently ignore configuration errors in workers
3652             }
3653              
3654             # Set up heartbeat writer: periodically signal liveness to parent
3655 9 50       7632 if ($heartbeat_wr) {
3656 9   50     171 my $interval = ($self->{heartbeat_timeout} || 50) / 5;
3657             my $hb_timer = IO::Async::Timer::Periodic->new(
3658             interval => $interval,
3659             on_tick => sub {
3660 0     0   0 syswrite($heartbeat_wr, "\x00", 1);
3661             },
3662 9         807 );
3663 9         2169 $worker_server->add_child($hb_timer);
3664 9         1309 $hb_timer->start;
3665             }
3666              
3667 9         33621 $worker_server->{running} = 1;
3668              
3669             # Run the event loop
3670 9         370 $loop->run;
3671              
3672             # Clean up FDs before exit
3673 9 50       2304 close($heartbeat_wr) if $heartbeat_wr;
3674 9         57 for my $entry (@$listen_entries) {
3675 9 50       353 close($entry->{socket}) if $entry->{socket};
3676             }
3677 9         2353 exit(0);
3678             }
3679              
3680             sub _on_connection {
3681 260     260   840 my ($self, $stream, $listener_spec) = @_;
3682              
3683 260         611 weaken(my $weak_self = $self);
3684              
3685             # Check if we're at capacity
3686 260         1208 my $max = $self->effective_max_connections;
3687 260 100       855 if ($self->connection_count >= $max) {
3688             # Over capacity - send 503 and close
3689 2         6 $self->_send_503_and_close($stream);
3690 2         10 return;
3691             }
3692              
3693             # Detect ALPN-negotiated protocol from TLS handle
3694 258         462 my $alpn_protocol;
3695 258 100 100     1140 if ($self->{tls_enabled} && $self->{http2_enabled}) {
3696 2   33     12 my $handle = $stream->write_handle // $stream->read_handle;
3697 2 50 33     49 if ($handle && $handle->can('alpn_selected')) {
3698 2         3 $alpn_protocol = eval { $handle->alpn_selected };
  2         15  
3699             }
3700             }
3701              
3702             my $conn = PAGI::Server::Connection->new(
3703             stream => $stream,
3704             app => $self->{app},
3705             protocol => $self->{protocol},
3706             server => $self,
3707             extensions => $self->{extensions},
3708             state => $self->{state},
3709             tls_enabled => $self->{tls_enabled} // 0,
3710             timeout => $self->{timeout},
3711             request_timeout => $self->{request_timeout},
3712             ws_idle_timeout => $self->{ws_idle_timeout},
3713             sse_idle_timeout => $self->{sse_idle_timeout},
3714             max_body_size => $self->{max_body_size},
3715             access_log => $self->{access_log},
3716             _access_log_formatter => $self->{_access_log_formatter},
3717             max_receive_queue => $self->{max_receive_queue},
3718             max_ws_frame_size => $self->{max_ws_frame_size},
3719             sync_file_threshold => $self->{sync_file_threshold},
3720             validate_events => $self->{validate_events},
3721             write_high_watermark => $self->{write_high_watermark},
3722             write_low_watermark => $self->{write_low_watermark},
3723             transport_type => ($listener_spec && $listener_spec->{type}) // 'tcp',
3724             transport_path => ($listener_spec ? $listener_spec->{path} : undef),
3725             ($self->{http2_enabled} ? (
3726             h2_protocol => $self->{http2_protocol},
3727             alpn_protocol => $alpn_protocol,
3728 258 50 100     9027 h2c_enabled => $self->{h2c_enabled} // 0,
    100 33        
      50        
      50        
3729             ) : ()),
3730             );
3731              
3732             # Track the connection (O(1) hash insert)
3733 258         1376 $self->{connections}{refaddr($conn)} = $conn;
3734              
3735             # Configure stream with callbacks BEFORE adding to loop
3736 258         1367 $conn->start;
3737              
3738             # Add stream to the loop so it can read/write
3739 258         17715 $self->add_child($stream);
3740             }
3741              
3742             sub _send_503_and_close {
3743 2     2   4 my ($self, $stream) = @_;
3744              
3745 2         3 my $body = "503 Service Unavailable - Server at capacity\r\n";
3746 2         9 my $response = join("\r\n",
3747             "HTTP/1.1 503 Service Unavailable",
3748             "Content-Type: text/plain",
3749             "Content-Length: " . length($body),
3750             "Connection: close",
3751             "Retry-After: 5",
3752             "",
3753             $body
3754             );
3755              
3756             # Configure stream with minimal on_read handler (required by IO::Async)
3757             $stream->configure(
3758 1     1   460 on_read => sub { 0 }, # Ignore any incoming data
3759 2         8 );
3760              
3761             # Add stream to loop so it can write
3762 2         102 $self->add_child($stream);
3763              
3764             # Write response and close
3765 2         338 $stream->write($response);
3766 2         416 $stream->close_when_empty;
3767              
3768 2         20 $self->_log(warn => "Connection rejected: at capacity (" . $self->connection_count . "/" . $self->effective_max_connections . ")");
3769             }
3770              
3771             sub _on_accept_error {
3772 0     0   0 my ($self, $error) = @_;
3773              
3774             # EMFILE = "Too many open files" - we're out of file descriptors
3775             # ENFILE = System-wide FD limit reached
3776 0 0       0 if ($error =~ /Too many open files|EMFILE|ENFILE/i) {
3777             # Only log the first EMFILE in a burst (when we're not already paused)
3778 0 0       0 unless ($self->{_accept_paused}) {
3779 0         0 $self->_log(warn => "Accept error (FD exhaustion): $error - pausing accept for 100ms");
3780             }
3781              
3782             # Pause accepting for a short time to let connections drain
3783 0         0 $self->_pause_accepting(0.1);
3784             }
3785             else {
3786             # Log other accept errors but don't crash
3787 0         0 $self->_log(error => "Accept error: $error");
3788             }
3789             }
3790              
3791             sub _pause_accepting {
3792 0     0   0 my ($self, $duration) = @_;
3793              
3794 0 0       0 return if $self->{_accept_paused};
3795 0         0 $self->{_accept_paused} = 1;
3796              
3797             # Cancel any existing timer before creating new one
3798 0 0       0 if ($self->{_accept_pause_timer}) {
3799 0         0 $self->loop->unwatch_time($self->{_accept_pause_timer});
3800 0         0 delete $self->{_accept_pause_timer};
3801             }
3802              
3803             # Temporarily disable all listeners
3804 0   0     0 for my $entry (@{$self->{_listen_entries} // []}) {
  0         0  
3805 0         0 my $listener = $entry->{listener};
3806 0 0 0     0 if ($listener && $listener->read_handle) {
3807 0         0 $listener->want_readready(0);
3808             }
3809             }
3810             # Backward compat: also pause $self->{listener} if not in entries
3811 0 0 0     0 if ($self->{listener} && !$self->{_listen_entries}) {
3812 0 0       0 $self->{listener}->want_readready(0) if $self->{listener}->read_handle;
3813             }
3814              
3815             # Re-enable after duration
3816 0         0 weaken(my $weak_self = $self);
3817             my $timer_id = $self->loop->watch_time(after => $duration, code => sub {
3818 0 0 0 0   0 return unless $weak_self && $weak_self->{running};
3819 0         0 $weak_self->{_accept_paused} = 0;
3820 0         0 delete $weak_self->{_accept_pause_timer};
3821              
3822             # Resume all listeners
3823 0   0     0 for my $entry (@{$weak_self->{_listen_entries} // []}) {
  0         0  
3824 0         0 my $listener = $entry->{listener};
3825 0 0 0     0 if ($listener && $listener->read_handle) {
3826 0         0 $listener->want_readready(1);
3827             }
3828             }
3829             # Backward compat
3830 0 0 0     0 if ($weak_self->{listener} && !$weak_self->{_listen_entries}) {
3831 0 0       0 $weak_self->{listener}->want_readready(1) if $weak_self->{listener}->read_handle;
3832             }
3833              
3834 0         0 $weak_self->_log(debug => "Accept resumed after FD exhaustion pause");
3835 0         0 });
3836              
3837 0         0 $self->{_accept_pause_timer} = $timer_id;
3838             }
3839              
3840             # Called when a request completes (for max_requests tracking)
3841             sub _on_request_complete {
3842 304     304   1023 my ($self) = @_;
3843              
3844 304 100       1233 return unless $self->{is_worker};
3845 7 100 66     62 return unless $self->{max_requests} && $self->{max_requests} > 0;
3846              
3847 2         3 $self->{_request_count}++;
3848              
3849 2 50       5 if ($self->{_request_count} >= $self->{max_requests}) {
3850 0 0       0 return if $self->{_max_requests_shutdown_triggered}; # Prevent duplicate shutdowns
3851 0         0 $self->{_max_requests_shutdown_triggered} = 1;
3852 0         0 $self->_log(info => "Worker $$: reached max_requests ($self->{max_requests}), shutting down");
3853             # Initiate graceful shutdown (finish current connections, then exit)
3854             $self->adopt_future(
3855             $self->shutdown->on_done(sub {
3856 0     0   0 $self->loop->stop;
3857             })->on_fail(sub {
3858 0     0   0 my ($error) = @_;
3859 0         0 $self->_log(error => "Worker $$: max_requests shutdown error: $error");
3860 0         0 $self->loop->stop; # Still stop even on error
3861             })
3862 0         0 );
3863             }
3864             }
3865              
3866             # Lifespan Protocol Implementation
3867              
3868 255     255   419 async sub _run_lifespan_startup {
3869 255         612 my ($self) = @_;
3870              
3871             # Create lifespan scope
3872             my $scope = {
3873             type => 'lifespan',
3874             pagi => {
3875             version => '0.2',
3876             spec_version => '0.2',
3877             is_worker => $self->{is_worker} // 0,
3878             worker_num => $self->{worker_num}, # undef for single-worker, 1-N for multi-worker
3879             },
3880             state => $self->{state}, # App can populate this
3881 255   50     2800 };
3882              
3883             # Create receive/send for lifespan protocol
3884 255         582 my @send_queue;
3885             my $receive_pending;
3886 255         2376 my $startup_complete = Future->new;
3887 255         2235 my $lifespan_supported = 1; # Track if app supports lifespan
3888              
3889             # $receive for the app - returns events from the server
3890             my $receive = sub {
3891 347 100   347   6991 if (@send_queue) {
3892 175         1238 return Future->done(shift @send_queue);
3893             }
3894 172         479 $receive_pending = Future->new;
3895 172         977 return $receive_pending;
3896 255         1184 };
3897              
3898             # $send for the app - handles app responses
3899 343     343   25705 my $send = async sub {
3900 343         642 my ($event) = @_;
3901 343   50     994 my $type = $event->{type} // '';
3902              
3903 343 100       1351 if ($type eq 'lifespan.startup.complete') {
    100          
    100          
    50          
3904 172         815 $startup_complete->done({ success => 1 });
3905             }
3906             elsif ($type eq 'lifespan.startup.failed') {
3907 2   50     67 my $message = $event->{message} // '';
3908 2         66 $startup_complete->done({ success => 0, message => $message });
3909             }
3910             elsif ($type eq 'lifespan.shutdown.complete') {
3911             # Store for shutdown handling
3912 167         687 $self->{shutdown_complete} = 1;
3913 167 50       520 if ($self->{shutdown_pending}) {
3914 167         877 $self->{shutdown_pending}->done({ success => 1 });
3915             }
3916             }
3917             elsif ($type eq 'lifespan.shutdown.failed') {
3918 0   0     0 my $message = $event->{message} // '';
3919 0         0 $self->{shutdown_complete} = 1;
3920 0 0       0 if ($self->{shutdown_pending}) {
3921 0         0 $self->{shutdown_pending}->done({ success => 0, message => $message });
3922             }
3923             }
3924              
3925 343         12541 return;
3926 255         1485 };
3927              
3928             # Queue the startup event
3929 255         1045 push @send_queue, { type => 'lifespan.startup' };
3930 255 50 33     899 if ($receive_pending && !$receive_pending->is_ready) {
3931 0         0 my $f = $receive_pending;
3932 0         0 $receive_pending = undef;
3933 0         0 $f->done(shift @send_queue);
3934             }
3935              
3936             # Store lifespan handlers for shutdown
3937 255         997 $self->{lifespan_receive} = $receive;
3938 255         635 $self->{lifespan_send} = $send;
3939 255         688 $self->{lifespan_send_queue} = \@send_queue;
3940 255         691 $self->{lifespan_receive_pending} = \$receive_pending;
3941              
3942             # Start the lifespan app handler
3943             # We run it in the background and wait for startup.complete
3944 255     255   367 my $app_future = (async sub {
3945 255         388 eval {
3946 255         1026 await $self->{app}->($scope, $receive, $send);
3947             };
3948             # Per spec: if the app throws an exception for lifespan scope,
3949             # the server should continue without lifespan support.
3950             # This matches Uvicorn/Hypercorn "auto" mode behavior.
3951             # Apps that don't support lifespan should: die if $scope->{type} ne 'websocket';
3952 250         21331 $lifespan_supported = 0;
3953 250 100       650 if (!$startup_complete->is_ready) {
3954 80         647 $self->_log(info => "Lifespan not supported, continuing without it");
3955 80         442 $startup_complete->done({ success => 1, lifespan_supported => 0 });
3956             }
3957 255         1296 })->();
3958              
3959             # Keep the app future so we can trigger shutdown later
3960 254         21552 $self->{lifespan_app_future} = $app_future;
3961             # Use adopt_future instead of retain for proper error handling
3962 254         1368 $self->adopt_future($app_future);
3963              
3964             # Wait for startup complete (with timeout)
3965 254         14320 my $result = await $startup_complete;
3966              
3967             # Track if lifespan is supported
3968 254   100     4503 $self->{lifespan_supported} = $result->{lifespan_supported} // 1;
3969              
3970 254         1277 return $result;
3971             }
3972              
3973 248     248   485 async sub _run_lifespan_shutdown {
3974 248         563 my ($self) = @_;
3975              
3976             # If lifespan is not supported or no lifespan was started, just return success
3977 248 100       1169 return { success => 1 } unless $self->{lifespan_supported};
3978 168 50       659 return { success => 1 } unless $self->{lifespan_send_queue};
3979              
3980 168         486 $self->{shutdown_pending} = $self->loop->new_future;
3981              
3982             # Queue the shutdown event
3983 168         17857 my $send_queue = $self->{lifespan_send_queue};
3984 168         408 my $receive_pending_ref = $self->{lifespan_receive_pending};
3985              
3986 168         642 push @$send_queue, { type => 'lifespan.shutdown' };
3987              
3988             # Trigger pending receive if waiting
3989 168 50 33     986 if ($$receive_pending_ref && !$$receive_pending_ref->is_ready) {
3990 168         1342 my $f = $$receive_pending_ref;
3991 168         303 $$receive_pending_ref = undef;
3992 168         505 $f->done(shift @$send_queue);
3993             }
3994              
3995             # Wait for shutdown complete (with timeout to prevent hanging)
3996 168   50     16714 my $timeout = $self->{shutdown_timeout} // 30;
3997 168         786 my $timeout_f = $self->loop->delay_future(after => $timeout);
3998              
3999 168         24449 my $result = await Future->wait_any($self->{shutdown_pending}, $timeout_f);
4000              
4001             # If timeout won, return failure
4002 168 50 33     2040601 if ($timeout_f->is_ready && !$self->{shutdown_pending}->is_ready) {
4003 0         0 return { success => 0, message => "Lifespan shutdown timed out after ${timeout}s" };
4004             }
4005              
4006 168   100     2878 return $result // { success => 1 };
4007             }
4008              
4009 249     249 1 12458405 async sub shutdown {
4010 249         762 my ($self) = @_;
4011              
4012 249 50       1224 return unless $self->{running};
4013 249         657 $self->{running} = 0;
4014 249         767 $self->{shutting_down} = 1;
4015              
4016             # Cancel accept pause timer if active
4017 249 50       997 if ($self->{_accept_pause_timer}) {
4018 0         0 $self->loop->unwatch_time($self->{_accept_pause_timer});
4019 0         0 delete $self->{_accept_pause_timer};
4020 0         0 $self->{_accept_paused} = 0;
4021             }
4022              
4023             # Stop accepting new connections on all listeners
4024 249   100     458 for my $entry (@{$self->{_listen_entries} // []}) {
  249         1410  
4025 241         621 eval { $self->remove_child($entry->{listener}) };
  241         1450  
4026             }
4027 249         39739 $self->{listener} = undef;
4028              
4029             # Clean up PAGI_REUSE entries for sockets we created (not inherited)
4030 249   100     442 for my $entry (@{$self->{_listen_entries} // []}) {
  249         1268  
4031 241         792 my $key = $entry->{spec}{_reuse_key};
4032 241 50 66     2267 if ($key && !$self->{_hot_restart_in_progress} && defined $ENV{PAGI_REUSE}) {
      66        
4033 238         11280 $ENV{PAGI_REUSE} =~ s/(?:^|,)\Q$key\E//;
4034 238 50       1346 $ENV{PAGI_REUSE} =~ s/^,// if defined $ENV{PAGI_REUSE};
4035             }
4036             }
4037              
4038             # Clean up Unix socket files (only those we created, not inherited)
4039 249   100     456 for my $entry (@{$self->{_listen_entries} // []}) {
  249         960  
4040 241 100 100     1155 if ($entry->{spec}{type} eq 'unix'
      66        
4041             && !$entry->{spec}{_inherited}
4042             && -e $entry->{spec}{path}) {
4043 7         357 unlink $entry->{spec}{path};
4044             }
4045             }
4046 249         13400 $self->{_listen_entries} = [];
4047              
4048             # Wait for active connections to drain (graceful shutdown)
4049 249         1887 await $self->_drain_connections;
4050              
4051             # Run lifespan shutdown
4052 248         11879 my $shutdown_result = await $self->_run_lifespan_shutdown;
4053              
4054 248 50       7542 if (!$shutdown_result->{success}) {
4055 0   0     0 my $message = $shutdown_result->{message} // 'Lifespan shutdown failed';
4056 0         0 $self->_log(warn => "PAGI Server shutdown warning: $message");
4057             }
4058              
4059 248         892 return $self;
4060             }
4061              
4062             # Wait for active connections to complete, with timeout
4063             # Uses event-driven approach: Connection._close() signals when last one closes
4064 249     249   472 async sub _drain_connections {
4065 249         644 my ($self) = @_;
4066              
4067 249   50     1100 my $timeout = $self->{shutdown_timeout} // 30;
4068 249         837 my $loop = $self->loop;
4069              
4070             # First, close all idle connections immediately (not processing a request)
4071             # Keep-alive connections waiting for next request should be closed
4072 249         1237 my @idle = grep { !$_->{handling_request} } values %{$self->{connections}};
  192         827  
  249         1035  
4073 249         694 for my $conn (@idle) {
4074 182         3030 $conn->_handle_disconnect_and_close('server_shutdown');
4075             }
4076              
4077             # Also close long-lived connections (SSE, WebSocket) immediately
4078             # These never become "idle" so would wait for full timeout otherwise
4079 249 50       19904 my @longlived = grep { $_->{sse_mode} || $_->{websocket_mode} } values %{$self->{connections}};
  10         63  
  249         820  
4080 249         729 for my $conn (@longlived) {
4081 9         46 $conn->_handle_disconnect_and_close('server_shutdown');
4082             }
4083              
4084             # If all connections are now closed, we're done
4085 249 100       408 return if keys %{$self->{connections}} == 0;
  249         11141  
4086              
4087             # Create a Future that Connection._close() will resolve when last one closes
4088 1         6 $self->{drain_complete} = $loop->new_future;
4089              
4090             # Wait for either: all connections close OR timeout
4091 1         57 my $timeout_f = $loop->delay_future(after => $timeout);
4092              
4093 1         170 await Future->wait_any($self->{drain_complete}, $timeout_f);
4094              
4095             # Brief pause to let any final socket writes flush
4096             # (stream->write is async; data may still be in kernel buffer)
4097 1 50       445 await $loop->delay_future(after => 0.05) if keys %{$self->{connections}} == 0;
  1         13  
4098              
4099             # If timeout won (connections still remain), force close them
4100 0 0       0 if (keys %{$self->{connections}} > 0) {
  0         0  
4101 0         0 my $remaining = scalar keys %{$self->{connections}};
  0         0  
4102 0         0 $self->_log(warn => "Shutdown timeout: force-closing $remaining active connections");
4103              
4104 0         0 for my $conn (values %{$self->{connections}}) {
  0         0  
4105 0 0 0     0 $conn->_close if $conn && $conn->can('_close');
4106             }
4107             }
4108              
4109 0         0 delete $self->{drain_complete};
4110 0         0 return;
4111             }
4112              
4113             # --- Access log format compiler ---
4114              
4115             my %ACCESS_LOG_PRESETS = (
4116             clf => '%h - - [%t] "%m %U%q" %s %ds',
4117             combined => '%h - - [%t] "%r" %s %b "%{Referer}i" "%{User-Agent}i"',
4118             common => '%h - - [%t] "%r" %s %b',
4119             tiny => '%m %U%q %s %Dms',
4120             );
4121              
4122             sub _compile_access_log_format {
4123 445     445   14437 my ($class_or_self, $format) = @_;
4124              
4125             # Resolve preset names
4126 445 100       2051 if (exists $ACCESS_LOG_PRESETS{$format}) {
4127 411         1338 $format = $ACCESS_LOG_PRESETS{$format};
4128             }
4129              
4130             # Parse format string into a list of fragments (closures or literal strings)
4131 445         1007 my @fragments;
4132 445         772 my $pos = 0;
4133 445         1044 my $len = length($format);
4134              
4135 445         1550 while ($pos < $len) {
4136 5383         7453 my $ch = substr($format, $pos, 1);
4137              
4138 5383 100       9595 if ($ch eq '%') {
4139 2910         3044 $pos++;
4140 2910 50       4546 last if $pos >= $len;
4141              
4142 2910         4707 my $next = substr($format, $pos, 1);
4143              
4144 2910 100       5665 if ($next eq '%') {
    100          
4145             # Literal percent
4146 1         2 push @fragments, '%';
4147 1         2 $pos++;
4148             }
4149             elsif ($next eq '{') {
4150             # Header extraction: %{Name}i
4151 13         20 my $end = index($format, '}', $pos);
4152 13 50       22 die "Unterminated %{...} in access log format\n" if $end < 0;
4153 13         21 my $header_name = substr($format, $pos + 1, $end - $pos - 1);
4154 13         16 $pos = $end + 1;
4155              
4156             # Must be followed by 'i' (request header)
4157 13 50 33     43 die "Expected 'i' after %{$header_name} in access log format\n"
4158             if $pos >= $len || substr($format, $pos, 1) ne 'i';
4159 13         34 $pos++;
4160              
4161 13         23 my $lc_name = lc($header_name);
4162             push @fragments, sub {
4163 11     11   14 my ($info) = @_;
4164 11         15 for my $h (@{$info->{request_headers}}) {
  11         18  
4165 27 100       91 return $h->[1] if lc($h->[0]) eq $lc_name;
4166             }
4167 2         12 return '-';
4168 13         55 };
4169             }
4170             else {
4171             # Simple atom
4172 2896         3742 my $atom = $next;
4173 2896         2918 $pos++;
4174              
4175 2896         5515 my $frag = _access_log_atom($atom);
4176 2895         6368 push @fragments, $frag;
4177             }
4178             }
4179             else {
4180             # Literal text: collect until next %
4181 2473         3928 my $next_pct = index($format, '%', $pos);
4182 2473 100       3628 if ($next_pct < 0) {
4183 411         1413 push @fragments, substr($format, $pos);
4184 411         997 $pos = $len;
4185             }
4186             else {
4187 2062         4658 push @fragments, substr($format, $pos, $next_pct - $pos);
4188 2062         3362 $pos = $next_pct;
4189             }
4190             }
4191             }
4192              
4193             # Build a single closure from fragments
4194             return sub {
4195 296     296   844 my ($info) = @_;
4196 296 100       695 return join('', map { ref $_ ? $_->($info) : $_ } @fragments);
  3451         24315  
4197 444         3850 };
4198             }
4199              
# Dispatch table for single-character access log atoms, keyed by the
# character that follows '%'. It is filled on the first call to
# _access_log_atom() instead of on every call: the table is constant, so
# rebuilding it for each atom of each compiled format is wasted work.
# Lazy filling (rather than a file-scope initialiser) keeps the sub usable
# no matter when during module load it is first called.
my %ACCESS_LOG_ATOM_FOR;

# _access_log_atom($atom)
#
# Look up the formatter for a simple (one character) access log atom.
#
#   $atom - the character following '%' in the format string, e.g. 'h'
#
# Returns a code ref that takes the per-request $info hashref and returns
# the string (or number) to insert into the log line.
#
# Dies with "Unknown access log format atom '%<atom>'\n" when the atom is
# not in the table.
sub _access_log_atom {
    my ($atom) = @_;

    if (!%ACCESS_LOG_ATOM_FOR) {
        %ACCESS_LOG_ATOM_FOR = (
            # Client address; '-' when not recorded
            h => sub { $_[0]->{client_ip} // '-' },

            # Always rendered as '-'
            l => sub { '-' },
            u => sub { '-' },

            # Pre-formatted timestamp supplied by the caller
            t => sub { $_[0]->{timestamp} // '-' },

            # Request line: "METHOD /path?query HTTP/x.y"
            r => sub {
                my $i   = $_[0];
                my $uri = $i->{path} // '/';
                my $qs  = $i->{query};
                $uri .= "?$qs" if defined $qs && length $qs;
                sprintf('%s %s HTTP/%s', $i->{method} // '-', $uri, $i->{http_version} // '1.1');
            },

            m => sub { $_[0]->{method} // '-' },
            U => sub { $_[0]->{path} // '/' },

            # Query string including the leading '?', or '' when there is none
            q => sub {
                my $qs = $_[0]->{query};
                (defined $qs && length $qs) ? "?$qs" : '';
            },

            H => sub { 'HTTP/' . ($_[0]->{http_version} // '1.1') },
            s => sub { $_[0]->{status} // '-' },

            # Response size: '-' when zero or missing (%b), 0 when missing (%B)
            b => sub {
                my $size = $_[0]->{size} // 0;
                $size ? $size : '-';
            },
            B => sub { $_[0]->{size} // 0 },

            # Duration: three decimal places (%d), value scaled by 1_000_000
            # and truncated (%D), value truncated to an integer (%T)
            d => sub { sprintf('%.3f', $_[0]->{duration} // 0) },
            D => sub { int(($_[0]->{duration} // 0) * 1_000_000) },
            T => sub { int($_[0]->{duration} // 0) },
        );
    }

    if (my $frag = $ACCESS_LOG_ATOM_FOR{$atom}) {
        return $frag;
    }

    die "Unknown access log format atom '%$atom'\n";
}
4239              
# port()
#
# Returns the stored bound_port when it is defined, otherwise the
# configured port.
sub port {
    my $self = shift;

    my $bound = $self->{bound_port};
    return defined $bound ? $bound : $self->{port};
}
4245              
# socket_path()
#
# Returns the path of the first entry in the listeners list whose type is
# 'unix'. Returns an explicit undef when there is no such entry, so the
# result is always exactly one value, even in list context.
sub socket_path {
    my $self = shift;

    my $entries = $self->{listeners} // [];
    foreach my $entry (@$entries) {
        next unless $entry->{type} eq 'unix';
        return $entry->{path};
    }

    return undef;
}
4253              
# listeners()
#
# Returns the stored listeners array ref, or a fresh empty array ref when
# none is defined.
sub listeners {
    my $self = shift;

    my $entries = $self->{listeners};
    return defined $entries ? $entries : [];
}
4258              
# is_running()
#
# Normalises the internal running flag to 1 (true) or 0 (false).
sub is_running {
    my $self = shift;

    return 1 if $self->{running};
    return 0;
}
4264              
# connection_count()
#
# Returns the number of keys in the connections hash.
sub connection_count {
    my $self = shift;

    # Assigning keys() to a scalar yields the key count.
    my $count = keys %{ $self->{connections} };
    return $count;
}
4270              
# effective_max_connections()
#
# Returns the configured max_connections when it is set to a positive
# number; otherwise falls back to 1000.
# (Same default as Mojolicious - simple, predictable, no platform-specific hacks)
sub effective_max_connections {
    my $self = shift;

    my $limit = $self->{max_connections};
    return 1000 unless $limit && $limit > 0;

    return $limit;
}
4280              
4281             1;
4282              
4283             __END__