File Coverage

blib/lib/Conduit/Client.pm
Criterion Covered Total %
statement 139 143 97.2
branch 25 40 62.5
condition 4 7 57.1
subroutine 19 19 100.0
pod 0 2 0.0
total 187 211 88.6


line stmt bran cond sub pod time code
1             # You may distribute under the terms of either the GNU General Public License
2             # or the Artistic License (the same terms as Perl itself)
3             #
4             # (C) Paul Evans, 2026 -- leonerd@leonerd.org.uk
5              
6 6     6   103 use v5.36;
  6         39  
7              
8 6     6   67 use feature 'try';
  6         17  
  6         1210  
9 6     6   50 use Future::AsyncAwait;
  6         13  
  6         69  
10 6     6   472 use Object::Pad 0.807 ':experimental(inherit_field)';
  6         65  
  6         387  
11              
12             class Conduit::Client 0.03
13             :strict(params);
14              
15 6     6   2791 no if $^V lt v5.40, warnings => "experimental::try", "experimental::builtin";
  6         12  
  6         821  
16              
17             field $socket :param;
18              
19 6     6   4334 use Future::Buffer;
  6         25969  
  6         595  
20 6     6   78 use Future::IO;
  6         19  
  6         437  
21 6     6   4431 use HTTP::Request;
  6         265901  
  6         475  
22 6     6   4408 use HTTP::Response;
  6         82141  
  6         736  
23              
24 6     6   3766 use Conduit::Metrics;
  6         27  
  6         10013  
25              
26             field $readbuf;
27             ADJUST {
28             $readbuf = Future::Buffer->new(
29             fill => sub () { Future::IO->sysread( $socket, 8192 ); },
30             );
31             }
32              
33             field $server :param :inheritable;
34             ADJUST { builtin::weaken( $server ); }
35              
36 17     17 0 1103 async method read_request ()
  17         63  
  17         34  
37 17         37 {
38 17 100       178 defined( my $header = await $readbuf->read_until( qr/\x0D\x0A\x0D\x0A/ ) )
39             or return undef;
40              
41 9         28770 my $req = HTTP::Request->parse( $header );
42              
43 9   100     49177 my $content_length = $req->content_length // 0;
44              
45 9 100       991 if( $content_length ) {
46 1         8 my $body = await $readbuf->read_exactly( $content_length );
47              
48 1         120 $req->add_content( $body );
49             }
50              
51 9         235 Conduit::Metrics->received_request( $req );
52 9         634 return $req;
53             }
54              
55             field $bytes_written :inheritable;
56              
57 15     15 0 1355 async method write ( $str )
  15         104  
  15         48  
  15         25  
58 15         38 {
59 15         123 await Future::IO->write_exactly( $socket, $str );
60 15         29699 $bytes_written += length $str;
61             }
62              
63             class Conduit::Client::_ForHTTP
64             :strict(params);
65             inherit Conduit::Client qw( $server $bytes_written );
66              
67 6     6   1413 no if $^V lt v5.40, warnings => "experimental::try", "experimental::builtin";
  6         16  
  6         5518  
68              
69             field $responder :param;
70              
71 3     3   8 async method run ()
  3         10  
  3         6  
72 3         8 {
73 3         29 while( defined( my $req = await $self->read_request ) ) {
74 4         414 my $resp;
75 4         42 try {
76 4         23 $resp = await $responder->( $req );
77             }
78             catch( $e ) {
79 1         9 chomp $e;
80 1         7 $resp = HTTP::Response->new( 500, undef,
81             [ "Content-Type" => "text/plain" ],
82             $e,
83             );
84             }
85 4         880 $resp->request( $req );
86              
87 4 50       106 $resp->protocol or $resp->protocol( $req->protocol );
88 4 50       125 $resp->content_length or $resp->content_length( length $resp->content );
89              
90 4         393 $bytes_written = 0;
91              
92 4 50       47 $server->on_response_header( $req, $resp )
93             if $server->can( "on_response_header" );
94              
95 4         25 await $self->write( $resp->as_string( "\x0D\x0A" ) );
96 4         405 Conduit::Metrics->sent_response( $resp, $bytes_written );
97             }
98             }
99              
100             class Conduit::Client::_ForPSGI
101             :strict(params);
102             inherit Conduit::Client qw( $server $bytes_written );
103              
104 6     6   1054 no if $^V lt v5.40, warnings => "experimental::try", "experimental::builtin";
  6         16  
  6         19799  
105              
106             field $psgi_app :param;
107              
108 5     5   17 async method run ()
  5         19  
  5         11  
