File Coverage

blib/lib/Monoceros/Server.pm
Criterion Covered Total %
statement 521 670 77.7
branch 182 346 52.6
condition 97 229 42.3
subroutine 59 63 93.6
pod 0 17 0.0
total 859 1325 64.8


line stmt bran cond sub pod time code
1             package Monoceros::Server;
2              
3 88     88   2086 use strict;
  88         660  
  88         3841  
4 88     88   716 use warnings;
  88         369  
  88         3082  
5 88     88   607 use base qw/Plack::Handler::Starlet/;
  88         203  
  88         49379  
6 88     88   4831800 use IO::Socket;
  88         278  
  88         794  
7 88     88   163158 use IO::FDPass;
  88         30613  
  88         3722  
8 88     88   643 use Parallel::Prefork;
  88         213  
  88         658  
9 88     88   123998 use AnyEvent;
  88         551888  
  88         3782  
10 88     88   55674 use AnyEvent::Util qw(fh_nonblocking portable_socketpair);
  88         514061  
  88         7920  
11 88     88   1406 use Time::HiRes qw/time/;
  88         215  
  88         772  
12 88     88   10362 use Plack::TempBuffer;
  88         1001  
  88         2382  
13 88     88   550 use Plack::Util;
  88         209  
  88         2240  
14 88     88   446 use Plack::HTTPParser qw( parse_http_request );
  88         206  
  88         4125  
15 88     88   473 use POSIX qw(EINTR EAGAIN EWOULDBLOCK ESPIPE ENOBUFS :sys_wait_h);
  88         200  
  88         530  
16 88     88   60884 use POSIX::getpeername qw/_getpeername/;
  88         53300  
  88         5428  
17 88     88   648 use Socket qw(IPPROTO_TCP TCP_NODELAY);
  88         195  
  88         4332  
18 88     88   519 use File::Temp qw/tempfile/;
  88         161  
  88         3380  
19 88     88   500 use Digest::MD5 qw/md5/;
  88         175  
  88         6527  
20              
21 88     88   617 use constant WRITER => 0;
  88         249  
  88         6480  
22 88     88   559 use constant READER => 1;
  88         599  
  88         4930  
23              
24 88     88   751 use constant S_GD => 0;
  88         173  
  88         4395  
25 88     88   493 use constant S_FD => 1;
  88         248  
  88         4041  
26 88     88   500 use constant S_TIME => 2;
  88         167  
  88         4862  
27 88     88   558 use constant S_REQS => 3;
  88         165  
  88         4827  
28 88     88   608 use constant S_STATE => 4; # 0:idle 1:queue
  88         214  
  88         4335  
29              
30 88     88   471 use constant MAX_REQUEST_SIZE => 131072;
  88         234  
  88         4871  
31 88     88   146658 use constant CHUNKSIZE => 64 * 1024;
  88         252  
  88         6523  
32 88   50 88   542 use constant DEBUG => $ENV{MONOCEROS_DEBUG} || 0;
  88         157  
  88         539500  
