File Coverage

blib/lib/Net/Async/MCP/Transport/Stdio.pm
Criterion Covered Total %
statement 88 95 92.6
branch 15 30 50.0
condition 3 12 25.0
subroutine 17 17 100.0
pod 4 4 100.0
total 127 158 80.3


line stmt bran cond sub pod time code
1             package Net::Async::MCP::Transport::Stdio;
2             # ABSTRACT: Stdio MCP transport via subprocess JSON-RPC
3             our $VERSION = '0.002';
4 2     2   1100 use strict;
  2         10  
  2         89  
5 2     2   9 use warnings;
  2         9  
  2         97  
6 2     2   9 use parent 'IO::Async::Notifier';
  2         12  
  2         15  
7              
8 2     2   156 use Future;
  2         3  
  2         51  
9 2     2   938 use JSON::MaybeXS;
  2         21200  
  2         195  
10 2     2   15 use Carp qw( croak );
  2         4  
  2         3547  
11              
12              
13             sub _init {
14 1     1   11 my ( $self, $params ) = @_;
15             $self->{command} = delete $params->{command}
16 1 50       7 or croak "command is required";
17 1         2 $self->{pending} = {};
18 1         2 $self->{next_id} = 0;
19 1         2 $self->{buffer} = '';
20 1         2 $self->{closed} = 0;
21 1         3 $self->{json} = JSON::MaybeXS->new(utf8 => 1, convert_blessed => 1);
22 1         21 $self->SUPER::_init($params);
23             }
24              
25             sub configure {
26 1     1 1 5 my ( $self, %params ) = @_;
27 1 50       4 if (exists $params{command}) {
28 0         0 $self->{command} = delete $params{command};
29             }
30 1         3 $self->SUPER::configure(%params);
31             }
32              
33             sub _add_to_loop {
34 1     1   62 my ( $self, $loop ) = @_;
35 1         3 $self->SUPER::_add_to_loop($loop);
36              
37 1         463 require IO::Async::Process;
38              
39             my $process = IO::Async::Process->new(
40             command => $self->{command},
41             stdin => { via => 'pipe_write' },
42             stdout => {
43             on_read => sub {
44 10     10   1706550 my ( $stream, $buffref, $eof ) = @_;
45 10         39 $self->_on_stdout_read($buffref, $eof);
46 10         465 return 0;
47             },
48             },
49             stderr => {
50             on_read => sub {
51 1     1   9711 my ( $stream, $buffref, $eof ) = @_;
52 1         2 $$buffref = '';
53 1         4 return 0;
54             },
55             },
56             on_finish => sub {
57 1     1   642 my ( $proc, $exitcode ) = @_;
58 1         4 $self->_on_finish($exitcode);
59             },
60 1         7217 );
61              
62 1         303 $self->{process} = $process;
63 1         9 $self->add_child($process);
64             }
65              
66             sub send_request {
67 10     10 1 28 my ( $self, $method, $params ) = @_;
68              
69 10 50       26 if ($self->{closed}) {
70 0         0 return Future->fail("MCP server process has exited");
71             }
72              
73 10         25 my $id = ++$self->{next_id};
74 10 100       52 my $request = {
75             jsonrpc => '2.0',
76             id => $id,
77             method => $method,
78             defined $params ? ( params => $params ) : (),
79             };
80              
81 10         100 my $json_line = $self->{json}->encode($request) . "\n";
82 10         40 $self->{process}->stdin->write($json_line);
83              
84 10         1569 my $future = $self->loop->new_future;
85 10         304 $self->{pending}{$id} = $future;
86 10         82 return $future;
87             }
88              
89              
90             sub send_notification {
91 1     1 1 3 my ( $self, $method, $params ) = @_;
92              
93 1 50       5 if ($self->{closed}) {
94 0         0 return Future->fail("MCP server process has exited");
95             }
96              
97 1 50       4 my $request = {
98             jsonrpc => '2.0',
99             method => $method,
100             defined $params ? ( params => $params ) : (),
101             };
102              
103 1         11 my $json_line = $self->{json}->encode($request) . "\n";
104 1         7 $self->{process}->stdin->write($json_line);
105              
106 1         315 return Future->done;
107             }
108              
109              
110             sub close {
111 1     1 1 3 my ( $self ) = @_;
112 1 50       3 return Future->done if $self->{closed};
113              
114 1         2 $self->{closed} = 1;
115              
116 1 50 33     20 if ($self->{process} && $self->{process}->is_running) {
117 1         8 my $future = $self->loop->new_future;
118 1         29 $self->{close_future} = $future;
119 1         4 $self->{process}->kill('TERM');
120 1         43 return $future;
121             }
122              
123 0         0 return Future->done;
124             }
125              
126              
127             sub _on_stdout_read {
128 10     10   41 my ( $self, $buffref, $eof ) = @_;
129 10         27 $self->{buffer} .= $$buffref;
130 10         16 $$buffref = '';
131              
132 10         78 while ($self->{buffer} =~ s/^(.*?)\n//) {
133 10         62 my $line = $1;
134 10         17 $line =~ s/\r$//;
135 10 50       20 next if $line eq '';
136              
137 10         14 my $response = eval { $self->{json}->decode($line) };
  10         145  
138 10 50 33     50 next unless $response && ref $response eq 'HASH';
139              
140 10         17 my $id = $response->{id};
141 10 50       15 next unless defined $id;
142              
143 10         23 my $future = delete $self->{pending}{$id};
144 10 50       18 next unless $future;
145              
146 10 50       24 if (my $err = $response->{error}) {
147 0         0 $future->fail("MCP error $err->{code}: $err->{message}");
148             }
149             else {
150 10         44 $future->done($response->{result});
151             }
152             }
153             }
154              
155             sub _on_finish {
156 1     1   3 my ( $self, $exitcode ) = @_;
157 1         2 $self->{closed} = 1;
158              
159 1         2 for my $id (keys %{$self->{pending}}) {
  1         6  
160 0         0 my $future = delete $self->{pending}{$id};
161 0 0 0     0 $future->fail("MCP server process exited (code $exitcode)")
162             if $future && !$future->is_ready;
163             }
164              
165 1 50 33     10 if ($self->{close_future} && !$self->{close_future}->is_ready) {
166 1         11 $self->{close_future}->done;
167             }
168             }
169              
170              
171             1;
172              
173             __END__