File Coverage

blib/lib/Net/Async/HTTP/Connection.pm
Criterion Covered Total %
statement 326 348 93.6
branch 143 186 76.8
condition 62 84 73.8
subroutine 36 37 97.3
pod 4 9 44.4
total 571 664 85.9


line stmt bran cond sub pod time code
1             # You may distribute under the terms of either the GNU General Public License
2             # or the Artistic License (the same terms as Perl itself)
3             #
4             # (C) Paul Evans, 2008-2023 -- leonerd@leonerd.org.uk
5              
6             package Net::Async::HTTP::Connection 0.50;
7              
8 38     38   469 use v5.14;
  38         126  
9 38     38   193 use warnings;
  38         99  
  38         2064  
10              
11 38     38   249 use Carp;
  38         99  
  38         3023  
12              
13 38     38   277 use base qw( IO::Async::Stream );
  38         72  
  38         24423  
14             IO::Async::Stream->VERSION( '0.59' ); # ->write( ..., on_write )
15              
16 38     38   1960776 use Net::Async::HTTP::StallTimer;
  38         134  
  38         1446  
17              
18 38     38   19590 use HTTP::Response;
  38         1147588  
  38         2580  
19              
20             my $CRLF = "\x0d\x0a"; # More portable than \r\n
21              
22 38     38   371 use Struct::Dumb;
  38         241  
  38         407  
23             struct RequestContext => [qw( req on_read stall_timer resp_header resp_bytes on_done is_done f )],
24             named_constructor => 1;
25              
26             # Detect whether HTTP::Message properly trims whitespace in header values. If
27             # it doesn't, we have to deploy a workaround to fix them up.
28             # https://rt.cpan.org/Ticket/Display.html?id=75224
29 38     38   4362 use constant HTTP_MESSAGE_TRIMS_LWS => HTTP::Message->parse( "Name: value " )->header("Name") eq "value";
  38         72  
  38         250  
