File Coverage

blib/lib/Net/Async/HTTP/Connection.pm
Criterion Covered Total %
statement 326 348 93.6
branch 141 184 76.6
condition 61 84 72.6
subroutine 36 37 97.3
pod 4 9 44.4
total 568 662 85.8


line stmt bran cond sub pod time code
1             # You may distribute under the terms of either the GNU General Public License
2             # or the Artistic License (the same terms as Perl itself)
3             #
4             # (C) Paul Evans, 2008-2019 -- leonerd@leonerd.org.uk
5              
6             package Net::Async::HTTP::Connection;
7              
8 37     37   247 use strict;
  37         122  
  37         1126  
9 37     37   204 use warnings;
  37         79  
  37         1365  
10              
11             our $VERSION = '0.47';
12              
13 37     37   233 use Carp;
  37         75  
  37         2074  
14              
15 37     37   289 use base qw( IO::Async::Stream );
  37         92  
  37         21515  
16             IO::Async::Stream->VERSION( '0.59' ); # ->write( ..., on_write )
17              
18 37     37   1382012 use Net::Async::HTTP::StallTimer;
  37         93  
  37         1192  
19              
20 37     37   17909 use HTTP::Response;
  37         915036  
  37         1812  
21              
22             my $CRLF = "\x0d\x0a"; # More portable than \r\n
23              
24 37     37   288 use Struct::Dumb;
  37         88  
  37         276  
# Per-request state kept on the connection's request queue. Fields:
#   req         - the request object passed to ->request
#   on_read     - code ref currently handling incoming bytes for this request
#   stall_timer - optional stall timer, only created when stall_timeout is given
#   resp_header - parsed response header, filled in once it has been read
#   resp_bytes  - running count of response bytes consumed so far
#   on_done     - optional callback invoked with the context on completion
#   is_done     - flag set once the response has been fully handled
#   f           - the Future returned to the caller of ->request
25             struct RequestContext => [qw( req on_read stall_timer resp_header resp_bytes on_done is_done f )],
26             named_constructor => 1;
27              
28             # Detect whether HTTP::Message properly trims whitespace in header values. If
29             # it doesn't, we have to deploy a workaround to fix them up.
30             # https://rt.cpan.org/Ticket/Display.html?id=75224
31 37     37   3417 use constant HTTP_MESSAGE_TRIMS_LWS => HTTP::Message->parse( "Name: value " )->header("Name") eq "value";
  37         98  
  37         301  