33              
34             my $null_io = do { open my $io, "<", \""; $io };
35             my $have_accept4 = eval {
36             require Linux::Socket::Accept4;
37             Linux::Socket::Accept4::SOCK_CLOEXEC()|Linux::Socket::Accept4::SOCK_NONBLOCK();
38             };
39              
40             my $have_sendfile = eval {
41             return 0;
42             require Sys::Sendfile;
43             };
44              
45             sub new {
46 83     83 0 116110 my $class = shift;
47 83         1311 my %args = @_;
48              
49             # setup before instantiation
50 83         542 my $listen_sock;
51 83 50       1616 if (defined $ENV{SERVER_STARTER_PORT}) {
52 0         0 my ($hostport, $fd) = %{Server::Starter::server_ports()};
  0         0  
53 0 0       0 if ($hostport =~ /(.*):(\d+)/) {
54 0         0 $args{host} = $1;
55 0         0 $args{port} = $2;
56             } else {
57 0         0 $args{port} = $hostport;
58             }
59 0 0       0 $listen_sock = IO::Socket::INET->new(
60             Proto => 'tcp',
61             ) or die "failed to create socket:$!";
62 0 0       0 $listen_sock->fdopen($fd, 'w')
63             or die "failed to bind to listening socket:$!";
64             }
65 83         646 my $max_workers = 5;
66 83         555 for (qw(max_workers workers)) {
67             $max_workers = delete $args{$_}
68 166 100       1389 if defined $args{$_};
69             }
70              
71 83   50     781 my $open_max = eval { POSIX::sysconf (POSIX::_SC_OPEN_MAX ()) - 1 } || 1023;
72             my $self = bless {
73             host => $args{host} || 0,
74             port => $args{port} || 8080,
75             socket_path => (defined $args{socket_path}) ? $args{socket_path} : undef,
76             max_workers => $max_workers,
77             timeout => $args{timeout} || 300,
78             disable_keepalive => (exists $args{keepalive} && !$args{keepalive}) ? 1 : 0,
79             keepalive_timeout => $args{keepalive_timeout} || 10,
80             max_keepalive_connection => $args{max_keepalive_connection} || int($open_max/2),
81             max_readahead_reqs => (
82             defined $args{max_readahead_reqs}
83             ? $args{max_readahead_reqs} : 100
84             ),
85             min_readahead_reqs => (
86             defined $args{min_readahead_reqs}
87             ? $args{min_readahead_reqs} : undef,
88             ),
89             server_software => $args{server_software} || $class,
90       61     server_ready => $args{server_ready} || sub {},
91             min_reqs_per_child => (
92             defined $args{min_reqs_per_child}
93             ? $args{min_reqs_per_child} : undef,
94             ),
95             max_reqs_per_child => (
96             $args{max_reqs_per_child} || $args{max_requests} || 1000,
97             ),
98             err_respawn_interval => (
99             defined $args{err_respawn_interval}
100             ? $args{err_respawn_interval} : undef,
101 83 50 100     11960 ),
    100 50        
    50 50        
    50 100        
    50 50        
    50 33        
    50 33        
      100        
      50        
102             _using_defer_accept => 1,
103             listen_sock => ( defined $listen_sock ? $listen_sock : undef),
104             }, $class;
105              
106 83         1830 $self;
107             }
108              
109             sub setup_listener {
110 83     83 0 292 my $self = shift;
111 83 50       1823 if ( my $path = $self->{socket_path} ) {
112 0 0       0 if (-S $path) {
113 0         0 warn "removing existing socket file:$path";
114 0 0       0 unlink $path
115             or die "failed to remove existing socket file:$path:$!";
116             }
117 0         0 unlink $path;
118 0         0 my $saved_umask = umask(0);
119 0 0       0 $self->{listen_sock} = IO::Socket::UNIX->new(
120             Listen => Socket::SOMAXCONN(),
121             Local => $path,
122             ) or die "failed to listen to socket $path:$!";
123 0         0 umask($saved_umask);
124 0         0 $self->{use_unix_domain} = 1;
125             }
126             else {
127             $self->{listen_sock} ||= IO::Socket::INET->new(
128             Listen => SOMAXCONN,
129             LocalPort => $self->{port},
130             LocalAddr => $self->{host},
131 83 50 33     4299 Proto => 'tcp',
132             ReuseAddr => 1,
133             ) or die "failed to listen to port $self->{port}:$!";
134             # set defer accept
135 83 50 33     63210 if ($^O eq 'linux' && !$self->{use_unix_domain}) {
136             setsockopt($self->{listen_sock}, IPPROTO_TCP, 9, 1)
137 83 50       1272 and $self->{_using_defer_accept} = 1;
138             }
139             }
140              
141 83         667 $self->{server_ready}->($self);
142             }
143              
144             sub run {
145 83     83 0 4425 my ($self, $app) = @_;
146 83         1116 $self->setup_listener();
147 83         12739 $self->setup_sockpair();
148 83         917 $self->run_workers($app);
149             }
150              
151             sub setup_sockpair {
152 83     83 0 244 my $self = shift;
153              
154 83         401 my %workers;
155 83         724 for my $wid ( 1..$self->{max_workers} ) {
156 333 50       1940 my @pair = portable_socketpair()
157             or die "failed to create socketpair: $!";
158 333         23715 $workers{$wid} = {
159             running => 0,
160             sock => \@pair
161             };
162             }
163 83         698 $self->{workers} = \%workers;
164              
165 83 50       971 my @fdpass_sock = portable_socketpair()
166             or die "failed to create socketpair: $!";
167 83         4855 $self->{fdpass_sock} = \@fdpass_sock;
168              
169 83         3075 my ($fh, $filename) = tempfile('monoceros_stats_XXXXXX',UNLINK => 0, SUFFIX => '.dat', TMPDIR => 1);
170 83         75016 $self->{stats_fh} = $fh;
171 83         243 $self->{stats_filename} = $filename;
172 83         1567 $self->update_stats();
173              
174 83         338 1;
175             }
176              
177             sub run_workers {
178 83     83 0 649 my ($self,$app) = @_;
179 83         2233 local $SIG{PIPE} = 'IGNORE';
180 83         125877 my $pid = fork;
181 83 100       10983 if ( $pid ) {
    50          
182             #parent
183 12         1838 $self->connection_manager($pid);
184 12         1841 delete $self->{stats_fh};
185 12         5027 unlink $self->{stats_filename};
186             }
187             elsif ( defined $pid ) {
188 71         10654 $self->request_worker($app);
189 0         0 exit;
190             }
191             else {
192 0         0 die "failed fork:$!";
193             }
194             }
195              
196             sub queued_send {
197 0     0 0 0 my $self = shift;
198 0         0 my $sockid = shift;
199              
200 0 0       0 if ( ! exists $self->{sockets}{$sockid} ) {
201 0         0 return;
202             }
203 0         0 $self->{sockets}{$sockid}[S_STATE] = 1;
204              
205 0         0 push @{$self->{fdsend_queue}}, $sockid;
  0         0  
206             $self->{fdsend_worker} ||= AE::io $self->{fdpass_sock}[WRITER], 1, sub {
207 0     0   0 while ( my $sockid = shift @{$self->{fdsend_queue}} ) {
  0         0  
208 0 0       0 if ( ! exists $self->{sockets}{$sockid} ) {
209 0         0 next;
210             }
211 0 0       0 if ( _getpeername($self->{sockets}{$sockid}[S_FD], my $addr) < 0 ) {
212 0         0 delete $self->{sockets}{$sockid};
213 0         0 next;
214             }
215             my $ret = IO::FDPass::send(
216             fileno $self->{fdpass_sock}[WRITER],
217 0         0 $self->{sockets}{$sockid}[S_FD]
218             );
219 0 0       0 if ( !$ret ) {
220 0 0 0     0 if ( $! == EAGAIN || $! == EWOULDBLOCK || $! == EINTR ) {
      0        
221 0         0 unshift @{$self->{fdsend_queue}}, $sockid;
  0         0  
222 0         0 return;
223             }
224 0         0 die "unable to pass queue: $!";
225 0         0 undef $self->{fdsend_worker};
226             }
227             }
228 0         0 undef $self->{fdsend_worker};
229 0   0     0 };
230 0         0 1;
231             }
232              
233             my $prev_stats = '';
234             sub update_stats {
235 119     119 0 348 my $self = shift;
236 119         228 my $total = scalar keys %{$self->{sockets}};
  119         597  
237 119         342 my $processing = scalar grep { !$self->{sockets}{$_}[S_STATE] == 1 } keys %{$self->{sockets}};
  0         0  
  119         437  
238 119         231 my $idle = scalar grep { $self->{sockets}{$_}[S_STATE] == 0 } keys %{$self->{sockets}};
  0         0  
  119         319  
239              
240 119         550 my $stats = "total=$total&";
241 119         572 $stats .= "waiting=$idle&";
242 119         674 $stats .= "processing=$processing&";
243 119         770 $stats .= "max_workers=".$self->{max_workers}."&";
244 119 100 66     1508 return if $stats eq $prev_stats && @_;
245 83         473 $prev_stats = $stats;
246 83         860 seek($self->{stats_fh},0,0);
247 83         11717 syswrite($self->{stats_fh}, $stats);
248             }
249              
250             sub can_keepalive {
251 57     57 0 219 my $self = shift;
252 57         638 seek($self->{stats_fh},0,0);
253 57         1516 sysread($self->{stats_fh},my $buf, 1024);
254 57 50       290 return 1 unless $buf;
255 57 50       1231 if ( $buf =~ m!total=(\d+)&! ){
256 57 50       473 return if $1 >= $self->{max_keepalive_connection};
257             }
258 57         226 return 1;
259             }
260              
261             sub connection_manager {
262 12     12 0 376 my ($self, $worker_pid) = @_;
263              
264 12         1673 $self->{workers}{$_}{sock}[WRITER]->close for 1..$self->{max_workers};
265 12         1212 $self->{fdpass_sock}[READER]->close;
266 12         667 fh_nonblocking $self->{fdpass_sock}[WRITER], 1;
267 12         857 fh_nonblocking $self->{listen_sock}, 1;
268              
269 12         214 my %manager;
270             my %hash2fd;
271 12         0 my %wait_read;
272 12         117 my $term_received = 0;
273 12         341 $self->{sockets} = {};
274 12         177 $self->{fdsend_queue} = [];
275              
276 12         102 warn sprintf "Set max_keepalive_connection to %s", $self->{max_keepalive_connection} if DEBUG;
277              
278 12         2529 my $cv = AE::cv;
279 12         88289 my $close_all = 0;
280 12         29 my $sig2;$sig2 = AE::signal 'USR1', sub {
281 12     12   9307 my $t;$t = AE::timer 0, 1, sub {
282 12 50       232 return unless $close_all;
283 12         46 undef $t;
284 12         478 kill 'TERM', $worker_pid;
285 12         55 my $t2;$t2 = AE::timer 0, 1, sub {
286 35         2652021 my $kid = waitpid($worker_pid, WNOHANG);
287 35 100       366 return if $kid >= 0;
288 12         79 undef $t2;
289 12         285 $cv->send;
290 12         276 };
291 12         265 };
292 12         507 };
293 12         52 my $sig;$sig = AE::signal 'TERM', sub {
294 13     13   3325971 $term_received++;
295 13         717 kill 'USR1', $worker_pid; #stop accept
296 13         68 my $t;$t = AE::timer 0, 1, sub {
297 13         254 my $time = time;
298 13 50       38 return if keys %{$self->{sockets}};
  13         160  
299 13         61 undef $t;
300 13         230 $close_all=1;
301 13         602 };
302 12         284 };
303              
304             $manager{disconnect_keepalive_timeout} = AE::timer 0, 1, sub {
305 36     36   9844544 my $time = time;
306 36         117 if ( DEBUG ) {
307             my $total = scalar keys %{$self->{sockets}};
308             my $processing = scalar grep { $self->{sockets}{$_}[S_STATE] == 1} keys %{$self->{sockets}};
309             my $idle = scalar grep { $self->{sockets}{$_}[S_STATE] == 0} keys %{$self->{sockets}};
310             warn "working: $processing | total: $total | idle: $idle";
311             }
312 36         90 for my $key ( keys %{$self->{sockets}} ) { #key = fd
  36         316  
313 0 0 0     0 if ( $self->{sockets}{$key}[S_STATE] == 0 && $self->{sockets}{$key}[S_REQS] == 0
    0 0        
      0        
      0        
314             && $time - $self->{sockets}{$key}[S_TIME] > $self->{timeout} ) { #idle && first req
315 0         0 delete $wait_read{$key};
316 0         0 delete $self->{sockets}{$key};
317              
318             }
319             elsif ( $self->{sockets}{$key}[S_STATE] == 0 && $self->{sockets}{$key}[S_REQS] > 0 &&
320             $time - $self->{sockets}{$key}[S_TIME] > $self->{keepalive_timeout} ) { #idle && keepalivew
321 0         0 delete $wait_read{$key};
322 0         0 delete $self->{sockets}{$key};
323             }
324             }
325              
326 36         390 $self->update_stats(1);
327 12         379 };
328              
329 12         64 my %m_state;
330             my %workers;
331 12         71 for my $wid ( 1..$self->{max_workers} ) {
332              
333 37         146 my $sock = $self->{workers}{$wid}{sock}[READER];
334 37         140 fh_nonblocking($sock,1);
335 37 50       421 return unless $sock;
336              
337 37         323 $m_state{$wid} = {};
338 37         102 my $state = $m_state{$wid};
339 37         199 $state->{buf} = '';
340 37         182 $state->{state} = 'cmd';
341 37         99 $state->{sockid} = '';
342 37         69 $state->{reqs} = 0;
343              
344             $workers{$wid} = AE::io fileno $sock, 0, sub {
345 260622 50   260622   7574103 if ( $state->{state} eq 'cmd' ) {
346 260622         1434103 my $ret = recv($sock, my $buf, 28 - length($state->{buf}), 0);
347 260622 0 0     605004 if ( !defined $ret && ($! == EINTR || $! == EAGAIN || $! == EWOULDBLOCK) ) {
      33        
348 0         0 return;
349             }
350 260622 50       411119 if ( !defined $ret ) {
351 0         0 warn "failed to recv from sock: $!";
352 0         0 return;
353             }
354 260622 50 33     725793 if ( defined $buf && length $buf == 0) {
355 260622         746569 return;
356             }
357 0         0 $state->{buf} .= $buf;
358 0 0       0 return if length $state->{buf} < 28;
359 0         0 my $msg = substr $state->{buf}, 0, 28, '';
360 0         0 my $method = substr($msg, 0, 4,'');
361 0         0 my $sockid = substr($msg, 0, 16, '');
362 0         0 my $reqs = hex($msg);
363              
364 0 0       0 if ( $method eq 'push' ) {
    0          
    0          
365 0         0 $state->{state} = 'recv_fd';
366 0         0 $state->{sockid} = $sockid;
367 0         0 $state->{reqs} = $reqs;
368             }
369             elsif ( $method eq 'keep' ) {
370 0 0       0 if ( exists $self->{sockets}{$sockid} ) {
371 0         0 $self->{sockets}{$sockid}[S_TIME] = AE::now;
372 0         0 $self->{sockets}{$sockid}[S_REQS] += $reqs;
373 0         0 $self->{sockets}{$sockid}[S_STATE] = 0;
374             $wait_read{$sockid} = AE::io $self->{sockets}{$sockid}[S_FD], 0, sub {
375 0         0 delete $wait_read{$sockid};
376 0         0 $self->queued_send($sockid);
377 0         0 };
378             }
379             }
380             elsif ( $method eq 'clos' ) {
381 0         0 delete $self->{sockets}{$sockid};
382 0         0 $self->update_stats();
383             }
384             }
385              
386 0 0       0 if ( $state->{state} eq 'recv_fd' ) {
387 0         0 my $fd = IO::FDPass::recv(fileno $sock);
388 0 0 0     0 if ( $fd < 0 && ($! == EINTR || $! == EAGAIN || $! == EWOULDBLOCK) ) {
      0        
389 0         0 return;
390             }
391 0         0 $state->{state} = 'cmd';
392 0 0       0 if ( $fd <= 0 ) {
393 0         0 warn sprintf 'Failed recv fd: %s (%d)', $!, $!;
394 0         0 return;
395             }
396 0         0 my $sockid = $state->{sockid};
397 0         0 my $reqs = $state->{reqs};
398             $self->{sockets}{$sockid} = [
399 0         0 AnyEvent::Util::guard { POSIX::close($fd) },
400 0         0 $fd,
401             AE::now,
402             $reqs,
403             0
404             ]; #guard,fd,time,reqs,state
405 0         0 $self->update_stats();
406             $wait_read{$sockid} = AE::io $fd, 0, sub {
407 0         0 delete $wait_read{$sockid};
408 0         0 $self->queued_send($sockid);
409 0         0 };
410             } # cmd
411             } # AE::io
412 37         536 } # for 1..max_workers
413              
414 12         111 $manager{workers} = \%workers;
415 12         243 $cv->recv;
416             }
417              
418             sub request_worker {
419 71     71 0 2310 my ($self,$app) = @_;
420              
421 71         5852 delete $self->{stats_fh};
422 71         5385 fh_nonblocking($self->{listen_sock},1);
423 71         9646 $self->{fdpass_sock}[WRITER]->close;
424 71         2995 fh_nonblocking($self->{fdpass_sock}[READER],1);
425 71         2972 $self->{workers}{$_}{sock}[READER]->close for 1..$self->{max_workers};
426              
427             # use Parallel::Prefork
428             my %pm_args = (
429             max_workers => $self->{max_workers},
430 71         9488 trap_signals => {
431             TERM => 'TERM',
432             HUP => 'TERM',
433             USR1 => 'USR1',
434             },
435             );
436 71 50       1639 if (defined $self->{err_respawn_interval}) {
437 0         0 $pm_args{err_respawn_interval} = $self->{err_respawn_interval};
438             }
439              
440 71         401 my $next;
441             $pm_args{on_child_reap} = sub {
442 160     160   34250977 my ( $pm, $exit_pid, $status ) = @_;
443 160         1628 for my $wid (1..$self->{max_workers} ) {
444 465 100 100     4336 if ( $self->{workers}{$wid}{running} && $self->{workers}{$wid}{running} == $exit_pid ) {
445             #warn sprintf "finished wid:%s pid:%s", $next, $exit_pid if DEBUG;
446 160         643 $self->{workers}{$wid}{running} = 0;
447 160         1069 last;
448             }
449             }
450 71         4532 };
451             $pm_args{before_fork} = sub {
452 343     343   48860 for my $wid (1..$self->{max_workers} ) {
453 921 100       3309 if ( ! $self->{workers}{$wid}{running} ) {
454 343         2578 $next = $wid;
455 343         1520 last;
456             }
457             }
458              
459 71         2005 };
460             $pm_args{after_fork} = sub {
461 285     285   319117 my ($pm, $pid) = @_;
462 285 50       2612 if ( defined $next ) {
463             #warn sprintf "assign wid:%s to pid:%s", $next, $pid if DEBUG;
464 285         3910 $self->{workers}{$next}{running} = $pid;
465             }
466             else {
467 0         0 warn "worker start but next is undefined";
468             }
469 71         1850 };
470              
471 71         4056 my $pm = Parallel::Prefork->new(\%pm_args);
472              
473 71         17853 while ($pm->signal_received !~ /^(?:TERM|USR1)$/) {
474             $pm->start(sub {
475 58 50   58   90710 die 'worker start but next is undefined' unless $next;
476 58         2416 for my $wid ( 1..$self->{max_workers} ) {
477 254 100       5349 next if $wid == $next;
478 196         4583 $self->{workers}{$wid}{sock}[WRITER]->close;
479             }
480 58         2142 $self->{mgr_sock} = $self->{workers}{$next}{sock}[WRITER];
481 58         2901 fh_nonblocking($self->{mgr_sock},1);
482              
483             open($self->{stats_fh}, '<', $self->{stats_filename})
484 58 50       11936 or die "could not open stats file: $!";
485              
486 58         1090 $self->{fhlist} = [$self->{listen_sock},$self->{fdpass_sock}[READER]];
487 58         980 $self->{fhbits} = '';
488 58         609 for ( @{$self->{fhlist}} ) {
  58         745  
489 116         1833 vec($self->{fhbits}, fileno $_, 1) = 1;
490             }
491              
492             my $max_reqs_per_child = $self->_calc_minmax_per_child(
493             $self->{max_reqs_per_child},
494             $self->{min_reqs_per_child}
495 58         4364 );
496             my $max_readahead_reqs = $self->_calc_minmax_per_child(
497             $self->{max_readahead_reqs},
498             $self->{min_readahead_reqs}
499 58         1088 );
500              
501 58         447 my $proc_req_count = 0;
502              
503 58         767 $self->{term_received} = 0;
504 58         510 $self->{stop_accept} = 0;
505             local $SIG{TERM} = sub {
506 35         331 $self->{term_received}++;
507 35 50       616 exit 0 if $self->{term_received} > 1;
508 58         3364 };
509             local $SIG{USR1} = sub {
510 35         925467 $self->{fhlist} = [$self->{fdpass_sock}[READER]];
511 35         292 $self->{fhbits} = '';
512 35         425 vec($self->{fhbits}, fileno($self->{fdpass_sock}[READER]), 1) = 1;
513 35         567 $self->{stop_accept}++;
514 58         1408 };
515              
516 58         1066 local $SIG{PIPE} = 'IGNORE';
517              
518 58         428 my $next_conn;
519 58   100     2777 while ( $next_conn || $self->{stop_accept} || $proc_req_count < $max_reqs_per_child ) {
      100        
520             last if ( $self->{term_received}
521 383 100 100     3692 && !$next_conn );
522 348         870 my $conn;
523 348 50 66     1838 if ( $next_conn && $next_conn->{buf} ) { #read ahead or pipeline
524 0         0 $conn = $next_conn;
525 0         0 $next_conn = undef;
526             }
527             else {
528 348         787 my @rfh = @{$self->{fhlist}};
  348         1321  
529 348         1344 my $rfd = $self->{fhbits};
530 348 100       1430 if ( $next_conn ) {
531 74         188 push @rfh, $next_conn->{fh};
532 74         486 vec($rfd, fileno $next_conn->{fh}, 1) = 1;
533             }
534 348         822 my @can_read;
535 348 100       7891151 if ( select($rfd, undef, undef, 1) > 0 ) {
536 280         3443 for ( my $i = 0; $i <= $#rfh; $i++ ) {
537 633         2341 my $try_read_fd = fileno $rfh[$i];
538 633 100 66     5476 if ( !defined $rfd || vec($rfd, $try_read_fd, 1) ) {
539 286 100 66     1800 if ( $next_conn && fileno $next_conn->{fh} == $try_read_fd ) {
540 74         211 $conn = $next_conn;
541 74         195 last;
542             }
543 212         1628 push @can_read, $self->{fhlist}[$i];
544             }
545             }
546             }
547             #accept or recv
548 348 100       1580 if ( !$conn ) {
549 274         4062 $conn = $self->accept_or_recv( @can_read );
550             }
551             # exists new conn && exists next_conn && next_conn is not ready => keep
552 348 50 100     2316 if ( $conn && $next_conn && $conn != $next_conn ) {
      66        
553 0         0 $self->keep_it($next_conn);
554             }
555             # try to re-read next_conn
556 348 50 66     2497 if ( !$conn && $next_conn ) {
557 0         0 @rfh = ();
558 0         0 next;
559             }
560             #clear next_conn
561 348         930 @rfh = ();
562 348         976 $next_conn = undef;
563             }
564 348 100       3385 next unless $conn;
565              
566             my $env = {
567             SERVER_PORT => $self->{port} || 0,
568             SERVER_NAME => $self->{host} || 0,
569             SCRIPT_NAME => '',
570             REMOTE_ADDR => $conn->{peeraddr},
571             REMOTE_PORT => $conn->{peerport} || 0,
572             'psgi.version' => [ 1, 1 ],
573             'psgi.errors' => *STDERR,
574             'psgi.url_scheme' => 'http',
575             'psgi.run_once' => Plack::Util::FALSE,
576             'psgi.multithread' => Plack::Util::FALSE,
577             'psgi.multiprocess' => Plack::Util::TRUE,
578             'psgi.streaming' => Plack::Util::TRUE,
579             'psgi.nonblocking' => Plack::Util::FALSE,
580             'psgix.input.buffered' => Plack::Util::TRUE,
581             'psgix.io' => $conn->{fh},
582             'psgix.harakiri' => 1,
583             'X_MONOCEROS_WORKER_STATS' => $self->{stats_filename},
584 159   50     6586 };
      100        
      50        
585 159 50       1896 $env->{'X_REMOTE_PID'} = $$ if $ENV{HARNESS_ACTIVE};
586 159         794 $self->{_is_deferred_accept} = 1; # ready to read
587 159         505 my $prebuf;
588 159 50       718 if ( exists $conn->{buf} ) {
589 0         0 $prebuf = delete $conn->{buf};
590             }
591             else {
592             #pre-read
593 159         3677 my $ret = sysread($conn->{fh}, $prebuf, MAX_REQUEST_SIZE);
594 159 50 0     1838 if ( ! defined $ret && ($! == EAGAIN || $! == EWOULDBLOCK || $! == EINTR) ) {
    100 33        
      66        
595 0         0 $self->keep_it($conn);
596 0         0 next;
597             }
598             elsif ( defined $ret && $ret == 0) {
599             #closed?
600             $self->cmd_to_mgr('clos', $conn->{peername}, $conn->{reqs})
601 64 100       582 if !$conn->{direct};
602 64         4635 next;
603             }
604             }
605             # stop keepalive if SIG{TERM} or SIG{USR1}. but go-on if pipline req
606 95         305 my $may_keepalive = 1;
607 95 50 33     1162 $may_keepalive = 0 if ($self->{term_received} || $self->{stop_accept});
608 95 100       449 $may_keepalive = 0 if $self->{disable_keepalive};
609 95         310 my $is_keepalive = 1; # to use "keepalive_timeout" in handle_connection,
610             # treat every connection as keepalive
611             my ($keepalive,$pipelined_buf) = $self->handle_connection($env, $conn->{fh}, $app,
612             $may_keepalive, $is_keepalive, $prebuf,
613 95         1534 $conn->{reqs});
614             # harakiri
615 95 100       460 if ($env->{'psgix.harakiri.commit'}) {
616 18         44 $proc_req_count = $max_reqs_per_child + 1;
617             }
618              
619 95         224 ++$proc_req_count;
620 95         199 $conn->{reqs}++;
621 95 100       284 if ( !$keepalive ) {
622             #close
623             $self->cmd_to_mgr('clos', $conn->{peername}, $conn->{reqs})
624 3 50       27 if !$conn->{direct};
625 3         206 next;
626             }
627              
628             # pipeline
629 92 50 33     804 if ( defined $pipelined_buf && length $pipelined_buf ) {
630 0         0 $next_conn = $conn;
631 0         0 $next_conn->{buf} = $pipelined_buf;
632 0         0 next;
633             }
634              
635             # read ahead
636 92 100 66     560 if ( $conn->{reqs} < $max_readahead_reqs && $proc_req_count <= $max_reqs_per_child ) {
637 74         151 $next_conn = $conn;
638 74         1095 next;
639             }
640              
641             # wait
642 18         159 $self->keep_it($conn);
643             }
644 71         3790 }); #start
645             }
646             local $SIG{TERM} = sub {
647 12     12   3483 $pm->signal_all_children('TERM');
648 13         1883232 };
649 13         683 kill 'USR1', getppid();
650 13         345 my $children_left = $pm->wait_all_children;
651 13 50       525 warn "wait_all_children returned unterminated children. This should not happen!" if $children_left;
652 13         10215 exit;
653             }
654              
655             sub cmd_to_mgr {
656 32     32 0 181 my ($self,$cmd,$peername,$reqs) = @_;
657 32         767 my $msg = $cmd . Digest::MD5::md5($peername) . sprintf('%08x',$reqs);
658 32         1761 send($self->{mgr_sock}, $msg, 0);
659             }
660              
661             sub keep_it {
662 18     18 0 64 my ($self,$conn) = @_;
663 18 50       92 if ( $conn->{direct} ) {
664 18         140 $self->cmd_to_mgr("push", $conn->{peername}, $conn->{reqs});
665 18         56 my $ret;
666 18         58 do {
667 18         1016 $ret = IO::FDPass::send(fileno $self->{mgr_sock}, fileno $conn->{fh});
668 18 0 33     1367 die $! if ( !defined $ret && $! != EAGAIN && $! != EWOULDBLOCK && $! != EINTR);
      33        
      0        
669             #need select?
670             } while (!$ret);
671             }
672             else {
673 0         0 $self->cmd_to_mgr("keep", $conn->{peername}, $conn->{reqs});
674             }
675             }
676              
677             sub accept_or_recv {
678 274     274 0 991 my $self = shift;
679 274         1129 my @for_read = @_;
680 274         782 my $conn;
681 88     88   48511 use open 'IO' => ':unix';
  88         109083  
  88         523  
682 274         1668 for my $sock ( @for_read ) {
683 206 100       1794 if ( fileno $sock == fileno $self->{listen_sock} ) {
    50          
684 185         503 my ($fh,$peer);
685 185 50       901 if ( $have_accept4 ) {
686 0         0 $peer = Linux::Socket::Accept4::accept4($fh,$self->{listen_sock}, $have_accept4);
687             }
688             else {
689 185         10881 $peer = accept($fh,$self->{listen_sock});
690 185 100       1622 fh_nonblocking($fh,1) if $peer;
691             }
692 185 0 33     5907 if ( !$peer && ($! != EINTR && $! != EAGAIN && $! != EWOULDBLOCK && $! != ESPIPE) ) {
      33        
      33        
      66        
693 0         0 warn sprintf 'failed to accept: %s (%d)', $!, $!;
694 0         0 next;
695             }
696 185 100       1569 next unless $peer;
697 71 50       457 if ( !$self->{use_unix_domain} ) {
698 71 50       994 setsockopt($fh, IPPROTO_TCP, TCP_NODELAY, 1)
699             or die "setsockopt(TCP_NODELAY) failed:$!";
700             }
701 71         433 my ($peerport,$peerhost,$peeraddr);
702 71 50       404 if ( $self->{use_unix_domain} ) {
703             }
704             else {
705 71         1266 ($peerport,$peerhost) = unpack_sockaddr_in $peer;
706 71         807 $peeraddr = inet_ntoa($peerhost);
707             }
708 71         1811 $conn = {
709             fh => $fh,
710             peername => $peer,
711             peerport => $peerport,
712             peeraddr => $peeraddr,
713             direct => 1,
714             reqs => 0,
715             };
716 71         325 last;
717             }
718             elsif ( fileno $sock == fileno $self->{fdpass_sock}[READER] ) {
719 21         1242 my $fd = IO::FDPass::recv(fileno $self->{fdpass_sock}[READER]);
720 21 0 33     346 if ( $fd < 0 && ($! != EINTR && $! != EAGAIN && $! != EWOULDBLOCK && $! != ESPIPE) ) {
      33        
      33        
      66        
721 0         0 warn sprintf("could not recv fd: %s (%d)", $!, $!);
722             }
723 21 100       234 next if $fd <= 0;
724 14         62 my $peer;
725 14 50       423 if ( _getpeername($fd, $peer) < 0 ) {
726 0         0 next;
727             }
728 14 50       479 open(my $fh, '>>&='.$fd)
729             or die "could not open fd: $!";
730 14         153 my ($peerport,$peerhost,$peeraddr);
731 14 50       122 if ( $self->{use_unix_domain} ) {
732             }
733             else {
734 14         647 ($peerport,$peerhost) = unpack_sockaddr_in $peer;
735 14         180 $peeraddr = inet_ntoa($peerhost);
736             }
737 14         464 $conn = {
738             fh => $fh,
739             peername => $peer,
740             peerport => $peerport,
741             peeraddr => $peeraddr,
742             direct => 0,
743             reqs => 1, #xx
744             };
745 14         74 last;
746             }
747             }
748 274 100       1282 return unless $conn;
749 85         355 $conn;
750             }
751              
752             my $bad_response = [ 400, [ 'Content-Type' => 'text/plain', 'Connection' => 'close' ], [ 'Bad Request' ] ];
753             sub handle_connection {
754 95     95 0 921 my($self, $env, $conn, $app, $use_keepalive, $is_keepalive, $prebuf, $reqs) = @_;
755            
756 95         546 my $buf = '';
757 95         568 my $pipelined_buf='';
758 95         427 my $res = $bad_response;
759              
760 95         230 while (1) {
761 96         204 my $rlen;
762 96 100       403 if ( defined $prebuf ) {
763 95         239 $rlen = length $prebuf;
764 95         315 $buf = $prebuf;
765 95         244 undef $prebuf;
766             }
767             else {
768             $rlen = $self->read_timeout(
769             $conn, \$buf, MAX_REQUEST_SIZE - length($buf), length($buf),
770             $is_keepalive ? $self->{keepalive_timeout} : $self->{timeout},
771 1 50       6 ) or return;
    50          
772             }
773              
774 95         1596 my $reqlen = parse_http_request($buf, $env);
775 95 100       33521 if ($reqlen >= 0) {
776             # handle request
777 94         377 my $protocol = $env->{SERVER_PROTOCOL};
778 94 100       414 if ($use_keepalive) {
779 93 100       375 if ( $protocol eq 'HTTP/1.1' ) {
780 92 100       483 if (my $c = $env->{HTTP_CONNECTION}) {
781 12 50       161 $use_keepalive = undef
782             if $c =~ /^\s*close\s*/i;
783             }
784             }
785             else {
786 1 50       5 if (my $c = $env->{HTTP_CONNECTION}) {
787 0 0       0 $use_keepalive = undef
788             unless $c =~ /^\s*keep-alive\s*/i;
789             } else {
790 1         5 $use_keepalive = undef;
791             }
792             }
793 93 100 100     970 if ( $use_keepalive && $reqs <= 1 ) {
794 57         1146 $use_keepalive = $self->can_keepalive;
795             }
796             }
797 94         374 $buf = substr $buf, $reqlen;
798 88     88   73387 my $chunked = do { no warnings; lc delete $env->{HTTP_TRANSFER_ENCODING} eq 'chunked' };
  88         216  
  88         196317  
  94         181  
  94         530  
799 94 100       502 if (my $cl = $env->{CONTENT_LENGTH}) {
    100          
800 5         209 my $buffer = Plack::TempBuffer->new($cl);
801 5         745 while ($cl > 0) {
802 6         18 my $chunk;
803 6 100       34 if (length $buf) {
804 2         6 $chunk = $buf;
805 2         24 $buf = '';
806             } else {
807             $self->read_timeout(
808             $conn, \$chunk, $cl, 0, $self->{timeout})
809 4 100       190 or return;
810             }
811 5         49 $buffer->print($chunk);
812 5         392 $cl -= length $chunk;
813             }
814 4         24 $env->{'psgi.input'} = $buffer->rewind;
815             }
816             elsif ($chunked) {
817 1         61 my $buffer = Plack::TempBuffer->new;
818 1         237 my $chunk_buffer = '';
819 1         4 my $length;
820 1         1 DECHUNK: while(1) {
821 3         5 my $chunk;
822 3 100       9 if ( length $buf ) {
823 1         3 $chunk = $buf;
824 1         9 $buf = '';
825             }
826             else {
827             $self->read_timeout($conn, \$chunk, CHUNKSIZE, 0, $self->{timeout})
828 2 50       80 or return;
829             }
830              
831 3         37 $chunk_buffer .= $chunk;
832 3         92 while ( $chunk_buffer =~ s/^(([0-9a-fA-F]+).*\015\012)// ) {
833 16         34 my $trailer = $1;
834 16         29 my $chunk_len = hex $2;
835 16 100       37 if ($chunk_len == 0) {
    50          
836 1         3 last DECHUNK;
837             } elsif (length $chunk_buffer < $chunk_len + 2) {
838 0         0 $chunk_buffer = $trailer . $chunk_buffer;
839 0         0 last;
840             }
841 15         63 $buffer->print(substr $chunk_buffer, 0, $chunk_len, '');
842 15         303 $chunk_buffer =~ s/^\015\012//;
843 15         70 $length += $chunk_len;
844             }
845             }
846 1         16 $env->{CONTENT_LENGTH} = $length;
847 1         6 $env->{'psgi.input'} = $buffer->rewind;
848            
849             } else {
850 88 50       372 if ( $buf =~ m!^(?:GET|HEAD)! ) { #pipeline
851 0         0 $pipelined_buf = $buf;
852 0         0 $use_keepalive = 1;
853             } # else clear buffer
854 88         622 $env->{'psgi.input'} = $null_io;
855             }
856              
857 93 50       893 if ( $env->{HTTP_EXPECT} ) {
858 0 0       0 if ( $env->{HTTP_EXPECT} eq '100-continue' ) {
859 0 0       0 $self->write_all($conn, "HTTP/1.1 100 Continue\015\012\015\012")
860             or return;
861             } else {
862 0         0 $res = [417,[ 'Content-Type' => 'text/plain', 'Connection' => 'close' ], [ 'Expectation Failed' ] ];
863 0         0 last;
864             }
865             }
866              
867 93         1465 $res = Plack::Util::run_app $app, $env;
868 93         41978 last;
869             }
870 1 50       6 if ($reqlen == -2) {
    0          
871             # request is incomplete, do nothing
872             } elsif ($reqlen == -1) {
873             # error, close conn
874 0         0 last;
875             }
876             }
877              
878 93 100       493 if (ref $res eq 'ARRAY') {
    50          
879 90         1166 $self->_handle_response($env, $res, $conn, \$use_keepalive);
880             } elsif (ref $res eq 'CODE') {
881             $res->(sub {
882 3     3   282 $self->_handle_response($env, $_[0], $conn, \$use_keepalive);
883 3         44 });
884             } else {
885 0         0 die "Bad response $res";
886             }
887              
888 93         797 return ($use_keepalive, $pipelined_buf);
889             }
890              
891             sub _handle_response {
892 93     93   371 my($self, $env, $res, $conn, $use_keepalive_r) = @_;
893 93         363 my $protocol = $env->{SERVER_PROTOCOL};
894 93         231 my $status_code = $res->[0];
895 93         243 my $headers = $res->[1];
896 93         211 my $body = $res->[2];
897            
898 93         202 my @lines;
899             my %send_headers;
900 93         465 for (my $i = 0; $i < @$headers; $i += 2) {
901 106         257 my $k = $headers->[$i];
902 106         279 my $v = $headers->[$i + 1];
903 106         343 my $lck = lc $k;
904 106 50       345 if ($lck eq 'connection') {
905 0 0 0     0 $$use_keepalive_r = undef
906             if $$use_keepalive_r && lc $v ne 'keep-alive';
907             } else {
908 106         490 push @lines, "$k: $v\015\012";
909 106         888 $send_headers{$lck} = $v;
910             }
911             }
912 93 50       351 if ( ! exists $send_headers{server} ) {
913 93         522 unshift @lines, "Server: $self->{server_software}\015\012";
914             }
915 93 50       410 if ( ! exists $send_headers{date} ) {
916 93         283 unshift @lines, "Date: @{[HTTP::Date::time2str()]}\015\012";
  93         1002  
917             }
918              
919             # try to set content-length when keepalive can be used, or disable it
920 93         3845 my $use_chunked;
921 93 50       477 if ( $protocol eq 'HTTP/1.0' ) {
    50          
922 0 0       0 if ($$use_keepalive_r) {
923 0 0 0     0 if (defined $send_headers{'content-length'}
    0 0        
924             || defined $send_headers{'transfer-encoding'}) {
925             # ok
926             }
927             elsif ( ! Plack::Util::status_with_no_entity_body($status_code)
928             && defined(my $cl = Plack::Util::content_length($body))) {
929 0         0 push @lines, "Content-Length: $cl\015\012";
930             }
931             else {
932 0         0 $$use_keepalive_r = undef
933             }
934             }
935 0 0       0 push @lines, "Connection: keep-alive\015\012" if $$use_keepalive_r;
936 0 0       0 push @lines, "Connection: close\015\012" if !$$use_keepalive_r; #fmm..
937             }
938             elsif ( $protocol eq 'HTTP/1.1' ) {
939 93 100 66     1735 if (defined $send_headers{'content-length'}
    100          
940             || defined $send_headers{'transfer-encoding'}) {
941             # ok
942             } elsif ( !Plack::Util::status_with_no_entity_body($status_code) ) {
943 88         1260 push @lines, "Transfer-Encoding: chunked\015\012";
944 88         227 $use_chunked = 1;
945             }
946 93 100       406 push @lines, "Connection: close\015\012" unless $$use_keepalive_r;
947              
948             }
949              
950 93         317 unshift @lines, "HTTP/1.1 $status_code @{[ HTTP::Status::status_message($status_code) ]}\015\012";
  93         1108  
951 93         1569 push @lines, "\015\012";
952            
953 93 100 100     1663 if (defined $body && ref $body eq 'ARRAY' && @$body == 1
      100        
      100        
954             && length $body->[0] < 8192) {
955             # combine response header and small request body
956 83         230 my $buf = $body->[0];
957 83 100       268 if ($use_chunked ) {
958 82         186 my $len = length $buf;
959 82         433 $buf = sprintf("%x",$len) . "\015\012" . $buf . "\015\012" . '0' . "\015\012\015\012";
960             }
961             my $len = $self->write_all(
962             $conn, join('', @lines, $buf), $self->{timeout},
963 83         3290 );
964 83 50       1189 warn $! unless $len;
965 83         483 return;
966             }
967              
968 10 0 33     48 if ( $have_sendfile && !$use_chunked
      33        
      0        
      0        
969             && defined $body && ref $body ne 'ARRAY'
970             && fileno($body) ) {
971 0   0     0 my $cl = $send_headers{'content-length'} || -s $body;
972             # sendfile
973 0         0 my $use_cork = 0;
974 0 0 0     0 if ( $^O eq 'linux' && !$self->{use_unix_domain} ) {
975 0 0       0 setsockopt($conn, IPPROTO_TCP, 3, 1)
976             and $use_cork = 1;
977             }
978             $self->write_all($conn, join('', @lines), $self->{timeout})
979 0 0       0 or return;
980 0         0 my $len = $self->sendfile_all($conn, $body, $cl, $self->{timeout});
981             #warn sprintf('%d:%s',$!, $!) unless $len;
982 0 0 0     0 if ( $use_cork && $$use_keepalive_r && !$self->{use_unix_domain} ) {
      0        
983 0         0 setsockopt($conn, IPPROTO_TCP, 3, 0);
984             }
985 0         0 return;
986             }
987              
988             $self->write_all($conn, join('', @lines), $self->{timeout})
989 10 50       188 or return;
990              
991 10 100       141 if (defined $body) {
992 8         22 my $failed;
993             my $completed;
994 8 100       56 my $body_count = (ref $body eq 'ARRAY') ? $#{$body} + 1 : -1;
  3         10  
995             Plack::Util::foreach(
996             $body,
997             sub {
998 10 50   10   889 unless ($failed) {
999 10         25 my $buf = $_[0];
1000 10         16 --$body_count;
1001 10 100       28 if ( $use_chunked ) {
1002 7         14 my $len = length $buf;
1003 7 50       22 return unless $len;
1004 7         208 $buf = sprintf("%x",$len) . "\015\012" . $buf . "\015\012";
1005 7 100       22 if ( $body_count == 0 ) {
1006 2         36 $buf .= '0' . "\015\012\015\012";
1007 2         7 $completed = 1;
1008             }
1009             }
1010             $self->write_all($conn, $buf, $self->{timeout})
1011 10 50       45 or $failed = 1;
1012             }
1013             },
1014 8         193 );
1015 8 100 100     3313 $self->write_all($conn, '0' . "\015\012\015\012", $self->{timeout}) if $use_chunked && !$completed;
1016             } else {
1017             return Plack::Util::inline_object
1018             write => sub {
1019 5     5   326 my $buf = $_[0];
1020 5 50       17 if ( $use_chunked ) {
1021 5         8 my $len = length $buf;
1022 5 100       27 return unless $len;
1023 4         21 $buf = sprintf("%x",$len) . "\015\012" . $buf . "\015\012"
1024             }
1025             $self->write_all($conn, $buf, $self->{timeout})
1026 4         19 },
1027             close => sub {
1028 2 50   2   68 $self->write_all($conn, '0' . "\015\012\015\012", $self->{timeout}) if $use_chunked;
1029 2         108 };
1030             }
1031             }
1032              
1033             sub _calc_minmax_per_child {
1034 116     116   1048 my $self = shift;
1035 116         765 my ($max,$min) = @_;
1036 116 50       899 if (defined $min) {
1037 0         0 srand((rand() * 2 ** 30) ^ $$ ^ time);
1038 0         0 return $max - int(($max - $min + 1) * rand);
1039             } else {
1040 116         841 return $max;
1041             }
1042             }
1043              
1044             # returns value returned by $cb, or undef on timeout or network error
1045             sub do_io {
1046 118     118 0 4132 my ($self, $is_write, $sock, $buf, $len, $off, $timeout) = @_;
1047 118         242 my $ret;
1048 118 100 100     474 unless ($is_write || delete $self->{_is_deferred_accept}) {
1049 2         16 goto DO_SELECT;
1050             }
1051             DO_READWRITE:
1052             # try to do the IO
1053 120 100 66     922 if ($is_write && $is_write == 1) {
    50 33        
1054 111 50       9841 $ret = syswrite $sock, $buf, $len, $off
1055             and return $ret;
1056             } elsif ($is_write && $is_write == 2) {
1057 0 0       0 $ret = Sys::Sendfile::sendfile($sock, $buf, $len)
1058             and return $ret;
1059 0 0 0     0 $ret = undef if defined $ret && $ret == 0 && $! == EAGAIN; #hmm
      0        
1060             } else {
1061 9 100       465 $ret = sysread $sock, $$buf, $len, $off
1062             and return $ret;
1063             }
1064 4 100 33     147 unless ((! defined($ret)
      66        
1065             && ($! == EINTR || $! == EAGAIN || $! == EWOULDBLOCK))) {
1066 2         85 return;
1067             }
1068             # wait for data
1069             DO_SELECT:
1070 4         20 while (1) {
1071 4         13 my ($rfd, $wfd);
1072 4         26 my $efd = '';
1073 4         26 vec($efd, fileno($sock), 1) = 1;
1074 4 50       16 if ($is_write) {
1075 0         0 ($rfd, $wfd) = ('', $efd);
1076             } else {
1077 4         29 ($rfd, $wfd) = ($efd, '');
1078             }
1079 4         47 my $start_at = time;
1080 4         40439 my $nfound = select($rfd, $wfd, $efd, $timeout);
1081 4         47 $timeout -= (time - $start_at);
1082 4 50       30 last if $nfound;
1083 0 0       0 return if $timeout <= 0;
1084             }
1085 4         16 goto DO_READWRITE;
1086             }
1087              
1088             sub sendfile_timeout {
1089 0     0 0   my ($self, $sock, $fh, $len, $off, $timeout) = @_;
1090 0           $self->do_io(2, $sock, $fh, $len, $off, $timeout);
1091             }
1092              
1093             sub sendfile_all {
1094 0     0 0   my ($self, $sock, $fh, $cl, $timeout) = @_;
1095 0           my $off = 0;
1096 0           while (my $len = $cl - $off) {
1097 0 0         my $ret = $self->sendfile_timeout($sock, $fh, $len, $off, $timeout)
1098             or return;
1099 0           $off += $ret;
1100 0 0         seek($fh, $off, 0) if $cl != $off;
1101             }
1102 0           return $cl;
1103             }
1104              
1105              
1106             1;