File Coverage

blib/lib/PAGI/Context.pm
Criterion Covered Total %
statement 196 196 100.0
branch 59 62 95.1
condition 56 73 76.7
subroutine 55 55 100.0
pod 17 43 39.5
total 383 429 89.2


line stmt bran cond sub pod time code
1             package PAGI::Context;
2             $PAGI::Context::VERSION = '0.002000';
3 27     27   2343631 use strict;
  27         43  
  27         945  
4 27     27   137 use warnings;
  27         46  
  27         1239  
5 27     27   104 use Carp qw(croak);
  27         32  
  27         1284  
6 27     27   128 use Scalar::Util qw(blessed);
  27         42  
  27         863  
7 27     27   2054 use Future::AsyncAwait;
  27         72075  
  27         137  
8 27     27   1173 use Future;
  27         40  
  27         85973  
9              
10             =encoding UTF-8
11              
12             =head1 NAME
13              
14             PAGI::Context - Per-request context with protocol-specific subclasses
15              
16             =head1 SYNOPSIS
17              
18             use PAGI::Context;
19             use Future::AsyncAwait;
20              
21             # Factory returns the right subclass based on scope type
22             my $ctx = PAGI::Context->new($scope, $receive, $send);
23              
24             # Shared methods (all protocol types)
25             my $type = $ctx->type; # 'http', 'websocket', 'sse'
26             my $path = $ctx->path;
27             my $stash = $ctx->stash; # PAGI::Stash
28             my $session = $ctx->session; # PAGI::Session
29              
30             # WebSocket context - protocol ops directly on $ctx
31             await $ctx->accept;
32             await $ctx->send_json({ msg => 'hello' });
33             my $text = await $ctx->receive_text;
34             await $ctx->close;
35              
36             # SSE context - same idea
37             await $ctx->send_event(event => 'update', data => $payload);
38             await $ctx->keepalive(25);
39              
40             # Event dispatcher - works on any protocol type
41             my $reason = await $ctx
42             ->on('websocket.receive', async sub { ... })
43             ->on('chat.message', async sub { ... })
44             ->on_error(sub { ... })
45             ->run; # returns 'disconnect', 'stop', or 'error'
46              
47             # Underlying protocol objects still available
48             my $ws = $ctx->websocket; # PAGI::WebSocket (WS only)
49             my $sse = $ctx->sse; # PAGI::SSE (SSE only)
50             my $req = $ctx->request; # PAGI::Request (HTTP only)
51             my $res = $ctx->response; # PAGI::Response (HTTP only)
52              
53             =head1 DESCRIPTION
54              
55             PAGI::Context is a factory and base class that provides a unified entry
56             point for per-request context. Calling C<< PAGI::Context->new(...) >>
57             inspects C<< $scope->{type} >> and returns the appropriate subclass:
58             L<PAGI::Context::HTTP>, L<PAGI::Context::WebSocket>, or
59             L<PAGI::Context::SSE>.
60              
61             Shared methods (scope accessors, stash, session, event dispatcher) live
62             on the base class. Protocol-specific methods are delegated from
63             subclasses so you can use C<$ctx> as your single object:
64              
65             # Instead of:
66             my $ws = $ctx->websocket;
67             await $ws->send_json($data); # closes over $ws in every handler
68              
69             # Just do:
70             await $ctx->send_json($data); # $ctx is already in scope
71              
72             =head2 Protocol Shape
73              
74             Each context type has a different set of available methods. Calling a
75             method that belongs to a different protocol type raises a standard Perl
76             C<Can't locate object method> error.
77              
78             Method HTTP WebSocket SSE
79             ────────────────── ────── ────────── ──────
80             request, response yes - -
81             text, html, json yes - -
82             redirect yes - -
83             method yes - -
84             accept - yes -
85             send_text - yes -
86             send_bytes - yes -
87             send_json - yes yes
88             send - - yes
89             send_event - - yes
90             send_comment - - yes
91             start - - yes
92             close - yes yes
93             query / query_param - yes(query) yes(query_param)
94             is_connected base* WS override -
95             is_closed - yes yes
96             is_started - - yes
97             keepalive - yes yes
98             each_text, etc. - yes -
99             each, every - - yes
100              
101             *is_connected on WebSocket contexts checks WS handshake state,
102             not the TCP-level pagi.connection that the base class uses.
103              
104             See L and L for the
105             full method reference on each subclass.
106              
107             =head1 EXTENSIBILITY
108              
109             Override C<_type_map> to add or replace protocol types:
110              
111             package MyApp::Context;
112             our @ISA = ('PAGI::Context');
113              
114             sub _type_map {
115             my ($class) = @_;
116             return {
117             %{ $class->SUPER::_type_map },
118             grpc => 'MyApp::Context::GRPC',
119             };
120             }
121              
122             Override C<_resolve_class> for custom resolution logic beyond the type map.
123              
124             =head1 CONSTRUCTOR
125              
126             =head2 new
127              
128             my $ctx = PAGI::Context->new($scope, $receive, $send);
129              
130             Factory constructor. Returns a subclass instance based on
131             C<< $scope->{type} >>. Defaults to HTTP if type is missing or unknown.
132              
133             =cut
134              
135             sub new {
136 245     245 1 2848277 my ($class, $scope, $receive, $send) = @_;
137 245         780 my $subclass = $class->_resolve_class($scope);
138 245         1127 return bless {
139             scope => $scope,
140             receive => $receive,
141             send => $send,
142             }, $subclass;
143             }
144              
145             =head1 CLASS METHODS
146              
147             =head2 _type_map
148              
149             my $map = PAGI::Context->_type_map;
150              
151             Returns a hashref mapping scope type strings to subclass package names.
152             Override in a subclass to add or replace protocol types.
153              
154             =cut
155              
156             sub _type_map {
157             return {
158 252     252   183911 http => 'PAGI::Context::HTTP',
159             websocket => 'PAGI::Context::WebSocket',
160             sse => 'PAGI::Context::SSE',
161             };
162             }
163              
164             =head2 _resolve_class
165              
166             my $class = PAGI::Context->_resolve_class($scope);
167              
168             Resolves the scope to a subclass package name. Looks up
169             C<< $scope->{type} >> in C<_type_map>; defaults to the C<http> mapping
170             if the type is missing or unknown. Override for custom resolution logic.
171              
172             =cut
173              
174             sub _resolve_class {
175 249     249   2887 my ($class, $scope) = @_;
176 249   100     691 my $type = $scope->{type} // 'http';
177 249   66     560 return $class->_type_map->{$type} // $class->_type_map->{http};
178             }
179              
180             =head1 METHODS
181              
182             =head2 Scope Accessors
183              
184             $ctx->scope; # raw $scope hashref
185             $ctx->type; # $scope->{type}
186             $ctx->path; # $scope->{path}
187             $ctx->raw_path; # $scope->{raw_path} // $scope->{path}
188             $ctx->query_string; # $scope->{query_string} // ''
189             $ctx->scheme; # $scope->{scheme} // 'http'
190             $ctx->client; # $scope->{client}
191             $ctx->server; # $scope->{server}
192             $ctx->headers; # $scope->{headers} arrayref of [name, value]
193              
194             =cut
195              
196 3     3 0 41 sub scope { shift->{scope} }
197 47     47 0 1568 sub type { shift->{scope}{type} }
198 4     4 0 16 sub path { shift->{scope}{path} }
199 3   66 3 0 8 sub raw_path { my $s = shift; $s->{scope}{raw_path} // $s->{scope}{path} }
  3         40  
200 2   100 2 0 13 sub query_string { shift->{scope}{query_string} // '' }
201 2   100 2 0 10 sub scheme { shift->{scope}{scheme} // 'http' }
202 1     1 0 5 sub client { shift->{scope}{client} }
203 1     1 0 5 sub server { shift->{scope}{server} }
204 1     1 0 5 sub headers { shift->{scope}{headers} }
205              
206             =head2 assert_http, assert_websocket, assert_sse
207              
208             my $ctx = PAGI::Context->new($scope, $receive, $send)->assert_http;
209              
210             Type guards for handlers that support only one protocol. Each returns the
211             context unchanged when C<< $ctx->type >> matches, and C<croak>s with a clear
212             message otherwise — turning a forgotten scope-type check into a loud, early
213             failure instead of a confusing C<Can't locate object method> deeper in the
214             handler. Because they chain off the polymorphic L</new>, the whole gate is one
215             line:
216              
217             my $ctx = PAGI::Context->new(@_)->assert_http; # croaks unless the scope is http
218              
219             They are named C<assert_*> rather than C<< PAGI::Context->http >> /
220             C<< ->websocket >> because C<websocket> and C<sse> are already instance methods
221             that return the underlying channel objects.
222              
223             =cut
224              
225 3     3 1 26 sub assert_http { $_[0]->_assert_type('http') }
226 2     2 1 389 sub assert_websocket { $_[0]->_assert_type('websocket') }
227 2     2 1 751 sub assert_sse { $_[0]->_assert_type('sse') }
228              
229             sub _assert_type {
230 7     7   12 my ($self, $want) = @_;
231 7   50     16 my $got = $self->type // '(none)';
232 7 100       359 croak "expected a '$want' context, got a '$got' context" unless $got eq $want;
233 4         23 return $self;
234             }
235              
236             =head2 Path Parameters
237              
238             my $params = $ctx->path_params; # hashref
239             my $id = $ctx->path_param('id'); # strict: dies if missing
240             my $id = $ctx->path_param('id', strict => 0); # returns undef
241              
242             C<path_params> returns the C<< $scope->{path_params} >> hashref (set by
243             the router), defaulting to C<{}> if not present.
244              
245             C<path_param> returns a single parameter by name. By default it dies if
246             the key is not found (strict mode). Pass C<< strict => 0 >> to return
247             C<undef> for missing keys instead.
248              
249             =cut
250              
251             sub path_params {
252 8     8 0 18 my ($self) = @_;
253 8   100     32 return $self->{scope}{path_params} // {};
254             }
255              
256             sub path_param {
257 6     6 0 413 my ($self, $name, %opts) = @_;
258 6 100       16 my $strict = exists $opts{strict} ? $opts{strict} : 1;
259 6         13 my $params = $self->path_params;
260              
261 6 100 100     24 if ($strict && !exists $params->{$name}) {
262 1         4 my @available = sort keys %$params;
263 1 50       11 die "path_param '$name' not found. "
264             . (@available ? "Available: " . join(', ', @available) : "No path params set")
265             . "\n";
266             }
267              
268 5         46 return $params->{$name};
269             }
270              
271             =head2 Protocol Introspection
272              
273             $ctx->is_http; # true if type eq 'http'
274             $ctx->is_websocket; # true if type eq 'websocket'
275             $ctx->is_sse; # true if type eq 'sse'
276              
277             =cut
278              
279 5   50 5 0 45 sub is_http { (shift->{scope}{type} // '') eq 'http' }
280 5   50 5 0 24 sub is_websocket { (shift->{scope}{type} // '') eq 'websocket' }
281 4   50 4 0 19 sub is_sse { (shift->{scope}{type} // '') eq 'sse' }
282              
283             =head2 header
284              
285             my $value = $ctx->header('Content-Type');
286              
287             Returns the last value for the named header (case-insensitive), or
288             C<undef> if not found.
289              
290             =cut
291              
292             sub header {
293 12     12 1 78 my ($self, $name) = @_;
294 12         29 $name = lc($name);
295 12         37 my $value;
296 12   50     14 for my $pair (@{$self->{scope}{headers} // []}) {
  12         36  
297 25 100       54 if (lc($pair->[0]) eq $name) {
298 10         16 $value = $pair->[1];
299             }
300             }
301 12         33 return $value;
302             }
303              
304             =head2 receive
305              
306             my $receive = $ctx->receive;
307              
308             Returns the raw C<$receive> coderef. Calling it returns a L<Future> that
309             resolves to the next protocol event hashref from the client.
310              
311             # Read an HTTP request body event
312             my $event = await $ctx->receive->();
313             # $event = { type => 'http.request', body => '...' }
314              
315             # Read a WebSocket message
316             my $msg = await $ctx->receive->();
317             # $msg = { type => 'websocket.receive', text => 'hello' }
318              
319             Most users should prefer the protocol helpers (C<< $ctx->request >>,
320             C<< $ctx->websocket >>, C<< $ctx->sse >>) which handle the event
321             protocol internally. Use C<receive> only for raw protocol access.
322              
323             =head2 send
324              
325             my $send = $ctx->send;
326              
327             Returns the raw C<$send> coderef. Calling it with an event hashref
328             returns a L<Future> that resolves when the event has been sent.
329              
330             # Send an HTTP response (two events: start + body)
331             await $ctx->send->({ type => 'http.response.start', status => 200,
332             headers => [['content-type', 'text/plain']] });
333             await $ctx->send->({ type => 'http.response.body', body => 'Hello' });
334              
335             # Accept a WebSocket connection
336             await $ctx->send->({ type => 'websocket.accept' });
337              
338             Most users should prefer the protocol helpers (C<< $ctx->response >>,
339             C<< $ctx->websocket >>, C<< $ctx->sse >>) which build and send events
340             for you. Use C<send> only for raw protocol access.
341              
342             =head2 raw_send
343              
344             my $send = $ctx->raw_send;
345              
346             Returns the raw C<$send> coderef on B context type. On the base context
347             this is identical to L, but subclasses never override C, whereas
348             some override C: L replaces C with a C
349             convenience (C<< $ctx->send($data) >> sends an SSE data event). So when you are
350             holding an SSE (or any) context and need the actual underlying channel rather
351             than the protocol convenience, reach for C.
352              
353             The reason you reach for it: emitting your B<own> event types that something
354             downstream (a middleware) translates into the wire protocol — the same shape
355             PAGI's SSE/WebSocket layers themselves use (C / C are
356             custom send events a layer renders).
357              
358             # Inside an SSE handler: $ctx->send would emit an sse.send for you, but we
359             # want to emit our own typed event for a middleware to render.
360             my $emit = $ctx->raw_send;
361             await $emit->({ type => 'app.event', name => 'tick', data => 1 });
362              
363             # A middleware wrapping $send catches it and renders to the wire:
364             # my $wrapped = async sub ($ev) {
365             # if (($ev->{type} // '') eq 'app.event') {
366             # await $send->({ type => 'sse.send', event => $ev->{name},
367             # data => encode_json({ value => $ev->{data} }) });
368             # return;
369             # }
370             # await $send->($ev); # real protocol events pass through
371             # };
372              
373             Note the trap C avoids: on an SSE context C<< $ctx->send >> is a
374             B (it calls C<< $ctx->sse->send >>), not an accessor — so
375             C<< $ctx->send >> there is not the raw coderef. C always is.
376              
377             =cut
378              
379 1     1 1 11 sub receive { shift->{receive} }
380 2     2 1 7 sub send { shift->{send} }
381 3     3 1 1206 sub raw_send { shift->{send} }
382              
383             =head2 stash
384              
385             my $stash = $ctx->stash; # PAGI::Stash instance
386              
387             Returns a L<PAGI::Stash> wrapping C<< $scope->{'pagi.stash'} >>.
388             Lazy-constructed and cached.
389              
390             =head2 session
391              
392             my $session = $ctx->session; # PAGI::Session instance
393              
394             Returns a L<PAGI::Session> wrapping C<< $scope->{'pagi.session'} >>.
395             Lazy-constructed and cached. Dies if session middleware has not run.
396             Use C<has_session> to check availability first.
397              
398             =head2 has_session
399              
400             if ($ctx->has_session) {
401             my $user_id = $ctx->session->get('user_id');
402             }
403              
404             Returns true if session middleware has populated C<< $scope->{'pagi.session'} >>.
405              
406             =head2 state
407              
408             my $state = $ctx->state; # hashref
409              
410             Returns C<< $scope->{state} >> - the app/endpoint-level shared state.
411              
412             =cut
413              
414             sub stash {
415 17     17 1 2021 my ($self) = @_;
416 17   66     174 return $self->{_stash} //= do {
417 14         2283 require PAGI::Stash;
418 14         104 PAGI::Stash->new($self->{scope});
419             };
420             }
421              
422             sub session {
423 3     3 1 36 my ($self) = @_;
424 3   100     11 return $self->{_session} //= do {
425 2         14 require PAGI::Session;
426 2         27 PAGI::Session->new($self->{scope});
427             };
428             }
429              
430             sub has_session {
431 2     2 1 16 my ($self) = @_;
432 2 100       13 return exists $self->{scope}{'pagi.session'} ? 1 : 0;
433             }
434              
435             sub state {
436 11     11 1 45 my ($self) = @_;
437 11   100     87 return $self->{scope}{state} // {};
438             }
439              
440             =head2 Connection State
441              
442             $ctx->connection; # PAGI::Server::ConnectionState object
443             $ctx->is_connected; # boolean
444             $ctx->is_disconnected; # boolean
445             $ctx->disconnect_reason; # string or undef (abnormal disconnect only)
446             $ctx->on_disconnect($cb); # callback on abnormal disconnect
447             $ctx->on_complete($cb); # callback on clean completion
448              
449             Delegates to C<< $scope->{'pagi.connection'} >>. C<on_disconnect> fires only on
450             an abnormal end and C<on_complete> only on a clean finish -- exactly one per
451             request.
452              
453             C<< $ctx->buffered_amount >>, C<< $ctx->high_water_mark >>, and
454             C<< $ctx->low_water_mark >> expose outbound flow control via the server's
455             C handle (see L):
456             bytes queued for the client but not yet on the wire, and the backpressure band.
457             C<buffered_amount> returns C<0> (and the watermarks C<undef>) when the server
458             does not provide the handle.
459              
460             C<< $ctx->on_high_water($cb) >> and C<< $ctx->on_drain($cb) >> register
461             edge-triggered backpressure callbacks (pause/resume a producer), and
462             C<< $ctx->is_writable >> is true while the buffer is below the high mark. Both
463             delegate to C<pagi.transport> and degrade quietly -- the callbacks become no-ops
464             and C<is_writable> stays true -- when the handle (or its callback support) is
465             absent.
466              
467             =cut
468              
469             sub connection {
470 13     13 0 30 my ($self) = @_;
471 13         30 return $self->{scope}{'pagi.connection'};
472             }
473              
474             sub is_connected {
475 6     6 0 294 my ($self) = @_;
476 6         12 my $conn = $self->connection;
477 6 100       28 return 0 unless $conn;
478 4         12 return $conn->is_connected;
479             }
480              
481             sub is_disconnected {
482 3     3 0 512 my ($self) = @_;
483 3         7 return !$self->is_connected;
484             }
485              
486             sub disconnect_reason {
487 3     3 0 493 my ($self) = @_;
488 3         5 my $conn = $self->connection;
489 3 100       8 return undef unless $conn;
490 2         11 return $conn->disconnect_reason;
491             }
492              
493             sub on_disconnect {
494 1     1 0 296 my ($self, $cb) = @_;
495 1         3 my $conn = $self->connection;
496 1 50       4 return unless $conn;
497 1         3 $conn->on_disconnect($cb);
498             }
499              
500             sub on_complete {
501 2     2 0 45 my ($self, $cb) = @_;
502 2         9 my $conn = $self->connection;
503 2 100       18 return unless $conn;
504 1         5 $conn->on_complete($cb);
505             }
506              
507             # Outbound flow-control introspection (delegates to the pagi.transport handle)
508             sub buffered_amount {
509 2     2 0 12 my $self = shift;
510 2         5 my $t = $self->{scope}{'pagi.transport'};
511 2 100       8 return 0 unless $t;
512 1         4 return $t->buffered_amount;
513             }
514              
515             sub high_water_mark {
516 2     2 0 324 my $self = shift;
517 2         6 my $t = $self->{scope}{'pagi.transport'};
518 2 100       6 return undef unless $t;
519 1         3 return $t->high_water_mark;
520             }
521              
522             sub low_water_mark {
523 2     2 0 317 my $self = shift;
524 2         4 my $t = $self->{scope}{'pagi.transport'};
525 2 100       6 return undef unless $t;
526 1         4 return $t->low_water_mark;
527             }
528              
529             sub on_high_water {
530 2     2 0 31 my ($self, $cb) = @_;
531 2         5 my $t = $self->{scope}{'pagi.transport'};
532 2 100 66     15 $t->on_high_water($cb) if $t && $t->can('on_high_water');
533 2         10 return $self;
534             }
535              
536             sub on_drain {
537 2     2 0 364 my ($self, $cb) = @_;
538 2         5 my $t = $self->{scope}{'pagi.transport'};
539 2 100 66     12 $t->on_drain($cb) if $t && $t->can('on_drain');
540 2         8 return $self;
541             }
542              
543             sub is_writable {
544 3     3 0 5 my $self = shift;
545 3         5 my $t = $self->{scope}{'pagi.transport'};
546 3 100       8 return 1 unless $t;
547 2         6 my $high = $t->high_water_mark;
548 2 50       8 return 1 unless defined $high;
549 2 100       5 return $t->buffered_amount < $high ? 1 : 0;
550             }
551              
552             =head1 EVENT DISPATCHER
553              
554             The event dispatcher provides a generic, protocol-agnostic way to handle
555             PAGI events. It is most useful when the receive stream carries a mix of
556             protocol events and application-level events injected by middleware such
557             as C.
558              
559             my $ctx = PAGI::Context->new($scope, $receive, $send);
560              
561             $ctx->on('websocket.receive', async sub {
562             my ($ctx, $event) = @_;
563             my $text = $event->{text} // '';
564             await $ctx->send->({ type => 'websocket.send', text => "echo: $text" });
565             });
566              
567             $ctx->on('chat.message', async sub {
568             my ($ctx, $event) = @_;
569             # handle a channel-injected event
570             });
571              
572             $ctx->on_error(sub {
573             my ($ctx, $error, $source) = @_;
574             warn "[$source] $error";
575             });
576              
577             my $reason = await $ctx->run; # 'disconnect', 'stop', or 'error'
578              
579             =head2 on
580              
581             $ctx->on($event_type, $callback); # returns $ctx
582              
583             Register a handler for a raw PAGI event type string. Multiple handlers
584             may be registered for the same type; they are called in registration order.
585             Handlers receive C<($ctx, $event)>. Handlers may be plain coderefs or
586             C<async sub>s; if a handler returns a L<Future>, C<run> awaits it before
587             continuing.
588              
589             Returns C<$ctx> for chaining.
590              
591             =head2 on_default
592              
593             $ctx->on_default($callback); # returns $ctx
594              
595             Register a single fallback handler, called with C<($ctx, $event)> for any
596             event that has no type-specific handler. The last registration wins. The
597             callback may be a plain coderef or an C<async sub>; if it returns a L<Future>,
598             C<run> awaits it. Exceptions are routed to C<on_error> with
599             C<< $source = 'handler' >>.
600              
601             B<Note:> C<on_default> does B<not> fire
602             for the protocol's disconnect event (C<websocket.disconnect>, C<sse.disconnect>,
603             C<http.disconnect>): that event ends the loop normally and is not treated as an
604             "unhandled" surprise. To run cleanup or logging on disconnect, register a
605             handler for it explicitly instead:
606              
607             $ctx->on('websocket.disconnect', sub { ... });
608              
609             Returns C<$ctx> for chaining.
610              
611             =head2 on_error
612              
613             $ctx->on_error($callback); # returns $ctx
614              
615             Register an error callback. It is called when C<< $receive->() >> fails
616             (C<$source = 'receive'>) or when a registered handler throws (C<$source =
617             'handler'>). Callbacks receive C<($ctx, $error, $source)>.
618              
619             Multiple callbacks may be registered and are called in order. Callbacks
620             may be C<async sub>s; if a callback returns a L<Future>, it is awaited.
621             If no callbacks are registered, errors are emitted via C<warn>.
622              
623             Returns C<$ctx> for chaining.
624              
625             Handlers and error callbacks are cleared automatically when C<run>
626             resolves, so closures that capture C<$ctx> do not leak -- you do not need
627             to C<weaken> them. (Weakening only matters if you register callbacks and
628             never call C<run>.)
629              
630             =head2 stop
631              
632             $ctx->stop; # returns $ctx
633              
634             Signal the C<run> loop to exit cleanly after the current event's handlers
635             finish, before the next event is read. C<run> will resolve with reason
636             C<'stop'>.
637              
638             Returns C<$ctx> for chaining.
639              
640             =head2 run
641              
642             my $reason = await $ctx->run;
643              
644             Start the event dispatch loop. Reads events from the receive stream and
645             dispatches each to registered handlers. The loop runs until one of:
646              
647             =over 4
648              
649             =item * The protocol's terminal disconnect event arrives (C<websocket.disconnect>,
650             C<sse.disconnect>, C<http.disconnect>) - resolves with C<'disconnect'>
651              
652             =item * C<stop> was called - resolves with C<'stop'>
653              
654             =item * C<< $receive->() >> fails - fires C<on_error> callbacks and resolves
655             with C<'error'>
656              
657             =back
658              
659             C always resolves successfully (never rejects). The caller does
660             not need to C it.
661              
662             Calling C<run> a second time while already running throws synchronously.
663              
664             When run() resolves, all registered handlers and error callbacks are
665             cleared to break closure-based reference cycles.
666              
667             =cut
668              
669             # ---------------------------------------------------------------------------
670             # Event dispatcher - on(), on_error(), stop(), run()
671             # ---------------------------------------------------------------------------
672              
673             # Terminal event type by scope type (websocket.*, sse.*, http.* are reserved)
674             my %_TERMINAL = (
675             websocket => 'websocket.disconnect',
676             sse => 'sse.disconnect',
677             http => 'http.disconnect',
678             );
679              
680             sub _terminal_event_type {
681 36     36   40 my ($self) = @_;
682 36   50     57 return $_TERMINAL{ $self->type // '' };
683             }
684              
685             # Register a handler for a raw PAGI event type string.
686             # Returns $self for chaining.
687             sub on {
688 27     27 1 485 my ($self, $type, $cb) = @_;
689 27         32 push @{ $self->{_handlers}{$type} }, $cb;
  27         65  
690 27         43 return $self;
691             }
692              
693             # Register a single fallback handler for events with no type-specific handler
694             # (except the terminal disconnect). Last registration wins. Returns $self.
695             sub on_default {
696 5     5 1 24 my ($self, $cb) = @_;
697 5         8 $self->{_on_default} = $cb;
698 5         8 return $self;
699             }
700              
701             # Register an error handler. Called for both $receive->() failures
702             # (source='receive') and handler exceptions (source='handler').
703             # Returns $self for chaining.
704             sub on_error {
705 14     14 1 101 my ($self, $cb) = @_;
706 14         14 push @{ $self->{_on_error} }, $cb;
  14         23  
707 14         24 return $self;
708             }
709              
710             # Signal the run() loop to exit after the current handler finishes.
711             sub stop {
712 2     2 1 24 my ($self) = @_;
713 2         5 $self->{_stopped} = 1;
714 2         3 return $self;
715             }
716              
717             # Internal: fire error callbacks with ($ctx, $error, $source).
718 13     13   18 async sub _trigger_ctx_error {
719 13         26 my ($self, $error, $source) = @_;
720              
721 13         14 for my $cb (@{ $self->{_on_error} }) {
  13         24  
722 12         24 eval {
723 12         27 my $r = $cb->($self, $error, $source);
724 11 100 66     130 if (blessed($r) && $r->isa('Future')) {
725 2         7 await $r;
726             }
727             };
728 12 100       58 if ($@) {
729 2         20 warn "PAGI::Context on_error callback error: $@";
730             }
731             }
732              
733 13 100       15 if (!@{ $self->{_on_error} }) {
  13         65  
734 4         37 warn "PAGI::Context error ($source): $error";
735             }
736             }
737              
738             # Run the event dispatch loop. A plain sub so a re-entrant call croaks
739             # synchronously (rather than as a failed Future); the async loop is in _run().
740             # Always resolves (never rejects). Returns reason: 'disconnect', 'stop', 'error'.
741             sub run {
742 38     38 1 288 my ($self) = @_;
743             croak "PAGI::Context run() called while already running"
744 38 100       345 if $self->{_running};
745 36         49 $self->{_running} = 1;
746 36         68 return $self->_run;
747             }
748              
749 36     36   42 async sub _run {
750 36         35 my ($self) = @_;
751 36         73 $self->{_stopped} = 0;
752 36   100     116 $self->{_on_error} //= [];
753              
754 36         37 my $reason = 'stop';
755 36         67 my $terminal = $self->_terminal_event_type;
756              
757 36         79 LOOP: while (!$self->{_stopped}) {
758 56         60 my $event = eval { await $self->{receive}->() };
  56         79  
759 56 100       3367 if (my $err = $@) {
760 9         41 await $self->_trigger_ctx_error($err, 'receive');
761 9         304 $reason = 'error';
762 9         17 last LOOP;
763             }
764              
765 47   50     94 my $type = $event->{type} // '';
766              
767             # Snapshot before iterating - on() calls from inside a handler
768             # must not affect the current iteration.
769 47   100     45 my @handlers = @{ $self->{_handlers}{$type} // [] };
  47         126  
770              
771 47 100 66     190 if (@handlers) {
    100 100        
    100 66        
      100        
772 20         32 for my $cb (@handlers) {
773 22         21 eval {
774 22         43 my $r = $cb->($self, $event);
775 19 100 100     108 if (blessed($r) && $r->isa('Future')) {
776 1         2 await $r;
777             }
778             };
779 22 100       73 if (my $err = $@) {
780 3         6 await $self->_trigger_ctx_error($err, 'handler');
781             }
782             }
783             } elsif ($self->{_on_default} && !($terminal && $type eq $terminal)) {
784 2         29 eval {
785 2         6 my $r = $self->{_on_default}->($self, $event);
786 2 100 66     36 if (blessed($r) && $r->isa('Future')) { await $r; }
  1         3  
787             };
788 2 100       22 if (my $err = $@) { await $self->_trigger_ctx_error($err, 'handler'); }
  1         3  
789             } elsif ($ENV{PAGI_DEBUG} && !($terminal && $type eq $terminal)) {
790 1         9 warn "PAGI::Context: unhandled event type '$type'\n";
791             }
792              
793 47 100 66     257 if ($terminal && $type eq $terminal) {
794 25         25 $reason = 'disconnect';
795 25         43 last LOOP;
796             }
797             }
798              
799             # Clear callbacks to break any closure-based reference cycles.
800 36         98 $self->{_handlers} = {};
801 36         64 $self->{_on_error} = [];
802 36         52 $self->{_on_default} = undef;
803 36         37 $self->{_running} = 0;
804 36         33 $self->{_stopped} = 0;
805              
806 36         123 return $reason;
807             }
808              
809             # Load subclasses
810             require PAGI::Context::HTTP;
811             require PAGI::Context::WebSocket;
812             require PAGI::Context::SSE;
813              
814             1;
815              
816             __END__