109 5         14 {
110 5         63 while( defined( my $req = await $self->read_request ) ) {
111 5         787 my $uri = $req->uri;
112              
113 5         95 my $path_info = $uri->path;
114 5 50       262 $path_info = "" if $path_info eq "/";
115              
116 5         62 open my $stdin, "<", \$req->content;
117              
118 5   50     248 my %env = (
119             SERVER_PROTOCOL => $req->protocol,
120             SCRIPT_NAME => '',
121             PATH_INFO => $path_info,
122             QUERY_STRING => $uri->query // "",
123             REQUEST_METHOD => $req->method,
124             REQUEST_URI => $uri->path,
125             'psgi.version' => [1,0],
126             'psgi.url_scheme' => "http",
127             'psgi.input' => $stdin,
128             'psgi.errors' => \*STDERR,
129             'psgi.multithread' => 0,
130             'psgi.multiprocess' => 0,
131             'psgi.run_once' => 0,
132             'psgi.nonblocking' => 1,
133             'psgi.streaming' => 1,
134             );
135              
136             # TODO: socket info
137              
138 1     1   46 $req->scan( sub ( $name, $value ) {
  1         3  
  1         3  
  1         2  
139 1         7 $name =~ s/-/_/g;
140 1         5 $name = uc $name;
141              
142             # Content-Length and Content-Type don't get an HTTP_ prefix
143 1 50       9 $name = "HTTP_$name" unless $name =~ m/^CONTENT_(?:LENGTH|TYPE)$/;
144              
145 1         5 $env{$name} = $value;
146 5         470 } );
147              
148 5         303 my $psgiresp;
149 5         16 try {
150 5         29 $psgiresp = $psgi_app->( \%env );
151             }
152             catch( $e ) {
153 0         0 chomp $e;
154 0         0 $psgiresp = [ 500, [ "Content-Type" => "text/plain" ], [ $e ] ];
155             }
156              
157 5         83 $bytes_written = 0;
158              
159 5     5   12 my $aresponder = async sub ( $v ) {
  5         13  
  5         10  
  5         9  
160 5         14 my ( $status, $headers, $body ) = @$v;
161              
162 5         45 my $resp = HTTP::Response->new( $status );
163              
164 5         379 $resp->request( $req );
165 5         64 $resp->protocol( $req->protocol );
166              
167 5         101 my $has_content_length = 0;
168 5         9 my $use_chunked_transfer = 0;
169 5         33 while( my ( $key, $value ) = splice @$headers, 0, 2 ) {
170 5         1134 $resp->push_header( $key, $value );
171              
172 5 50       364 $has_content_length = 1 if $key eq "Content-Length";
173 5 50 33     71 $use_chunked_transfer = 1 if $key eq "Transfer-Encoding" and $value eq "chunked";
174             }
175              
176 5 50       33 if( !defined $body ) {
    100          
177 0         0 die "TODO: no body yet; use deferred writer";
178             }
179             elsif( ref $body eq "ARRAY" ) {
180 4 50       14 unless( $has_content_length ) {
181 4         10 my $len = 0;
182 4         15 $len += length( $_ ) for @$body;
183              
184 4         27 $resp->content_length( $len );
185             }
186              
187 4 50       228 $server->on_response_header( $req, $resp )
188             if $server->can( "on_response_header" );
189              
190 4         22 await $self->write( $resp->as_string( "\x0D\x0A" ) );
191              
192 4         330 foreach my $chunk ( @$body ) {
193 4         18 await $self->write( $chunk );
194             }
195             }
196             else {
197 1 50       8 unless( $has_content_length ) {
198 1         17 $resp->header( "Transfer-Encoding" => "chunked" );
199 1         134 $use_chunked_transfer = 1;
200             }
201              
202 1 50       9 $server->on_response_header( $req, $resp )
203             if $server->can( "on_response_header" );
204              
205 1         7 await $self->write( $resp->as_string( "\x0D\x0A" ) );
206              
207             # PSGI says we must call the synchronous ->getline method
208 1         90 while(1) {
209 2         92 my $buffer = do { local $/ = \8192; $body->getline; };
  2         12  
  2         72  
210 2 100       12 defined $buffer or last;
211              
212 1 50       8 $buffer = sprintf( "%X\x0D\x0A", length $buffer )
213             . $buffer if $use_chunked_transfer;
214              
215 1         5 await $self->write( $buffer );
216             }
217              
218 1 50       8 await $self->write( "0\x0D\x0A\x0D\x0A" ) if $use_chunked_transfer;
219             }
220              
221 5         424 Conduit::Metrics->sent_response( $resp, $bytes_written );
222 5         40 };
223              
224 5 100       27 if( ref $psgiresp eq "ARRAY" ) {
    50          
225 4         29 await $aresponder->( $psgiresp );
226             }
227             elsif( ref $psgiresp eq "CODE" ) {
228 1     1   12 $psgiresp->( sub ( @args ){
  1         22  
  1         2  
229             # PSGI app is expecting a *synchronous* responder.
230             # We have no choice here
231 1         5 $aresponder->( @args )->get;
232 1         8 } );
233             }
234             else {
235 0         0 die "ARGH PSGI app returned neither ARRAY nor CODE";
236             }
237             }
238             }
239              
240             0x55AA;