File Coverage

blib/lib/MCP/Server/Transport/HTTP.pm
Criterion Covered Total %
statement 192 192 100.0
branch 70 82 85.3
condition 17 23 73.9
subroutine 29 29 100.0
pod 4 4 100.0
total 312 330 94.5


line stmt bran cond sub pod time code
1             package MCP::Server::Transport::HTTP;
2 4     4   21 use Mojo::Base 'MCP::Server::Transport', -signatures;
  4         7  
  4         61  
3              
4 4     4   365 use Crypt::Misc qw(random_v4uuid);
  4         5  
  4         372  
5 4     4   17 use MCP::Server::Context;
  4         19  
  4         49  
6 4     4   1776 use MCP::Server::Session;
  4         9  
  4         26  
7 4     4   162 use Mojo::IOLoop;
  4         40  
  4         33  
8 4     4   108 use Mojo::JSON qw(to_json true);
  4         6  
  4         201  
9 4     4   16 use Mojo::Util qw(dumper);
  4         20  
  4         188  
10 4     4   18 use Scalar::Util qw(blessed weaken);
  4         4  
  4         230  
11              
12 4   50 4   17 use constant DEBUG => $ENV{MCP_DEBUG} || 0;
  4         11  
  4         8706  
13              
14             has 'auth';
15             has heartbeat => 30;
16             has 'metadata_url';
17             has session_timeout => 3600;
18             has sessions => sub { {} };
19             has streaming => 0;
20              
21 18 100   18 1 30 sub notifications ($self) { $self->streaming ? 1 : 0 }
  18         22  
  18         19  
  18         76  
22              
23 136     136 1 187 sub handle_request ($self, $c) {
  136         195  
  136         184  
  136         146  
24 136 100       410 if (my $auth = $self->auth) {
25 28 100       219 return $self->_unauthorized($c) unless my $info = $auth->($c);
26 26         1577 $c->stash('mcp.auth' => $info);
27             }
28              
29 134         988 my $method = $c->req->method;
30 134 100       1836 return $self->_handle_post($c) if $method eq 'POST';
31 27 100 100     107 return $self->_handle_get($c) if $method eq 'GET' && $self->streaming;
32 15 100 100     74 return $self->_handle_delete($c) if $method eq 'DELETE' && $self->streaming;
33 2         14 return $c->render(json => {error => 'Method not allowed'}, status => 405);
34             }
35              
36 4     4 1 24 sub notify ($self, $session_id, $method, $params = {}) {
  4         5  
  4         7  
  4         6  
  4         12  
  4         5  
37 4 50       14 return undef unless my $session = $self->sessions->{$session_id};
38 4 100       29 return undef unless my $stream = $session->stream;
39 3         40 $stream->write_sse({text => to_json({jsonrpc => '2.0', method => $method, params => $params})});
40 3         1122 return 1;
41             }
42              
43 3     3 1 5 sub notify_all ($self, $method, $params = {}) {
  3         4  
  3         5  
  3         4  
  3         17  
44 3 100       9 return undef unless $self->streaming;
45 2         18 my $payload = {text => to_json({jsonrpc => '2.0', method => $method, params => $params})};
46 2         24 for my $session (values %{$self->sessions}) {
  2         18  
47 1 50       7 next unless my $stream = $session->stream;
48 1         6 $stream->write_sse($payload);
49             }
50 2         234 return 1;
51             }
52              
53 6     6   99 sub _challenge_header ($self, %extra) {
  6         7  
  6         11  
  6         8  
54 6         7 my @parts;
55 6 50       16 push @parts, qq{resource_metadata="@{[$self->metadata_url]}"} if $self->metadata_url;
  6         51  
56 6 100       51 push @parts, qq{error="$extra{error}"} if $extra{error};
57 6 100       19 push @parts, qq{scope="$extra{scope}"} if defined $extra{scope};
58 6 50       40 return 'Bearer' . (@parts ? ' ' . join(', ', @parts) : '');
59             }
60              
61 132     132   178 sub _extract_session_id ($self, $c) { return $c->req->headers->header('Mcp-Session-Id') }
  132         151  
  132         145  
  132         128  
  132         237  