30              
31             =head1 NAME
32              
33             C<Net::Async::HTTP::Connection> - HTTP client protocol handler
34              
35             =head1 DESCRIPTION
36              
37             This class provides a connection to a single HTTP server, and is used
38             internally by L<Net::Async::HTTP>. It is not intended for general use.
39              
40             =cut
41              
42             sub _init
43             {
44 103     103   1839 my $self = shift;
45 103         832 $self->SUPER::_init( @_ );
46              
47 103         2825 $self->{requests_in_flight} = 0;
48             }
49              
50             sub configure
51             {
52 299     299 1 704754 my $self = shift;
53 299         1139 my %params = @_;
54              
55 299         799 foreach (qw( pipeline max_in_flight ready_queue decode_content is_proxy )) {
56 1495 100       3917 $self->{$_} = delete $params{$_} if exists $params{$_};
57             }
58              
59 299 100       859 if( my $on_closed = $params{on_closed} ) {
60             $params{on_closed} = sub {
61 69     69   16354 my $self = shift;
62              
63 69         385 $self->debug_printf( "CLOSED in-flight=$self->{requests_in_flight}" );
64              
65 69         402 $self->error_all( "Connection closed" );
66              
67 69         152 undef $self->{ready_queue};
68 69         274 $on_closed->( $self );
69 103         596 };
70             }
71              
72 299 50       841 croak "max_in_flight parameter required, may be zero" unless defined $self->{max_in_flight};
73              
74 299         1535 $self->SUPER::configure( %params );
75             }
76              
77             sub should_pipeline
78             {
79 212     212 0 738 my $self = shift;
80             return $self->{pipeline} &&
81             $self->{can_pipeline} &&
82 212   66     2237 ( !$self->{max_in_flight} || $self->{requests_in_flight} < $self->{max_in_flight} );
83             }
84              
85             sub connect
86             {
87 103     103 1 189 my $self = shift;
88 103         504 my %args = @_;
89              
90 103 100       448 my $key = defined $args{path} ? $args{path} : "$args{host}:$args{service}";
91 103         792 $self->debug_printf( "CONNECT $key" );
92              
93 103 50       633 defined wantarray or die "VOID ->connect";
94              
95             $self->SUPER::connect(
96             socktype => "stream",
97             %args
98             )->on_done( sub {
99 95     95   39806 $self->debug_printf( "CONNECTED" );
100 103         692 });
101             }
102              
103             sub ready
104             {
105 215     215 0 424 my $self = shift;
106              
107 215 100       747 my $queue = $self->{ready_queue} or return;
108              
109 192 100 66     647 if( $self->should_pipeline ) {
    100          
110 82         323 $self->debug_printf( "READY pipelined" );
111 82   100     463 while( @$queue && $self->should_pipeline ) {
112 19         65 my $ready = shift @$queue;
113 19         497 my $f = $ready->future;
114 19 100       138 next if $f->is_cancelled;
115              
116 18 100       498 $ready->connecting and $ready->connecting->cancel;
117              
118 18         169 $f->done( $self );
119             }
120             }
121             elsif( @$queue and $self->is_idle ) {
122 102         395 $self->debug_printf( "READY non-pipelined" );
123 102         486 while( @$queue ) {
124 102         263 my $ready = shift @$queue;
125 102         4189 my $f = $ready->future;
126 102 50       958 next if $f->is_cancelled;
127              
128 102 100       2691 $ready->connecting and $ready->connecting->cancel;
129              
130 102         1580 $f->done( $self );
131 102         4850 last;
132             }
133             }
134             else {
135 8 50       28 $self->debug_printf( "READY cannot-run queue=%d idle=%s",
136             scalar @$queue, $self->is_idle ? "Y" : "N");
137             }
138             }
139              
140             sub is_idle
141             {
142 163     163 0 304 my $self = shift;
143 163         859 return $self->{requests_in_flight} == 0;
144             }
145              
146             sub on_read
147             {
148 256     256 1 558486 my $self = shift;
149 256         658 my ( $buffref, $closed ) = @_;
150              
151 256         2357 while( my $head = $self->{request_queue}[0]) {
152 226 100 50     8091 shift @{ $self->{request_queue} } and next if $head->is_done;
  1         13  
153              
154 225 100       5843 $head->stall_timer->reset if $head->stall_timer;
155              
156 225         5799 my $ret = $head->on_read->( $self, $buffref, $closed, $head );
157              
158 224 100       621 if( defined $ret ) {
159 105 100       2239 return $ret if !ref $ret;
160              
161 72         2312 $head->on_read = $ret;
162 72         679 return 1;
163             }
164              
165 119 50       4762 $head->is_done or die "ARGH: undef return without being marked done";
166              
167 119         840 shift @{ $self->{request_queue} };
  119         314  
168 119 100 100     807 return 1 if !$closed and length $$buffref;
169 117         8861 return;
170             }
171              
172             # Reinvoked after switch back to baseline, but may be idle again
173 31 100 66     182 return if $closed or !length $$buffref;
174              
175 1         11 $self->invoke_error( "Spurious on_read of connection while idle",
176             http_connection => read => $$buffref );
177 1         52 $$buffref = "";
178             }
179              
180             sub on_write_eof
181             {
182 0     0 1 0 my $self = shift;
183 0         0 $self->error_all( "Connection closed", http => undef, undef );
184             }
185              
186             sub error_all
187             {
188 69     69 0 123 my $self = shift;
189              
190 69         161 while( my $head = shift @{ $self->{request_queue} } ) {
  105         2282  
191 36 100 100     1315 $head->f->fail( @_ ) unless $head->is_done or $head->f->is_ready;
192             }
193             }
194              
195             sub request
196             {
197 134     134 0 367 my $self = shift;
198 134         798 my %args = @_;
199              
200 134 50       673 my $on_header = $args{on_header} or croak "Expected 'on_header' as a CODE ref";
201              
202 134         283 my $req = $args{request};
203 134 50 33     1766 ref $req and $req->isa( "HTTP::Request" ) or croak "Expected 'request' as a HTTP::Request reference";
204              
205 134         644 $self->debug_printf( "REQUEST %s %s", $req->method, $req->uri );
206              
207 134         3675 my $request_body = $args{request_body};
208 134         443 my $expect_continue = !!$args{expect_continue};
209              
210 134         436 my $method = $req->method;
211              
212 134 100 100     2286 if( $method eq "POST" or $method eq "PUT" or length $req->content ) {
      100        
213 13         90 $req->init_header( "Content-Length", length $req->content );
214             }
215              
216 134 100       2994 if( $expect_continue ) {
217 1         4 $req->init_header( "Expect", "100-continue" );
218             }
219              
220 134 100       497 if( $self->{decode_content} ) {
221             #$req->init_header( "Accept-Encoding", Net::Async::HTTP->can_decode )
222 2         5 $req->init_header( "Accept-Encoding", "gzip" );
223             }
224              
225 134         542 my $f = $self->loop->new_future
226             ->set_label( "$method " . $req->uri );
227              
228             # TODO: Cancelling a request Future shouldn't necessarily close the socket
229             # if we haven't even started writing the request yet. But we can't know
230             # that currently.
231             $f->on_cancel( sub {
232 5     5   2928 $self->debug_printf( "CLOSE on_cancel" );
233 5         49 $self->close_now;
234 134         9311 });
235              
236 134         2686 my $stall_timer;
237 134 100       453 if( $args{stall_timeout} ) {
238             $stall_timer = Net::Async::HTTP::StallTimer->new(
239             delay => $args{stall_timeout},
240 4         53 future => $f,
241             );
242 4         276 $self->add_child( $stall_timer );
243             # Don't start it yet
244              
245             my $remove_timer = sub {
246 4 50   4   383 $self->remove_child( $stall_timer ) if $stall_timer;
247 4         532 undef $stall_timer;
248 4         392 };
249              
250 4         21 $f->on_ready( $remove_timer );
251             }
252              
253 134         838 push @{ $self->{request_queue} }, my $ctx = RequestContext(
254             req => $req,
255             on_read => undef, # will be set later
256             stall_timer => $stall_timer,
257             resp_header => undef, # will be set later
258             resp_bytes => 0,
259             on_done => $args{on_done},
260 134         394 is_done => 0,
261             f => $f,
262             );
263              
264 134         13880 my $on_body_write;
265 134 100 100     788 if( $stall_timer or $args{on_body_write} ) {
266 5         16 my $inner_on_body_write = $args{on_body_write};
267 5         11 my $written = 0;
268             $on_body_write = sub {
269 10 50   10   7817 $stall_timer->reset if $stall_timer;
270 10 50       35 $inner_on_body_write->( $written += $_[1] ) if $inner_on_body_write;
271 5         28 };
272             }
273              
274             my $write_request_body = defined $request_body ? sub {
275 4     4   11 my ( $self ) = @_;
276 4         60 $self->write( $request_body,
277             on_write => $on_body_write
278             );
279 134 100       495 } : undef;
280              
281             # Unless the request method is CONNECT, or we are connecting to a
282             # non-transparent proxy, the URL is not allowed to contain
283             # an authority; only path
284             # Take a copy of the headers since we'll be hacking them up
285 134         560 my $headers = $req->headers->clone;
286 134         2618 my $path;
287 134 50       445 if( $method eq "CONNECT" ) {
288 0         0 $path = $req->uri->as_string;
289             }
290             else {
291 134         523 my $uri = $req->uri;
292 134 100       1286 if ( $self->{is_proxy} ) {
293 2         24 $path = $uri->as_string;
294             }
295             else {
296 132         876 $path = $uri->path_query;
297 132 100       2547 $path = "/$path" unless $path =~ m{^/};
298             }
299 134         442 my $authority = $uri->authority;
300 134 100 100     3227 if( defined $authority and
301             my ( $user, $pass, $host ) = $authority =~ m/^(.*?):(.*)@(.*)$/ ) {
302 2         12 $headers->init_header( Host => $host );
303 2         80 $headers->authorization_basic( $user, $pass );
304             }
305             else {
306 132         750 $headers->init_header( Host => $authority );
307             }
308             }
309              
310 134   100     6864 my $protocol = $req->protocol || "HTTP/1.1";
311 134         2041 my @headers = ( "$method $path $protocol" );
312             $headers->scan( sub {
313 320     320   5875 my ( $name, $value ) = @_;
314 320         612 $name =~ s/^://; # non-canonical header
315 320         1025 push @headers, "$name: $value";
316 134         1118 } );
317              
318 134 100       1110 $stall_timer->start if $stall_timer;
319 134 100       947 $stall_timer->reason = "writing request" if $stall_timer;
320              
321 134 100   3   2025 my $on_header_write = $stall_timer ? sub { $stall_timer->reset } : undef;
  3         2076  
322              
323 134         3276 $self->write( join( $CRLF, @headers ) .
324             $CRLF . $CRLF,
325             on_write => $on_header_write );
326              
327 134 100       36807 $self->write( $req->content,
328             on_write => $on_body_write ) if length $req->content;
329 134 100 100     3099 $write_request_body->( $self ) if $write_request_body and !$expect_continue;
330              
331             $self->write( "", on_flush => sub {
332 3 50   3   1962 return unless $stall_timer; # test again in case it was cancelled in the meantime
333 3         14 $stall_timer->reset;
334 3         379 $stall_timer->reason = "waiting for response";
335 134 100       876 }) if $stall_timer;
336              
337 134         869 $self->{requests_in_flight}++;
338              
339             $ctx->on_read = $self->_mk_on_read_header(
340 134 100       1026 $args{previous_response}, $expect_continue ? $write_request_body : undef, $on_header
341             );
342              
343 134         2436 return $f;
344             }
345              
346             sub _mk_on_read_header
347             {
348 134     134   466 shift; # $self
349 134         634 my ( $previous_response, $write_request_body, $on_header ) = @_;
350              
351             sub {
352 126     126   913 my ( $self, $buffref, $closed, $ctx ) = @_;
353              
354 126         2505 my $req = $ctx->req;
355 126         2844 my $stall_timer = $ctx->stall_timer;
356 126         2794 my $f = $ctx->f;
357              
358 126 100       889 if( $stall_timer ) {
359 2         11 $stall_timer->reason = "receiving response header";
360 2         8 $stall_timer->reset;
361             }
362              
363 126 100 100     1480 if( length $$buffref >= 4 and $$buffref !~ m/^HTTP/ ) {
364 1         5 $self->debug_printf( "ERROR fail" );
365 1 50       7 $f->fail( "Did not receive HTTP response from server", http => undef, $req ) unless $f->is_cancelled;
366 1         55 $self->close_now;
367             }
368              
369 126 100       2010 unless( $$buffref =~ s/^(.*?$CRLF$CRLF)//s ) {
370 3 100       16 if( $closed ) {
371 1         3 $self->debug_printf( "ERROR closed" );
372 1 50       3 $f->fail( "Connection closed while awaiting header", http => undef, $req ) unless $f->is_cancelled;
373 1         84 $self->close_now;
374             }
375 3         21 return 0;
376             }
377              
378 123         2807 $ctx->resp_bytes += $+[0];
379              
380 123         2668 my $header = HTTP::Response->parse( $1 );
381             # HTTP::Response doesn't strip the \rs from this
382 123         30266 ( my $status_line = $header->status_line ) =~ s/\r$//;
383              
384 123         5039 $ctx->resp_header = $header;
385              
386 123         918 unless( HTTP_MESSAGE_TRIMS_LWS ) {
387 123         205 my @headers;
388             $header->scan( sub {
389 253         7720 my ( $name, $value ) = @_;
390 253         1054 s/^\s+//, s/\s+$// for $value;
391 253         2096 push @headers, $name => $value;
392 123         1507 } );
393 123 100       1658 $header->header( @headers ) if @headers;
394             }
395              
396 123         11411 my $protocol = $header->protocol;
397 123 100 66     2421 if( $protocol =~ m{^HTTP/1\.(\d+)$} and $1 >= 1 ) {
398 116         381 $self->{can_pipeline} = 1;
399             }
400              
401 123 100       425 if( $header->code =~ m/^1/ ) { # 1xx is not a final response
402 1         17 $self->debug_printf( "HEADER [provisional] %s", $status_line );
403 1 50       9 $write_request_body->( $self ) if $write_request_body;
404 1         317 return 1;
405             }
406              
407 122         1680 $header->request( $req );
408 122 100       1529 $header->previous( $previous_response ) if $previous_response;
409              
410 122         639 $self->debug_printf( "HEADER %s", $status_line );
411              
412 122         615 my $on_body_chunk = $on_header->( $header );
413              
414 122         608 my $code = $header->code;
415              
416 122         1357 my $content_encoding = $header->header( "Content-Encoding" );
417              
418 122         6330 my $decoder;
419 122 100 66     498 if( $content_encoding and
420             $decoder = Net::Async::HTTP->can_decode( $content_encoding ) ) {
421 2         13 $header->init_header( "X-Original-Content-Encoding" => $header->remove_header( "Content-Encoding" ) );
422             }
423              
424             # can_pipeline is set for HTTP/1.1 or above; presume it can keep-alive if set
425 122   66     2082 my $connection_close = lc( $header->header( "Connection" ) || ( $self->{can_pipeline} ? "keep-alive" : "close" ) )
426             eq "close";
427              
428 122 100 50     6190 if( $connection_close ) {
    50          
429 23         61 $self->{max_in_flight} = 1;
430             }
431             elsif( defined( my $keep_alive = lc( $header->header("Keep-Alive") || "" ) ) ) {
432 99         12793 my ( $max ) = ( $keep_alive =~ m/max=(\d+)/ );
433 99 50 33     398 $self->{max_in_flight} = $max if $max && $max < $self->{max_in_flight};
434             }
435              
436             my $on_more = sub {
437 86         198 my ( $chunk ) = @_;
438              
439 86 50 66     352 if( $decoder and not eval { $chunk = $decoder->( $chunk ); 1 } ) {
  2         5  
  2         6  
440 0         0 $self->debug_printf( "ERROR decode failed" );
441 0         0 $f->fail( "Decode error $@", http => undef, $req );
442 0         0 $self->close;
443 0         0 return undef;
444             }
445              
446 86         2166 $on_body_chunk->( $chunk );
447              
448 86         1688 return 1;
449 122         2065 };
450             my $on_done = sub {
451 120         288 my ( $ctx ) = @_;
452              
453 120         3351 $ctx->is_done++;
454              
455             # TODO: IO::Async probably ought to do this. We need to fire the
456             # on_closed event _before_ calling on_body_chunk, to clear the
457             # connection cache in case another request comes - e.g. HEAD->GET
458 120 100       2848 $self->close if $connection_close;
459              
460 120         553 my $final;
461 120 50 66     418 if( $decoder and not eval { $final = $decoder->(); 1 } ) {
  2         3  
  2         4  
462 0         0 $self->debug_printf( "ERROR decode failed" );
463 0         0 $f->fail( "Decode error $@", http => undef, $req );
464 0         0 $self->close;
465 0         0 return undef;
466             }
467              
468 120 50 66     1746 $on_body_chunk->( $final ) if defined $final and length $final;
469              
470 120         369 my $response = $on_body_chunk->();
471 120 50       329 my $e = eval { $f->done( $response ) unless $f->is_cancelled; 1 } ? undef : $@;
  120 100       527  
  118         8883  
472              
473 120 50       6250 $ctx->on_done->( $ctx ) if $ctx->on_done;
474              
475 120         1396 $self->{requests_in_flight}--;
476 120         716 $self->debug_printf( "DONE remaining in-flight=$self->{requests_in_flight}" );
477 120         696 $self->ready;
478              
479 120 100       1272 if( defined $e ) {
480 2         7 chomp $e;
481 2         36 $self->invoke_error( $e, perl => );
482             # This might not return, if it top-level croaks
483             }
484              
485 119         1355 return undef; # Finished
486 122         712 };
487              
488             # RFC 2616 says "HEAD" does not have a body, nor do any 1xx codes, nor
489             # 204 (No Content) nor 304 (Not Modified)
490 122 100 66     553 if( $req->method eq "HEAD" or $code =~ m/^1..$/ or $code eq "204" or $code eq "304" ) {
      66        
      66        
491 3         103 $self->debug_printf( "BODY done [none]" );
492 3         17 return $on_done->( $ctx );
493             }
494              
495 119         2749 my $transfer_encoding = $header->header( "Transfer-Encoding" );
496 119         5545 my $content_length = $header->content_length;
497              
498 119 100 66     6041 if( defined $transfer_encoding and $transfer_encoding eq "chunked" ) {
    100          
499 4         12 $self->debug_printf( "BODY chunks" );
500              
501 4 50       16 $stall_timer->reason = "receiving body chunks" if $stall_timer;
502 4         11 return $self->_mk_on_read_chunked( $on_more, $on_done );
503             }
504             elsif( defined $content_length ) {
505 102         555 $self->debug_printf( "BODY length $content_length" );
506              
507 102 100       554 if( $content_length == 0 ) {
508 50         179 $self->debug_printf( "BODY done [length=0]" );
509 50         263 return $on_done->( $ctx );
510             }
511              
512 52 100       180 $stall_timer->reason = "receiving body" if $stall_timer;
513 52         262 return $self->_mk_on_read_length( $content_length, $on_more, $on_done );
514             }
515             else {
516 13         52 $self->debug_printf( "BODY until EOF" );
517              
518 13 50       78 $stall_timer->reason = "receiving body until EOF" if $stall_timer;
519 13         61 return $self->_mk_on_read_until_eof( $on_more, $on_done );
520             }
521 134         5845 };
522             }
523              
524             sub _mk_on_read_chunked
525             {
526 4     4   6 shift; # $self
527 4         8 my ( $on_more, $on_done ) = @_;
528              
529 4         7 my $chunk_length;
530              
531             sub {
532 16     16   76 my ( $self, $buffref, $closed, $ctx ) = @_;
533              
534 16         221 my $req = $ctx->req;
535 16         250 my $f = $ctx->f;
536              
537 16 100 66     212 if( !defined $chunk_length and $$buffref =~ s/^(.*?)$CRLF// ) {
538 10         22 my $header = $1;
539 10         150 $ctx->resp_bytes += $+[0];
540              
541             # Chunk header
542 10 100       79 unless( $header =~ s/^([A-Fa-f0-9]+).*// ) {
543 1 50       4 $f->fail( "Corrupted chunk header", http => undef, $req ) unless $f->is_cancelled;
544 1         31 $self->close_now;
545 1         11 return 0;
546             }
547              
548 9         21 $chunk_length = hex( $1 );
549 9 100       25 return 1 if $chunk_length;
550              
551 3         10 return $self->_mk_on_read_chunk_trailer( $req, $on_more, $on_done, $f );
552             }
553              
554             # Chunk is followed by a CRLF, which isn't counted in the length;
555 6 50 33     28 if( defined $chunk_length and length( $$buffref ) >= $chunk_length + 2 ) {
556             # Chunk body
557 6         17 my $chunk = substr( $$buffref, 0, $chunk_length, "" );
558 6         84 $ctx->resp_bytes += length $chunk;
559              
560 6 50       61 unless( $$buffref =~ s/^$CRLF// ) {
561 0         0 $self->debug_printf( "ERROR chunk without CRLF" );
562 0 0       0 $f->fail( "Chunk of size $chunk_length wasn't followed by CRLF", http => undef, $req ) unless $f->is_cancelled;
563 0         0 $self->close;
564             }
565              
566 6         86 $ctx->resp_bytes += $+[0];
567              
568 6         32 undef $chunk_length;
569              
570 6         14 return $on_more->( $chunk );
571             }
572              
573 0 0       0 if( $closed ) {
574 0         0 $self->debug_printf( "ERROR closed" );
575 0 0       0 $f->fail( "Connection closed while awaiting chunk", http => undef, $req ) unless $f->is_cancelled;
576             }
577 0         0 return 0;
578 4         21 };
579             }
580              
581             sub _mk_on_read_chunk_trailer
582             {
583 3     3   6 shift; # $self
584 3         7 my ( undef, $on_more, $on_done ) = @_;
585              
586 3         8 my $trailer = "";
587              
588             sub {
589 3     3   17 my ( $self, $buffref, $closed, $ctx ) = @_;
590              
591 3         43 my $req = $ctx->req;
592 3         49 my $f = $ctx->f;
593              
594 3 50       18 if( $closed ) {
595 0         0 $self->debug_printf( "ERROR closed" );
596 0 0       0 $f->fail( "Connection closed while awaiting chunk trailer", http => undef, $req ) unless $f->is_cancelled;
597             }
598              
599 3 50       46 $$buffref =~ s/^(.*)$CRLF// or return 0;
600 3         10 $trailer .= $1;
601 3         46 $ctx->resp_bytes += $+[0];
602              
603 3 50       20 return 1 if length $1;
604              
605             # TODO: Actually use the trailer
606              
607 3         13 $self->debug_printf( "BODY done [chunked]" );
608 3         9 return $on_done->( $ctx );
609 3         30 };
610             }
611              
612             sub _mk_on_read_length
613             {
614 52     52   90 shift; # $self
615 52         170 my ( $content_length, $on_more, $on_done ) = @_;
616              
617             sub {
618 53     53   2491 my ( $self, $buffref, $closed, $ctx ) = @_;
619              
620 53         1060 my $req = $ctx->req;
621 53         1105 my $f = $ctx->f;
622              
623             # This will truncate it if the server provided too much
624 53         365 my $content = substr( $$buffref, 0, $content_length, "" );
625 53         109 $content_length -= length $content;
626 53         1000 $ctx->resp_bytes += length $content;
627              
628 53 50       346 return undef unless $on_more->( $content );
629              
630 53 100       2036 if( $content_length == 0 ) {
631 51         220 $self->debug_printf( "BODY done [length]" );
632 51         251 return $on_done->( $ctx );
633             }
634              
635 2 50       10 if( $closed ) {
636 0         0 $self->debug_printf( "ERROR closed" );
637 0 0       0 $f->fail( "Connection closed while awaiting body", http => undef, $req ) unless $f->is_cancelled;
638             }
639 2         6 return 0;
640 52         429 };
641             }
642              
643             sub _mk_on_read_until_eof
644             {
645 13     13   24 shift; # $self
646 13         28 my ( $on_more, $on_done ) = @_;
647              
648             sub {
649 27     27   174 my ( $self, $buffref, $closed, $ctx ) = @_;
650              
651 27         53 my $content = $$buffref;
652 27         49 $$buffref = "";
653 27         451 $ctx->resp_bytes += length $content;
654              
655 27 50       163 return undef unless $on_more->( $content );
656              
657 27 100       104 return 0 unless $closed;
658              
659 13         54 $self->debug_printf( "BODY done [eof]" );
660 13         53 return $on_done->( $ctx );
661 13         100 };
662             }
663              
664             =head1 AUTHOR
665              
666             Paul Evans
667              
668             =cut
669              
670             0x55AA;