File Coverage

blib/lib/Net/Async/MCP/Transport/HTTP.pm
Criterion Covered Total %
statement 18 105 17.1
branch 0 42 0.0
condition 0 11 0.0
subroutine 6 19 31.5
pod 4 4 100.0
total 28 181 15.4


line stmt bran cond sub pod time code
1             package Net::Async::MCP::Transport::HTTP;
2             # ABSTRACT: Streamable HTTP MCP transport via Net::Async::HTTP
3             our $VERSION = '0.002';
4 1     1   1221 use strict;
  1         3  
  1         53  
5 1     1   6 use warnings;
  1         2  
  1         65  
6 1     1   7 use parent 'IO::Async::Notifier';
  1         2  
  1         10  
7              
8 1     1   87 use Future;
  1         3  
  1         25  
9 1     1   5 use JSON::MaybeXS;
  1         2  
  1         78  
10 1     1   7 use Carp qw( croak );
  1         2  
  1         6870  
11              
12              
13             sub _init {
14 0     0     my ( $self, $params ) = @_;
15             $self->{url} = delete $params->{url}
16 0 0         or croak "url is required";
17 0           $self->{next_id} = 0;
18 0           $self->{session_id} = undef;
19 0           $self->{json} = JSON::MaybeXS->new(utf8 => 1, convert_blessed => 1);
20 0           $self->SUPER::_init($params);
21             }
22              
23             sub configure {
24 0     0 1   my ( $self, %params ) = @_;
25 0 0         if (exists $params{url}) {
26 0           $self->{url} = delete $params{url};
27             }
28 0           $self->SUPER::configure(%params);
29             }
30              
31             sub _add_to_loop {
32 0     0     my ( $self, $loop ) = @_;
33 0           $self->SUPER::_add_to_loop($loop);
34              
35 0           require Net::Async::HTTP;
36              
37 0           my $http = Net::Async::HTTP->new(
38             max_connections_per_host => 0,
39             );
40 0           $self->{http} = $http;
41 0           $self->add_child($http);
42             }
43              
44             sub send_request {
45 0     0 1   my ( $self, $method, $params ) = @_;
46              
47 0           my $id = ++$self->{next_id};
48 0 0         my $request = {
49             jsonrpc => '2.0',
50             id => $id,
51             method => $method,
52             defined $params ? ( params => $params ) : (),
53             };
54              
55 0           my $body = $self->{json}->encode($request);
56              
57 0           my @headers = (
58             'Content-Type' => 'application/json',
59             'Accept' => 'application/json, text/event-stream',
60             );
61 0 0         if (defined $self->{session_id}) {
62 0           push @headers, 'Mcp-Session-Id' => $self->{session_id};
63             }
64              
65 0           require HTTP::Request;
66             my $http_req = HTTP::Request->new(
67             POST => $self->{url},
68 0           [ @headers ],
69             $body,
70             );
71              
72             return $self->{http}->do_request(request => $http_req)->then(sub {
73 0     0     my ( $response ) = @_;
74 0           return $self->_handle_response($response);
75 0           });
76             }
77              
78              
79             sub send_notification {
80 0     0 1   my ( $self, $method, $params ) = @_;
81              
82 0 0         my $request = {
83             jsonrpc => '2.0',
84             method => $method,
85             defined $params ? ( params => $params ) : (),
86             };
87              
88 0           my $body = $self->{json}->encode($request);
89              
90 0           my @headers = (
91             'Content-Type' => 'application/json',
92             'Accept' => 'application/json, text/event-stream',
93             );
94 0 0         if (defined $self->{session_id}) {
95 0           push @headers, 'Mcp-Session-Id' => $self->{session_id};
96             }
97              
98 0           require HTTP::Request;
99             my $http_req = HTTP::Request->new(
100             POST => $self->{url},
101 0           [ @headers ],
102             $body,
103             );
104              
105             return $self->{http}->do_request(request => $http_req)->then(sub {
106 0     0     return Future->done;
107 0           });
108             }
109              
110              
111             sub close {
112 0     0 1   my ( $self ) = @_;
113              
114 0 0         if (defined $self->{session_id}) {
115 0           require HTTP::Request;
116             my $http_req = HTTP::Request->new(
117             DELETE => $self->{url},
118 0           [ 'Mcp-Session-Id' => $self->{session_id} ],
119             );
120             return $self->{http}->do_request(request => $http_req)->then(sub {
121 0     0     $self->{session_id} = undef;
122 0           return Future->done;
123             })->else(sub {
124 0     0     $self->{session_id} = undef;
125 0           return Future->done;
126 0           });
127             }
128              
129 0           return Future->done;
130             }
131              
132              
133             sub _handle_response {
134 0     0     my ( $self, $response ) = @_;
135              
136 0           my $status = $response->code;
137              
138 0 0         if ($status == 404) {
139 0           $self->{session_id} = undef;
140 0           return Future->fail("MCP session expired (HTTP 404)");
141             }
142              
143 0 0         unless ($response->is_success) {
144 0           return Future->fail("MCP HTTP error: " . $response->status_line);
145             }
146              
147             # Capture session ID from response headers
148 0           my $session_id = $response->header('Mcp-Session-Id');
149 0 0         if (defined $session_id) {
150 0           $self->{session_id} = $session_id;
151             }
152              
153 0   0       my $content_type = $response->content_type // '';
154              
155 0 0         if ($content_type =~ m{^application/json}i) {
    0          
156 0           return $self->_handle_json_response($response->decoded_content);
157             }
158             elsif ($content_type =~ m{^text/event-stream}i) {
159 0           return $self->_handle_sse_response($response->decoded_content);
160             }
161              
162             # 202 Accepted with no body (for notifications/responses)
163 0 0         if ($status == 202) {
164 0           return Future->done(undef);
165             }
166              
167 0           return Future->fail("MCP HTTP unexpected content-type: $content_type");
168             }
169              
170             sub _handle_json_response {
171 0     0     my ( $self, $body ) = @_;
172              
173 0           my $data = eval { $self->{json}->decode($body) };
  0            
174 0 0         return Future->fail("MCP HTTP invalid JSON: $@") if $@;
175 0 0         return Future->fail("MCP HTTP invalid response") unless ref $data eq 'HASH';
176              
177 0 0         if (my $err = $data->{error}) {
178 0           return Future->fail("MCP error $err->{code}: $err->{message}");
179             }
180              
181 0           return Future->done($data->{result});
182             }
183              
184             sub _handle_sse_response {
185 0     0     my ( $self, $body ) = @_;
186              
187             # Parse SSE events, find the JSON-RPC response
188 0           my $last_data;
189 0           for my $line (split /\n/, $body) {
190 0 0         if ($line =~ /^data:\s*(.+)/) {
191 0           my $data_str = $1;
192 0           my $data = eval { $self->{json}->decode($data_str) };
  0            
193 0 0 0       next unless $data && ref $data eq 'HASH';
194             # Look for a JSON-RPC response (has id and result/error)
195 0 0 0       if (exists $data->{id} && (exists $data->{result} || exists $data->{error})) {
      0        
196 0           $last_data = $data;
197             }
198             }
199             }
200              
201 0 0         return Future->fail("MCP HTTP no JSON-RPC response in SSE stream")
202             unless $last_data;
203              
204 0 0         if (my $err = $last_data->{error}) {
205 0           return Future->fail("MCP error $err->{code}: $err->{message}");
206             }
207              
208 0           return Future->done($last_data->{result});
209             }
210              
211              
212             1;
213              
214             __END__