62              
63 105     105   555 sub _handle ($self, $data, $context) {
  105         121  
  105         125  
  105         118  
  105         107  
64 105         121 warn "-- MCP Request\n@{[dumper($data)]}\n" if DEBUG;
65 105         279 my $result = $self->server->handle($data, $context);
66 105         381 warn "-- MCP Response\n@{[dumper($result)]}\n" if DEBUG && $result;
67 105         331 return $result;
68             }
69              
70 13     13   84 sub _handle_delete ($self, $c) {
  13         15  
  13         17  
  13         22  
71 13 100       65 return $c->render(json => {error => 'Missing session ID'}, status => 400)
72             unless my $session_id = $self->_extract_session_id($c);
73             return $c->render(json => {error => 'Session not found'}, status => 404)
74 12 100       349 unless my $session = delete $self->sessions->{$session_id};
75              
76 10 100       82 if (my $stream = $session->stream) { $stream->finish }
  6         55  
77 10         1788 $c->render(data => '', status => 204);
78             }
79              
80 12     12   86 sub _handle_get ($self, $c) {
  12         14  
  12         18  
  12         12  
81 12 100       32 return $c->render(json => {error => 'Missing session ID'}, status => 400)
82             unless my $session_id = $self->_extract_session_id($c);
83             return $c->render(json => {error => 'Session not found'}, status => 404)
84 11 100       349 unless my $session = $self->sessions->{$session_id};
85 8 100       54 return $c->render(json => {error => 'Stream already open'}, status => 409) if $session->stream;
86              
87 7         56 $c->inactivity_timeout(0);
88 7         468 $c->res->headers->header('Mcp-Session-Id' => $session_id);
89 7         279 $session->stream($c)->touch;
90 7         25 $c->write_sse;
91              
92 7         1154 my $heartbeat_id;
93 7 100       24 if (my $interval = $self->heartbeat) {
94 1     1   9 $heartbeat_id = Mojo::IOLoop->recurring($interval => sub { $c->write_sse({comment => 'keepalive'}) });
  1         433444  
95             }
96              
97 7         88 weaken(my $self_weak = $self);
98             $c->on(
99             finish => sub {
100 7 100   7   10864 Mojo::IOLoop->remove($heartbeat_id) if $heartbeat_id;
101 7 50       67 return unless $self_weak;
102 7 100       24 return unless my $session = $self_weak->sessions->{$session_id};
103 1 50 50     8 return unless ($session->stream // 0) == $c;
104 1         7 $session->stream(undef)->touch;
105             }
106 7         42 );
107             }
108              
109 18     18   36 sub _handle_initialization ($self, $c, $data) {
  18         29  
  18         28  
  18         23  
  18         26  
110 18         103 my $session_id = random_v4uuid;
111 18         1649 my $result = $self->_handle($data, MCP::Server::Context->new(scopes => $self->_scopes($c)));
112 18 100       78 if ($self->streaming) {
113 11         73 $self->sessions->{$session_id} = MCP::Server::Session->new(id => $session_id);
114 11         100 $self->_start_sweep;
115             }
116 18         141 $c->res->headers->header('Mcp-Session-Id' => $session_id);
117 18         754 $c->render(json => $result, status => 200);
118             }
119              
120 107     107   160 sub _handle_post ($self, $c) {
  107         2948  
  107         157  
  107         122  
121 107         292 my $session_id = $self->_extract_session_id($c);
122              
123 107 50       2338 return $c->render(json => {error => 'Invalid JSON'}, status => 400) unless my $data = $c->req->json;
124 107 50       5449 return $c->render(json => {error => 'Invalid JSON', status => 400}) unless ref $data eq 'HASH';
125              
126 107 100 66     528 if ($data->{method} && $data->{method} eq 'initialize') { $self->_handle_initialization($c, $data) }
  18         59  
127 89         290 else { $self->_handle_regular_request($c, $data, $session_id) }
128             }
129              
130 89     89   105 sub _handle_regular_request ($self, $c, $data, $session_id) {
  89         114  
  89         103  
  89         113  
  89         125  
  89         97  
131 89 50       165 return $c->render(json => {error => 'Missing session ID'}, status => 400) unless $session_id;
132 89 100       224 if ($self->streaming) {
133             return $c->render(json => {error => 'Session not found'}, status => 404)
134 18 100       90 unless my $session = $self->sessions->{$session_id};
135 16         123 $session->touch;
136             }
137              
138 87         482 $c->res->headers->header('Mcp-Session-Id' => $session_id);
139 87         3070 my $context = MCP::Server::Context->new(
140             transport => $self,
141             session_id => $session_id,
142             controller => $c,
143             scopes => $self->_scopes($c)
144             );
145 87 100       1451 return $c->render(data => '', status => 202) unless defined(my $result = $self->_handle($data, $context));
146              
147             # Insufficient scope
148 70 100       184 if (my $needed = $context->insufficient_scope) {
149 4         18 $c->res->headers->header(
150             'WWW-Authenticate' => $self->_challenge_header(error => 'insufficient_scope', scope => join(' ', @$needed)));
151 4         93 return $c->render(json => $result, status => 403);
152             }
153              
154             # Sync
155 66 100 66     519 return $c->render(json => $result, status => 200) if !blessed($result) || !$result->isa('Mojo::Promise');
156              
157             # Async
158 6         39 $c->inactivity_timeout(0);
159 6         411 $c->write_sse;
160 6     6   1095 $result->then(sub { $c->write_sse({text => to_json($_[0])})->finish });
  6         491  
161             }
162              
163 105     105   152 sub _scopes ($self, $c) {
  105         135  
  105         148  
  105         118  
164 105 100       237 return undef unless $self->auth;
165 26   50     140 return ($c->stash('mcp.auth') // {})->{scopes} // [];
      50        
166             }
167              
168 11     11   14 sub _start_sweep ($self) {
  11         15  
  11         15  
169 11 100       25 return if $self->{_sweep_id};
170 1 50       2 return unless my $interval = $self->session_timeout;
171 1         6 weaken(my $self_weak = $self);
172 1 50   7   9 $self->{_sweep_id} = Mojo::IOLoop->recurring($interval => sub { $self_weak->_sweep if $self_weak });
  7         1649584  
173             }
174              
175 7     7   22 sub _sweep ($self) {
  7         31  
  7         16  
176 7 50       47 return unless my $timeout = $self->session_timeout;
177 7         100 my $cutoff = time - $timeout;
178 7         37 my $sessions = $self->sessions;
179 7         67 for my $id (keys %$sessions) {
180 9         43 my $session = $sessions->{$id};
181 9 100 100     40 delete $sessions->{$id} if !$session->stream && $session->last_used < $cutoff;
182             }
183             }
184              
185 2     2   82 sub _unauthorized ($self, $c) {
  2         3  
  2         2  
  2         5  
186 2         8 $c->res->headers->header('WWW-Authenticate' => $self->_challenge_header);
187 2         65 return $c->render(json => {error => 'Unauthorized'}, status => 401);
188             }
189              
190             1;
191              
192             =encoding utf8
193              
194             =head1 NAME
195              
196             MCP::Server::Transport::HTTP - HTTP transport for MCP servers
197              
198             =head1 SYNOPSIS
199              
200             use MCP::Server::Transport::HTTP;
201              
202             my $http = MCP::Server::Transport::HTTP->new;
203              
204             =head1 DESCRIPTION
205              
206             L<MCP::Server::Transport::HTTP> is a transport for MCP (Model Context Protocol) servers that uses HTTP as the
207             underlying transport mechanism.
208              
209             By default only C<POST> requests are handled. When L</"streaming"> is enabled, the transport additionally supports
210             the server-to-client SSE stream (C<GET>) and explicit session termination (C<DELETE>) defined by the Streamable
211             HTTP transport. Note that this requires per-process state and is therefore not compatible with pre-forking web
212             servers.
213              
214             =head1 ATTRIBUTES
215              
216             L<MCP::Server::Transport::HTTP> inherits all attributes from L<MCP::Server::Transport> and implements the following
217             new ones.
218              
219             =head2 auth
220              
221             my $cb = $http->auth;
222             $http = $http->auth(sub ($c) {...});
223              
224             Optional callback to authenticate each request before it is dispatched. It receives the L<Mojolicious::Controller>
225             object and returns a hash reference of authentication info on success, or a false value to reject the request with a
226             C<401> C<WWW-Authenticate> challenge. The C<scopes> key of the returned hash reference is made available to handlers
227             as L<MCP::Server::Context/"scopes">. Token validation is left to the application, so this is where you verify an
228             OAuth 2.0 access token; when not set, requests are not authenticated.
229              
230             =head2 heartbeat
231              
232             my $seconds = $http->heartbeat;
233             $http = $http->heartbeat(30);
234              
235             Interval in seconds at which a keep-alive comment is sent on each open server-to-client stream. Defaults to C<30>;
236             set to C<0> to disable. Useful when running behind reverse proxies that close idle connections. Only used when
237             L</"streaming"> is enabled.
238              
239             =head2 metadata_url
240              
241             my $url = $http->metadata_url;
242             $http = $http->metadata_url('https://example.com/.well-known/oauth-protected-resource');
243              
244             URL of the OAuth 2.0 Protected Resource Metadata document. When set, it is included as the C<resource_metadata>
245             parameter of the C<WWW-Authenticate> challenge sent with C<401> and C<403> responses, so clients can discover the
246             authorization server. Use an absolute URL so remote clients can fetch it. See L<https://datatracker.ietf.org/doc/html/rfc9728>.
247              
248             =head2 session_timeout
249              
250             my $seconds = $http->session_timeout;
251             $http = $http->session_timeout(3600);
252              
253             Idle timeout in seconds for sessions without an open server-to-client stream. Defaults to C<3600>; set to C<0> to
254             disable. A periodic sweep removes sessions whose last activity is older than this value, so the effective lifetime
255             of an idle session is up to twice the configured timeout. Only used when L</"streaming"> is enabled.
256              
257             =head2 sessions
258              
259             my $sessions = $http->sessions;
260             $http = $http->sessions({});
261              
262             Per-process registry of active L<MCP::Server::Session> objects, keyed by session ID. Only used when L</"streaming">
263             is enabled.
264              
265             =head2 streaming
266              
267             my $bool = $http->streaming;
268             $http = $http->streaming(1);
269              
270             Enable server-to-client streaming and session lifecycle management. Defaults to false. When enabled, the transport
271             tracks all sessions in L</"sessions">, accepts C<GET> requests to open a long-lived SSE stream the server can push
272             notifications to, and accepts C<DELETE> requests to terminate a session. Requests for unknown sessions are rejected
273             with status C<404>.
274              
275             =head1 METHODS
276              
277             L<MCP::Server::Transport::HTTP> inherits all methods from L<MCP::Server::Transport> and implements the following new
278             ones.
279              
280             =head2 handle_request
281              
282             $http->handle_request(Mojolicious::Controller->new);
283              
284             Handles an incoming HTTP request.
285              
286             =head2 notifications
287              
288             my $bool = $http->notifications;
289              
290             True when L</"streaming"> is enabled, false otherwise.
291              
292             =head2 notify
293              
294             my $bool = $http->notify($session_id, $method);
295             my $bool = $http->notify($session_id, $method, {foo => 'bar'});
296              
297             Send a JSON-RPC notification to the open SSE stream of a session. Returns true on success, or C<undef> if the
298             session does not exist or has no open stream. Only available when L</"streaming"> is enabled.
299              
300             =head2 notify_all
301              
302             my $bool = $http->notify_all($method);
303             my $bool = $http->notify_all($method, {foo => 'bar'});
304              
305             Send a JSON-RPC notification to the open SSE stream of every active session. Returns true on success, or C<undef>
306             when L</"streaming"> is disabled.
307              
308             =head1 SEE ALSO
309              
310             L<MCP>, L<https://mojolicious.org>, L<https://modelcontextprotocol.io>.
311              
312             =cut