32              
33             =head1 NAME
34              
35             C<Net::Async::HTTP::Connection> - HTTP client protocol handler
36              
37             =head1 DESCRIPTION
38              
39             This class provides a connection to a single HTTP server, and is used
40             internally by L<Net::Async::HTTP>. It is not intended for general use.
41              
42             =cut
43              
# Initialiser hook: lets the parent class set itself up with the same
# arguments, then starts this connection with no requests in flight.
sub _init
{
   my ( $self, @args ) = @_;

   $self->SUPER::_init( @args );

   # Nothing has been written to this connection yet
   $self->{requests_in_flight} = 0;
}
51              
# Apply configuration parameters.
#
# The keys pipeline, max_in_flight, ready_queue, decode_content and is_proxy
# are consumed here and stored on the object; everything else is passed on
# to the parent class. A supplied on_closed handler is wrapped so that all
# queued requests are failed and the ready queue is dropped before the
# caller's handler runs.
#
# Croaks unless max_in_flight has been defined (zero is acceptable).
sub configure
{
   my ( $self, %params ) = @_;

   for my $key (qw( pipeline max_in_flight ready_queue decode_content is_proxy )) {
      next unless exists $params{$key};
      $self->{$key} = delete $params{$key};
   }

   my $user_on_closed = $params{on_closed};
   if( $user_on_closed ) {
      $params{on_closed} = sub {
         my ( $conn ) = @_;

         $conn->debug_printf( "CLOSED in-flight=$conn->{requests_in_flight}" );

         # Fail whatever is still queued before telling the owner
         $conn->error_all( "Connection closed" );

         undef $conn->{ready_queue};
         $user_on_closed->( $conn );
      };
   }

   defined $self->{max_in_flight}
      or croak "max_in_flight parameter required, may be zero";

   $self->SUPER::configure( %params );
}
78              
# Returns true when another request may be written while earlier ones are
# still outstanding: pipelining must be enabled locally, the server must
# have been marked as pipeline-capable, and the in-flight count must be
# below max_in_flight (a false max_in_flight means no limit).
sub should_pipeline
{
   my ( $self ) = @_;

   # Each guard hands back the failing value itself, as a chained && would
   return $self->{pipeline}     unless $self->{pipeline};
   return $self->{can_pipeline} unless $self->{can_pipeline};

   my $limit = $self->{max_in_flight};
   return !$limit || $self->{requests_in_flight} < $limit;
}
86              
# Start connecting to the server. Takes the same named arguments as the
# parent class's connect, with socktype defaulting to "stream", and returns
# the resulting future. Must not be called in void context.
sub connect
{
   my ( $self, %args ) = @_;

   $self->debug_printf( "CONNECT $args{host}:$args{service}" );

   die "VOID ->connect" unless defined wantarray;

   # Caller-supplied arguments come last so they can override socktype
   my $connect_f = $self->SUPER::connect( socktype => "stream", %args );

   return $connect_f->on_done( sub {
      $self->debug_printf( "CONNECTED" );
   } );
}
103              
# Offer this connection to callers waiting on the ready queue.
#
# When pipelining is possible, waiters are served for as long as
# should_pipeline stays true. Otherwise a single waiter is served, and only
# if the connection is idle. Waiters whose future has been cancelled are
# discarded. Does nothing if there is no ready queue.
sub ready
{
   my ( $self ) = @_;

   my $queue = $self->{ready_queue};
   return unless $queue;

   # Give the connection to one waiter. Returns true if it was handed over,
   # false if that waiter had already been cancelled.
   my $offer = sub {
      my ( $waiter ) = @_;

      my $f = $waiter->future;
      return 0 if $f->is_cancelled;

      # The waiter no longer needs its own connection attempt
      if( my $pending_connect = $waiter->connecting ) {
         $pending_connect->cancel;
      }

      $f->done( $self );
      return 1;
   };

   if( $self->should_pipeline ) {
      $self->debug_printf( "READY pipelined" );
      $offer->( shift @$queue ) while @$queue && $self->should_pipeline;
   }
   elsif( @$queue and $self->is_idle ) {
      $self->debug_printf( "READY non-pipelined" );
      # Only one request may run at a time: stop at the first live waiter
      while( @$queue ) {
         last if $offer->( shift @$queue );
      }
   }
   else {
      $self->debug_printf( "READY cannot-run queue=%d idle=%s",
         scalar @$queue, $self->is_idle ? "Y" : "N" );
   }
}
140              
# True when no requests are currently in flight on this connection.
sub is_idle
{
   my ( $self ) = @_;

   my $in_flight = $self->{requests_in_flight};
   return $in_flight == 0;
}
146              
# Stream read event: dispatches incoming bytes to the read handler of the
# request at the head of the queue.
#
# A handler may return:
#   a plain (non-reference) value - returned unchanged to the stream
#   a reference                   - installed as that request's next handler;
#                                   1 is returned to the stream
#   undef                         - the request must be marked done; it is
#                                   dropped from the queue
# Bytes that arrive while no request is queued are reported through
# invoke_error and discarded.
sub on_read
{
   my ( $self, $buffref, $closed ) = @_;

   while( my $ctx = $self->{request_queue}[0] ) {
      # Drop entries that finished before their turn came round
      if( $ctx->is_done ) {
         shift @{ $self->{request_queue} };
         next;
      }

      my $timer = $ctx->stall_timer;
      $timer->reset if $timer;

      my $next = $ctx->on_read->( $self, $buffref, $closed, $ctx );

      unless( defined $next ) {
         $ctx->is_done or die "ARGH: undef return without being marked done";

         shift @{ $self->{request_queue} };

         # Ask to be called again if there is still data for the next request
         return 1 if !$closed and length $$buffref;
         return;
      }

      return $next unless ref $next;

      $ctx->on_read = $next;
      return 1;
   }

   # Reinvoked after switch back to baseline, but may be idle again
   return unless !$closed and length $$buffref;

   $self->invoke_error( "Spurious on_read of connection while idle",
      http_connection => read => $$buffref );
   $$buffref = "";
}
180              
# Write-side EOF event: every queued request is failed with an "http"
# category "Connection closed" failure.
sub on_write_eof
{
   my ( $self ) = @_;

   my @failure = ( "Connection closed", http => undef, undef );
   $self->error_all( @failure );
}
186              
# Empty the request queue, failing each request's future with the given
# failure arguments. Requests already marked done, or whose future is
# already ready, are removed without being failed.
sub error_all
{
   my ( $self, @failure ) = @_;

   # Shift one entry at a time so the queue is always consistent while
   # failure callbacks run
   while( my $ctx = shift @{ $self->{request_queue} } ) {
      next if $ctx->is_done;

      my $f = $ctx->f;
      next if $f->is_ready;

      $f->fail( @failure );
   }
}
195              
# Queue an HTTP request on this connection and write it to the stream.
#
# Named arguments:
#   request           - HTTP::Request to send (required)
#   on_header         - CODE ref invoked with the response header; expected
#                       to return the body-chunk callback (required)
#   request_body      - optional body passed to ->write after the header
#   expect_continue   - if true, send "Expect: 100-continue" and hold the
#                       request_body back until a 1xx response is seen
#   stall_timeout     - if true, a stall timer with this delay guards the request
#   on_body_write     - optional callback given the running total of body
#                       bytes written
#   on_done           - optional callback stored on the request context
#   previous_response - optional response recorded as ->previous of the result
#
# Returns the Future for the response. Cancelling it closes the connection.
# Croaks if on_header is missing or request is not an HTTP::Request.
sub request
{
   my $self = shift;
   my %args = @_;

   my $on_header = $args{on_header} or croak "Expected 'on_header' as a CODE ref";

   my $req = $args{request};
   ref $req and $req->isa( "HTTP::Request" ) or croak "Expected 'request' as a HTTP::Request reference";

   $self->debug_printf( "REQUEST %s %s", $req->method, $req->uri );

   my $request_body = $args{request_body};
   my $expect_continue = !!$args{expect_continue};

   my $method = $req->method;

   if( $method eq "POST" or $method eq "PUT" or length $req->content ) {
      $req->init_header( "Content-Length", length $req->content );
   }

   if( $expect_continue ) {
      $req->init_header( "Expect", "100-continue" );
   }

   if( $self->{decode_content} ) {
      #$req->init_header( "Accept-Encoding", Net::Async::HTTP->can_decode )
      $req->init_header( "Accept-Encoding", "gzip" );
   }

   my $f = $self->loop->new_future
      ->set_label( "$method " . $req->uri );

   # TODO: Cancelling a request Future shouldn't necessarily close the socket
   #   if we haven't even started writing the request yet. But we can't know
   #   that currently.
   $f->on_cancel( sub {
      $self->debug_printf( "CLOSE on_cancel" );
      $self->close_now;
   });

   my $stall_timer;
   if( $args{stall_timeout} ) {
      $stall_timer = Net::Async::HTTP::StallTimer->new(
         delay => $args{stall_timeout},
         future => $f,
      );
      $self->add_child( $stall_timer );
      # Don't start it yet

      # Once the future is ready the timer is detached and the shared
      # variable cleared, so the closures below see it as gone
      my $remove_timer = sub {
         $self->remove_child( $stall_timer ) if $stall_timer;
         undef $stall_timer;
      };

      $f->on_ready( $remove_timer );
   }

   push @{ $self->{request_queue} }, my $ctx = RequestContext(
      req         => $req,
      on_read     => undef, # will be set later
      stall_timer => $stall_timer,
      resp_header => undef, # will be set later
      resp_bytes  => 0,
      on_done     => $args{on_done},
      is_done     => 0,
      f           => $f,
   );

   my $on_body_write;
   if( $stall_timer or $args{on_body_write} ) {
      my $inner_on_body_write = $args{on_body_write};
      my $written = 0;
      $on_body_write = sub {
         $stall_timer->reset if $stall_timer;
         # The second callback argument is added to the running total
         $inner_on_body_write->( $written += $_[1] ) if $inner_on_body_write;
      };
   }

   my $write_request_body = defined $request_body ? sub {
      my ( $self ) = @_;
      $self->write( $request_body,
         on_write => $on_body_write
      );
   } : undef;

   # Unless the request method is CONNECT, or we are connecting to a
   # non-transparent proxy, the URL is not allowed to contain
   # an authority; only path
   # Take a copy of the headers since we'll be hacking them up
   my $headers = $req->headers->clone;
   my $path;
   if( $method eq "CONNECT" ) {
      $path = $req->uri->as_string;
   }
   else {
      my $uri = $req->uri;
      if( $self->{is_proxy} ) {
         $path = $uri->as_string;
      }
      else {
         $path = $uri->path_query;
         $path = "/$path" unless $path =~ m{^/};
      }

      my $authority = $uri->authority;
      if( defined $authority and $authority =~ m/^(.*)@(.*)$/ ) {
         my ( $userinfo, $host ) = ( $1, $2 );

         # The Host header carries host[:port] only; userinfo must never be
         # sent in it, even when it has no password part
         $headers->init_header( Host => $host );

         # Basic credentials are only derived from a "user:pass" pair
         if( $userinfo =~ m/^(.*?):(.*)$/ ) {
            my ( $user, $pass ) = ( $1, $2 );
            $headers->authorization_basic( $user, $pass );
         }
      }
      else {
         $headers->init_header( Host => $authority );
      }
   }

   my $protocol = $req->protocol || "HTTP/1.1";
   my @headers = ( "$method $path $protocol" );
   $headers->scan( sub {
      my ( $name, $value ) = @_;
      $name =~ s/^://; # non-canonical header
      push @headers, "$name: $value";
   } );

   $stall_timer->start if $stall_timer;
   $stall_timer->reason = "writing request" if $stall_timer;

   my $on_header_write = $stall_timer ? sub { $stall_timer->reset } : undef;

   $self->write( join( $CRLF, @headers ) .
                  $CRLF . $CRLF,
      on_write => $on_header_write );

   $self->write( $req->content,
      on_write => $on_body_write ) if length $req->content;
   $write_request_body->( $self ) if $write_request_body and !$expect_continue;

   $self->write( "", on_flush => sub {
      return unless $stall_timer; # test again in case it was cancelled in the meantime
      $stall_timer->reset;
      $stall_timer->reason = "waiting for response";
   }) if $stall_timer;

   $self->{requests_in_flight}++;

   # With expect_continue the body writer is handed to the header reader,
   # which invokes it when a 1xx response arrives
   $ctx->on_read = $self->_mk_on_read_header(
      $args{previous_response}, $expect_continue ? $write_request_body : undef, $on_header
   );

   return $f;
}
346              
# Build the initial read handler for a request: the one that waits for and
# parses the response header.
#
# Arguments (after the invocant, which is ignored):
#   $previous_response  - optional response to record as ->previous
#   $write_request_body - optional CODE ref, invoked with the connection when
#                         a 1xx (provisional) response arrives
#   $on_header          - CODE ref invoked with the parsed header; its return
#                         value is used as the body-chunk callback
#
# The returned handler is called as ( $self, $buffref, $closed, $ctx ). It
# returns 0 while more data is needed or after a failure, 1 after a
# provisional response, the result of the completion routine when the
# response has no body, or a new handler for the body.
sub _mk_on_read_header
{
   shift; # $self
   my ( $previous_response, $write_request_body, $on_header ) = @_;

   sub {
      my ( $self, $buffref, $closed, $ctx ) = @_;

      my $req = $ctx->req;
      my $stall_timer = $ctx->stall_timer;
      my $f = $ctx->f;

      if( $stall_timer ) {
         $stall_timer->reason = "receiving response header";
         $stall_timer->reset;
      }

      if( length $$buffref >= 4 and $$buffref !~ m/^HTTP/ ) {
         $self->debug_printf( "ERROR fail" );
         $f->fail( "Did not receive HTTP response from server", http => undef, $req ) unless $f->is_cancelled;
         $self->close_now;
         # The request has been failed and the connection closed; stop here
         # instead of going on to parse whatever is in the buffer
         return 0;
      }

      unless( $$buffref =~ s/^(.*?$CRLF$CRLF)//s ) {
         if( $closed ) {
            $self->debug_printf( "ERROR closed" );
            $f->fail( "Connection closed while awaiting header", http => undef, $req ) unless $f->is_cancelled;
            $self->close_now;
         }
         return 0;
      }

      $ctx->resp_bytes += $+[0];

      my $header = HTTP::Response->parse( $1 );
      # HTTP::Response doesn't strip the \rs from this
      ( my $status_line = $header->status_line ) =~ s/\r$//;

      $ctx->resp_header = $header;

      # Workaround for HTTP::Message versions that leave whitespace around
      # header values: re-set every header with the value trimmed
      unless( HTTP_MESSAGE_TRIMS_LWS ) {
         my @headers;
         $header->scan( sub {
            my ( $name, $value ) = @_;
            s/^\s+//, s/\s+$// for $value;
            push @headers, $name => $value;
         } );
         $header->header( @headers ) if @headers;
      }

      my $protocol = $header->protocol;
      if( $protocol =~ m{^HTTP/1\.(\d+)$} and $1 >= 1 ) {
         $self->{can_pipeline} = 1;
      }

      if( $header->code =~ m/^1/ ) { # 1xx is not a final response
         $self->debug_printf( "HEADER [provisional] %s", $status_line );
         $write_request_body->( $self ) if $write_request_body;
         return 1;
      }

      $header->request( $req );
      $header->previous( $previous_response ) if $previous_response;

      $self->debug_printf( "HEADER %s", $status_line );

      my $on_body_chunk = $on_header->( $header );

      my $code = $header->code;

      my $content_encoding = $header->header( "Content-Encoding" );

      my $decoder;
      if( $content_encoding and
          $decoder = Net::Async::HTTP->can_decode( $content_encoding ) ) {
         # The body will be handed on decoded, so move the original encoding
         # name aside
         $header->init_header( "X-Original-Content-Encoding" => $header->remove_header( "Content-Encoding" ) );
      }

      # can_pipeline is set for HTTP/1.1 or above; presume it can keep-alive if set
      my $connection_close = lc( $header->header( "Connection" ) || ( $self->{can_pipeline} ? "keep-alive" : "close" ) )
                              eq "close";

      if( $connection_close ) {
         $self->{max_in_flight} = 1;
      }
      elsif( defined( my $keep_alive = lc( $header->header("Keep-Alive") || "" ) ) ) {
         # A "max=N" parameter may only lower the existing limit
         my ( $max ) = ( $keep_alive =~ m/max=(\d+)/ );
         $self->{max_in_flight} = $max if $max && $max < $self->{max_in_flight};
      }

      # Called by the body readers with each piece of body content. Returns
      # 1 on success, undef if decoding failed.
      my $on_more = sub {
         my ( $chunk ) = @_;

         if( $decoder and not eval { $chunk = $decoder->( $chunk ); 1 } ) {
            $self->debug_printf( "ERROR decode failed" );
            $f->fail( "Decode error $@", http => undef, $req );
            $self->close;
            return undef;
         }

         $on_body_chunk->( $chunk );

         return 1;
      };

      # Called by the body readers once the body is complete. Always
      # returns undef.
      my $on_done = sub {
         my ( $ctx ) = @_;

         $ctx->is_done++;

         # TODO: IO::Async probably ought to do this. We need to fire the
         #   on_closed event _before_ calling on_body_chunk, to clear the
         #   connection cache in case another request comes - e.g. HEAD->GET
         $self->close if $connection_close;

         # Calling the decoder with no arguments yields any final output
         my $final;
         if( $decoder and not eval { $final = $decoder->(); 1 } ) {
            $self->debug_printf( "ERROR decode failed" );
            $f->fail( "Decode error $@", http => undef, $req );
            $self->close;
            return undef;
         }

         $on_body_chunk->( $final ) if defined $final and length $final;

         # Calling the chunk callback with no arguments yields the response
         my $response = $on_body_chunk->();
         # Remember any exception from completing the future, but carry on
         # with the bookkeeping before reporting it
         my $e = eval { $f->done( $response ) unless $f->is_cancelled; 1 } ? undef : $@;

         $ctx->on_done->( $ctx ) if $ctx->on_done;

         $self->{requests_in_flight}--;
         $self->debug_printf( "DONE remaining in-flight=$self->{requests_in_flight}" );
         $self->ready;

         if( defined $e ) {
            chomp $e;
            $self->invoke_error( $e, perl => );
            # This might not return, if it top-level croaks
         }

         return undef; # Finished
      };

      # RFC 2616 says "HEAD" does not have a body, nor do any 1xx codes, nor
      # 204 (No Content) nor 304 (Not Modified)
      if( $req->method eq "HEAD" or $code =~ m/^1..$/ or $code eq "204" or $code eq "304" ) {
         $self->debug_printf( "BODY done [none]" );
         return $on_done->( $ctx );
      }

      my $transfer_encoding = $header->header( "Transfer-Encoding" );
      my $content_length = $header->content_length;

      if( defined $transfer_encoding and $transfer_encoding eq "chunked" ) {
         $self->debug_printf( "BODY chunks" );

         $stall_timer->reason = "receiving body chunks" if $stall_timer;
         return $self->_mk_on_read_chunked( $on_more, $on_done );
      }
      elsif( defined $content_length ) {
         $self->debug_printf( "BODY length $content_length" );

         if( $content_length == 0 ) {
            $self->debug_printf( "BODY done [length=0]" );
            return $on_done->( $ctx );
         }

         $stall_timer->reason = "receiving body" if $stall_timer;
         return $self->_mk_on_read_length( $content_length, $on_more, $on_done );
      }
      else {
         $self->debug_printf( "BODY until EOF" );

         $stall_timer->reason = "receiving body until EOF" if $stall_timer;
         return $self->_mk_on_read_until_eof( $on_more, $on_done );
      }
   };
}
524              
# Build the read handler for a body sent with chunked transfer encoding.
#
# Arguments (after the invocant, which is ignored):
#   $on_more - CODE ref given each decoded chunk body
#   $on_done - CODE ref passed on to the trailer handler
#
# The returned handler is called as ( $self, $buffref, $closed, $ctx ). It
# alternates between reading a chunk-size line and reading that many bytes
# plus the following CRLF. A zero-sized chunk switches to the trailer
# handler. Returns 0 while waiting for more data or after a failure.
sub _mk_on_read_chunked
{
   shift; # $self
   my ( $on_more, $on_done ) = @_;

   # Size of the chunk currently being read; undef while a size line is
   # expected
   my $chunk_length;

   sub {
      my ( $self, $buffref, $closed, $ctx ) = @_;

      my $req = $ctx->req;
      my $f = $ctx->f;

      if( !defined $chunk_length and $$buffref =~ s/^(.*?)$CRLF// ) {
         my $header = $1;
         $ctx->resp_bytes += $+[0];

         # Chunk header
         unless( $header =~ s/^([A-Fa-f0-9]+).*// ) {
            $f->fail( "Corrupted chunk header", http => undef, $req ) unless $f->is_cancelled;
            $self->close_now;
            return 0;
         }

         $chunk_length = hex( $1 );
         return 1 if $chunk_length;

         return $self->_mk_on_read_chunk_trailer( $req, $on_more, $on_done, $f );
      }

      # Chunk is followed by a CRLF, which isn't counted in the length;
      if( defined $chunk_length and length( $$buffref ) >= $chunk_length + 2 ) {
         # Chunk body
         my $chunk = substr( $$buffref, 0, $chunk_length, "" );
         $ctx->resp_bytes += length $chunk;

         unless( $$buffref =~ s/^$CRLF// ) {
            $self->debug_printf( "ERROR chunk without CRLF" );
            $f->fail( "Chunk of size $chunk_length wasn't followed by CRLF", http => undef, $req ) unless $f->is_cancelled;
            # The framing is broken, so treat it like a corrupt chunk header:
            # close at once and do not deliver the chunk
            $self->close_now;
            return 0;
         }

         $ctx->resp_bytes += $+[0];

         undef $chunk_length;

         return $on_more->( $chunk );
      }

      if( $closed ) {
         $self->debug_printf( "ERROR closed" );
         $f->fail( "Connection closed while awaiting chunk", http => undef, $req ) unless $f->is_cancelled;
      }
      return 0;
   };
}
581              
# Build the read handler for the trailer section that follows the final
# (zero-sized) chunk of a chunked body.
#
# Arguments (after the invocant, which is ignored): an unused first value,
# $on_more (unused here) and $on_done, the completion routine.
#
# The returned handler is called as ( $self, $buffref, $closed, $ctx ). It
# consumes one CRLF-terminated line per call: returns 1 after a non-empty
# trailer line, the result of $on_done after the empty line that ends the
# trailer, and 0 while a complete line is not yet available.
sub _mk_on_read_chunk_trailer
{
   shift; # $self
   my ( undef, $on_more, $on_done ) = @_;

   my $trailer = "";

   sub {
      my ( $self, $buffref, $closed, $ctx ) = @_;

      # Try to take a line first. The close is only an error if the trailer
      # is still incomplete, as in the other body readers.
      unless( $$buffref =~ s/^(.*)$CRLF// ) {
         if( $closed ) {
            my $f = $ctx->f;
            $self->debug_printf( "ERROR closed" );
            $f->fail( "Connection closed while awaiting chunk trailer", http => undef, $ctx->req ) unless $f->is_cancelled;
         }
         return 0;
      }

      # Copy the capture before making further calls
      my $line = $1;
      $ctx->resp_bytes += $+[0];
      $trailer .= $line;

      return 1 if length $line;

      # TODO: Actually use the trailer

      $self->debug_printf( "BODY done [chunked]" );
      return $on_done->( $ctx );
   };
}
612              
# Build the read handler for a body whose size is given by Content-Length.
#
# Arguments (after the invocant, which is ignored):
#   $content_length - number of body bytes expected
#   $on_more        - CODE ref given each piece of content; a false return
#                     makes the handler return undef
#   $on_done        - CODE ref invoked with the context once all bytes arrived
#
# The returned handler is called as ( $self, $buffref, $closed, $ctx ). It
# returns 0 while more bytes are outstanding.
sub _mk_on_read_length
{
   my ( undef, $remaining, $on_more, $on_done ) = @_;

   return sub {
      my ( $self, $buffref, $closed, $ctx ) = @_;

      # Take no more than is still owed; any surplus stays in the buffer
      my $piece = substr( $$buffref, 0, $remaining, "" );
      my $got = length $piece;
      $remaining -= $got;
      $ctx->resp_bytes += $got;

      $on_more->( $piece ) or return undef;

      if( $remaining == 0 ) {
         $self->debug_printf( "BODY done [length]" );
         return $on_done->( $ctx );
      }

      if( $closed ) {
         my $f = $ctx->f;
         $self->debug_printf( "ERROR closed" );
         $f->fail( "Connection closed while awaiting body", http => undef, $ctx->req ) unless $f->is_cancelled;
      }

      return 0;
   };
}
643              
# Build the read handler for a body delimited only by the connection
# closing.
#
# Arguments (after the invocant, which is ignored):
#   $on_more - CODE ref given each piece of content; a false return makes
#              the handler return undef
#   $on_done - CODE ref invoked with the context once the stream is closed
#
# The returned handler is called as ( $self, $buffref, $closed, $ctx ). It
# takes everything in the buffer on each call and returns 0 until $closed
# is true.
sub _mk_on_read_until_eof
{
   my ( undef, $on_more, $on_done ) = @_;

   return sub {
      my ( $self, $buffref, $closed, $ctx ) = @_;

      # Everything received so far belongs to the body
      my $piece = substr( $$buffref, 0, length $$buffref, "" );
      $ctx->resp_bytes += length $piece;

      $on_more->( $piece ) or return undef;

      if( $closed ) {
         $self->debug_printf( "BODY done [eof]" );
         return $on_done->( $ctx );
      }

      return 0;
   };
}
664              
665             =head1 AUTHOR
666              
667             Paul Evans <leonerd@leonerd.org.uk>
668              
669             =cut
670              
671             0x55AA;