File Coverage

blib/lib/Monoceros/Server.pm
Criterion Covered Total %
statement 522 676 77.2
branch 183 350 52.2
condition 99 236 41.9
subroutine 59 63 93.6
pod 0 17 0.0
total 863 1342 64.3


line stmt bran cond sub pod time code
1             package Monoceros::Server;
2              
3 88     88   2644 use strict;
  88         245  
  88         3886  
4 88     88   542 use warnings;
  88         606  
  88         6128  
5 88     88   791 use base qw/Plack::Handler::Starlet/;
  88         959  
  88         61408  
6 88     88   6058335 use IO::Socket;
  88         200  
  88         918  
7 88     88   119915 use IO::FDPass;
  88         57332  
  88         3470  
8 88     88   599 use Parallel::Prefork;
  88         165  
  88         653  
9 88     88   112199 use AnyEvent;
  88         601464  
  88         3893  
10 88     88   63153 use AnyEvent::Util qw(fh_nonblocking portable_socketpair);
  88         649435  
  88         10888  
11 88     88   841 use Time::HiRes qw/time/;
  88         802  
  88         939  
12 88     88   7195 use Plack::TempBuffer;
  88         436  
  88         2599  
13 88     88   464 use Plack::Util;
  88         204  
  88         2437  
14 88     88   654 use Plack::HTTPParser qw( parse_http_request );
  88         245  
  88         5405  
15 88     88   622 use POSIX qw(EINTR EAGAIN EWOULDBLOCK ESPIPE ENOBUFS :sys_wait_h);
  88         938  
  88         781  
16 88     88   72048 use POSIX::getpeername qw/_getpeername/;
  88         74292  
  88         7056  
17 88     88   641 use Socket qw(IPPROTO_TCP TCP_NODELAY);
  88         167  
  88         6346  
18 88     88   569 use File::Temp qw/tempfile/;
  88         182  
  88         5062  
19 88     88   533 use Digest::MD5 qw/md5/;
  88         195  
  88         10119  
20              
21 88     88   578 use constant WRITER => 0;
  88         286  
  88         9547  
22 88     88   526 use constant READER => 1;
  88         179  
  88         5050  
23              
24 88     88   553 use constant S_GD => 0;
  88         173  
  88         5583  
25 88     88   517 use constant S_FD => 1;
  88         150  
  88         4394  
26 88     88   435 use constant S_TIME => 2;
  88         167  
  88         4539  
27 88     88   445 use constant S_REQS => 3;
  88         155  
  88         4804  
28 88     88   428 use constant S_STATE => 4; # 0:idle 1:queue
  88         175  
  88         4150  
29              
30 88     88   431 use constant MAX_REQUEST_SIZE => 131072;
  88         161  
  88         4759  
31 88     88   616 use constant CHUNKSIZE => 64 * 1024;
  88         563  
  88         5973  
32 88   50 88   622 use constant DEBUG => $ENV{MONOCEROS_DEBUG} || 0;
  88         212  
  88         538578  
33              
34             my $null_io = do { open my $io, "<", \""; $io };
35             my $have_accept4 = eval {
36             require Linux::Socket::Accept4;
37             Linux::Socket::Accept4::SOCK_CLOEXEC()|Linux::Socket::Accept4::SOCK_NONBLOCK();
38             };
39              
40             my $have_sendfile = eval {
41             return 0;
42             require Sys::Sendfile;
43             };
44              
45             sub new {
46 83     83 0 7472139 my $class = shift;
47 83         1817 my %args = @_;
48              
49             # setup before instantiation
50 83         549 my $listen_sock;
51 83 50       1455 if (defined $ENV{SERVER_STARTER_PORT}) {
52 0         0 my ($hostport, $fd) = %{Server::Starter::server_ports()};
  0         0  
53 0 0       0 if ($hostport =~ /(.*):(\d+)/) {
54 0         0 $args{host} = $1;
55 0         0 $args{port} = $2;
56             } else {
57 0         0 $args{port} = $hostport;
58             }
59 0 0       0 $listen_sock = IO::Socket::INET->new(
60             Proto => 'tcp',
61             ) or die "failed to create socket:$!";
62 0 0       0 $listen_sock->fdopen($fd, 'w')
63             or die "failed to bind to listening socket:$!";
64             }
65 83         689 my $max_workers = 5;
66 83         554 for (qw(max_workers workers)) {
67             $max_workers = delete $args{$_}
68 166 100       1334 if defined $args{$_};
69             }
70              
71 83   50     433 my $open_max = eval { POSIX::sysconf (POSIX::_SC_OPEN_MAX ()) - 1 } || 1023;
72             my $self = bless {
73             host => $args{host} || 0,
74             port => $args{port} || 8080,
75             socket_path => (defined $args{socket_path}) ? $args{socket_path} : undef,
76             max_workers => $max_workers,
77             timeout => $args{timeout} || 300,
78             disable_keepalive => (exists $args{keepalive} && !$args{keepalive}) ? 1 : 0,
79             keepalive_timeout => $args{keepalive_timeout} || 10,
80             max_keepalive_connection => $args{max_keepalive_connection} || int($open_max/2),
81             max_readahead_reqs => (
82             defined $args{max_readahead_reqs}
83             ? $args{max_readahead_reqs} : 100
84             ),
85             min_readahead_reqs => (
86             defined $args{min_readahead_reqs}
87             ? $args{min_readahead_reqs} : undef,
88             ),
89             server_software => $args{server_software} || $class,
90       61     server_ready => $args{server_ready} || sub {},
91             min_reqs_per_child => (
92             defined $args{min_reqs_per_child}
93             ? $args{min_reqs_per_child} : undef,
94             ),
95             max_reqs_per_child => (
96             $args{max_reqs_per_child} || $args{max_requests} || 1000,
97             ),
98             err_respawn_interval => (
99             defined $args{err_respawn_interval}
100             ? $args{err_respawn_interval} : undef,
101 83 50 100     14660 ),
    100 50        
    50 50        
    50 100        
    50 50        
    50 33        
    50 33        
      66        
      50        
102             _using_defer_accept => 1,
103             listen_sock => ( defined $listen_sock ? $listen_sock : undef),
104             }, $class;
105              
106 83         2139 $self;
107             }
108              
109             sub setup_listener {
110 83     83 0 291 my $self = shift;
111 83 50       2426 if ( my $path = $self->{socket_path} ) {
112 0 0       0 if (-S $path) {
113 0         0 warn "removing existing socket file:$path";
114 0 0       0 unlink $path
115             or die "failed to remove existing socket file:$path:$!";
116             }
117 0         0 unlink $path;
118 0         0 my $saved_umask = umask(0);
119 0 0       0 $self->{listen_sock} = IO::Socket::UNIX->new(
120             Listen => Socket::SOMAXCONN(),
121             Local => $path,
122             ) or die "failed to listen to socket $path:$!";
123 0         0 umask($saved_umask);
124 0         0 $self->{use_unix_domain} = 1;
125             }
126             else {
127             $self->{listen_sock} ||= IO::Socket::INET->new(
128             Listen => SOMAXCONN,
129             LocalPort => $self->{port},
130             LocalAddr => $self->{host},
131 83 50 33     4156 Proto => 'tcp',
132             ReuseAddr => 1,
133             ) or die "failed to listen to port $self->{port}:$!";
134             # set defer accept
135 83 50 33     73186 if ($^O eq 'linux' && !$self->{use_unix_domain}) {
136             setsockopt($self->{listen_sock}, IPPROTO_TCP, 9, 1)
137 83 50       1186 and $self->{_using_defer_accept} = 1;
138             }
139             }
140              
141 83         519 $self->{server_ready}->($self);
142             }
143              
144             sub run {
145 83     83 0 7120 my ($self, $app) = @_;
146 83         1245 $self->setup_listener();
147 83         4503 $self->setup_sockpair();
148 83         786 $self->run_workers($app);
149             }
150              
151             sub setup_sockpair {
152 83     83 0 433 my $self = shift;
153              
154 83         1261 my %workers;
155 83         1414 for my $wid ( 1..$self->{max_workers} ) {
156 333 50       1873 my @pair = portable_socketpair()
157             or die "failed to create socketpair: $!";
158 333         28138 $workers{$wid} = {
159             running => 0,
160             sock => \@pair
161             };
162             }
163 83         589 $self->{workers} = \%workers;
164              
165 83 50       1171 my @fdpass_sock = portable_socketpair()
166             or die "failed to create socketpair: $!";
167 83         4638 $self->{fdpass_sock} = \@fdpass_sock;
168              
169 83         3783 my ($fh, $filename) = tempfile('monoceros_stats_XXXXXX',UNLINK => 0, SUFFIX => '.dat', TMPDIR => 1);
170 83         139353 $self->{stats_fh} = $fh;
171 83         280 $self->{stats_filename} = $filename;
172 83         1532 $self->update_stats();
173              
174 83         299 1;
175             }
176              
177             sub run_workers {
178 83     83 0 255 my ($self,$app) = @_;
179 83         2166 local $SIG{PIPE} = 'IGNORE';
180 83         373931 my $pid = fork;
181 83 100       11697 if ( $pid ) {
    50          
182             #parent
183 12         2533 $self->connection_manager($pid);
184 12         2010 delete $self->{stats_fh};
185 12         9351 unlink $self->{stats_filename};
186             }
187             elsif ( defined $pid ) {
188 71         13945 $self->request_worker($app);
189 0         0 exit;
190             }
191             else {
192 0         0 die "failed fork:$!";
193             }
194             }
195              
196             sub queued_send {
197 0     0 0 0 my $self = shift;
198 0         0 my $sockid = shift;
199              
200 0 0       0 if ( ! exists $self->{sockets}{$sockid} ) {
201 0         0 return;
202             }
203 0         0 $self->{sockets}{$sockid}[S_STATE] = 1;
204              
205 0         0 push @{$self->{fdsend_queue}}, $sockid;
  0         0  
206             $self->{fdsend_worker} ||= AE::io $self->{fdpass_sock}[WRITER], 1, sub {
207 0     0   0 while ( my $sockid = shift @{$self->{fdsend_queue}} ) {
  0         0  
208 0 0       0 if ( ! exists $self->{sockets}{$sockid} ) {
209 0         0 next;
210             }
211 0 0       0 if ( _getpeername($self->{sockets}{$sockid}[S_FD], my $addr) < 0 ) {
212 0         0 delete $self->{sockets}{$sockid};
213 0         0 next;
214             }
215             my $ret = IO::FDPass::send(
216             fileno $self->{fdpass_sock}[WRITER],
217 0         0 $self->{sockets}{$sockid}[S_FD]
218             );
219 0 0       0 if ( !$ret ) {
220 0 0 0     0 if ( $! == EAGAIN || $! == EWOULDBLOCK || $! == EINTR ) {
      0        
221 0         0 unshift @{$self->{fdsend_queue}}, $sockid;
  0         0  
222 0         0 return;
223             }
224 0         0 die "unable to pass queue: $!";
225 0         0 undef $self->{fdsend_worker};
226             }
227             }
228 0         0 undef $self->{fdsend_worker};
229 0   0     0 };
230 0         0 1;
231             }
232              
233             my $prev_stats = '';
234             sub update_stats {
235 130     130 0 560 my $self = shift;
236 130         273 my $total = scalar keys %{$self->{sockets}};
  130         1321  
237 130         427 my $processing = scalar grep { !$self->{sockets}{$_}[S_STATE] == 1 } keys %{$self->{sockets}};
  0         0  
  130         819  
238 130         326 my $idle = scalar grep { $self->{sockets}{$_}[S_STATE] == 0 } keys %{$self->{sockets}};
  0         0  
  130         1137  
239              
240 130         631 my $stats = "total=$total&";
241 130         397 $stats .= "waiting=$idle&";
242 130         288 $stats .= "processing=$processing&";
243 130         424 $stats .= "max_workers=".$self->{max_workers}."&";
244 130 100 66     1463 return if $stats eq $prev_stats && @_;
245 83         538 $prev_stats = $stats;
246 83         697 seek($self->{stats_fh},0,0);
247 83         4559 syswrite($self->{stats_fh}, $stats);
248             }
249              
250             sub can_keepalive {
251 57     57 0 156 my $self = shift;
252 57         1035 seek($self->{stats_fh},0,0);
253 57         1581 sysread($self->{stats_fh},my $buf, 1024);
254 57 50       380 return 1 unless $buf;
255 57 50       1071 if ( $buf =~ m!total=(\d+)&! ){
256 57 50       377 return if $1 >= $self->{max_keepalive_connection};
257             }
258 57         237 return 1;
259             }
260              
261             sub connection_manager {
262 12     12 0 461 my ($self, $worker_pid) = @_;
263              
264 12         2856 $self->{workers}{$_}{sock}[WRITER]->close for 1..$self->{max_workers};
265 12         1310 $self->{fdpass_sock}[READER]->close;
266 12         841 fh_nonblocking $self->{fdpass_sock}[WRITER], 1;
267 12         409 fh_nonblocking $self->{listen_sock}, 1;
268              
269 12         233 my %manager;
270             my %hash2fd;
271 12         0 my %wait_read;
272 12         478 $self->{sockets} = {};
273 12         372 $self->{fdsend_queue} = [];
274              
275 12         188 warn sprintf "Set max_keepalive_connection to %s", $self->{max_keepalive_connection} if DEBUG;
276              
277 12         3971 my $cv = AE::cv;
278 12         129386 my $close_all = 0;
279 12         173 my $i = 0;
280 12         67 my $sig2;$sig2 = AE::signal 'USR1', sub {
281 12     12   24053 my $t;$t = AE::timer 0, 1, sub {
282 12 50 33     377 if ( not $close_all && $i < 4 ) {
    50          
283 0         0 return;
284             }
285             elsif ( $i >= 4 ) {
286 0         0 warn "Worker $worker_pid sockets did not close in time. Forcing abort.";
287             }
288 12         56 undef $t;
289 12         483 kill 'TERM', $worker_pid;
290 12         40 ++$i;
291 12         33 my $j = 0;
292 12         40 my $t2;$t2 = AE::timer 0, 1, sub {
293 44         2859643 my $kid = waitpid($worker_pid, WNOHANG);
294 44         128 ++$j;
295 44 100 66     499 if ( $kid >= 0 && $j < 60 ) {
    50          
296 32         277 return;
297             }
298             elsif ( $j >= 60 ) {
299 0         0 warn "Worker $worker_pid still here after $j iterations. Forcing abort.";
300             }
301 12         91 undef $t2;
302 12         516 $cv->send;
303 12         331 };
304 12         419 };
305 12         636 };
306 12         34 my $sig;$sig = AE::signal 'TERM', sub {
307 13     13   2329940 kill 'USR1', $worker_pid; #stop accept
308 13         53 my $t;$t = AE::timer 0, 1, sub {
309 13         224 my $time = time;
310 13 50       37 return if keys %{$self->{sockets}};
  13         513  
311 13         72 undef $t;
312 13         728 $close_all=1;
313 13         593 };
314 12         328 };
315              
316             $manager{disconnect_keepalive_timeout} = AE::timer 0, 1, sub {
317 47     47   20809279 my $time = time;
318 47         166 if ( DEBUG ) {
319             my $total = scalar keys %{$self->{sockets}};
320             my $processing = scalar grep { $self->{sockets}{$_}[S_STATE] == 1} keys %{$self->{sockets}};
321             my $idle = scalar grep { $self->{sockets}{$_}[S_STATE] == 0} keys %{$self->{sockets}};
322             warn "working: $processing | total: $total | idle: $idle";
323             }
324 47         116 for my $key ( keys %{$self->{sockets}} ) { #key = fd
  47         361  
325 0 0 0     0 if ( $self->{sockets}{$key}[S_STATE] == 0 && $self->{sockets}{$key}[S_REQS] == 0
    0 0        
      0        
      0        
326             && $time - $self->{sockets}{$key}[S_TIME] > $self->{timeout} ) { #idle && first req
327 0         0 delete $wait_read{$key};
328 0         0 delete $self->{sockets}{$key};
329              
330             }
331             elsif ( $self->{sockets}{$key}[S_STATE] == 0 && $self->{sockets}{$key}[S_REQS] > 0 &&
332             $time - $self->{sockets}{$key}[S_TIME] > $self->{keepalive_timeout} ) { #idle && keepalivew
333 0         0 delete $wait_read{$key};
334 0         0 delete $self->{sockets}{$key};
335             }
336             }
337              
338 47         587 $self->update_stats(1);
339 12         422 };
340              
341 12         50 my %m_state;
342             my %workers;
343 12         191 for my $wid ( 1..$self->{max_workers} ) {
344              
345 37         161 my $sock = $self->{workers}{$wid}{sock}[READER];
346 37         166 fh_nonblocking($sock,1);
347 37 50       352 return unless $sock;
348              
349 37         565 $m_state{$wid} = {};
350 37         112 my $state = $m_state{$wid};
351 37         337 $state->{buf} = '';
352 37         303 $state->{state} = 'cmd';
353 37         128 $state->{sockid} = '';
354 37         73 $state->{reqs} = 0;
355              
356             $workers{$wid} = AE::io fileno $sock, 0, sub {
357 281261 50   281261   7984098 if ( $state->{state} eq 'cmd' ) {
358 281261         975680 my $ret = recv($sock, my $buf, 28 - length($state->{buf}), 0);
359 281261 0 0     492541 if ( !defined $ret && ($! == EINTR || $! == EAGAIN || $! == EWOULDBLOCK) ) {
      33        
360 0         0 return;
361             }
362 281261 50       468581 if ( !defined $ret ) {
363 0         0 warn "failed to recv from sock: $!";
364 0         0 return;
365             }
366 281261 50 33     786465 if ( defined $buf && length $buf == 0) {
367 281261         602150 return;
368             }
369 0         0 $state->{buf} .= $buf;
370 0 0       0 return if length $state->{buf} < 28;
371 0         0 my $msg = substr $state->{buf}, 0, 28, '';
372 0         0 my $method = substr($msg, 0, 4,'');
373 0         0 my $sockid = substr($msg, 0, 16, '');
374 0         0 my $reqs = hex($msg);
375              
376 0 0       0 if ( $method eq 'push' ) {
    0          
    0          
377 0         0 $state->{state} = 'recv_fd';
378 0         0 $state->{sockid} = $sockid;
379 0         0 $state->{reqs} = $reqs;
380             }
381             elsif ( $method eq 'keep' ) {
382 0 0       0 if ( exists $self->{sockets}{$sockid} ) {
383 0         0 $self->{sockets}{$sockid}[S_TIME] = AE::now;
384 0         0 $self->{sockets}{$sockid}[S_REQS] += $reqs;
385 0         0 $self->{sockets}{$sockid}[S_STATE] = 0;
386             $wait_read{$sockid} = AE::io $self->{sockets}{$sockid}[S_FD], 0, sub {
387 0         0 delete $wait_read{$sockid};
388 0         0 $self->queued_send($sockid);
389 0         0 };
390             }
391             }
392             elsif ( $method eq 'clos' ) {
393 0         0 delete $self->{sockets}{$sockid};
394 0         0 $self->update_stats();
395             }
396             }
397              
398 0 0       0 if ( $state->{state} eq 'recv_fd' ) {
399 0         0 my $fd = IO::FDPass::recv(fileno $sock);
400 0 0 0     0 if ( $fd < 0 && ($! == EINTR || $! == EAGAIN || $! == EWOULDBLOCK) ) {
      0        
401 0         0 return;
402             }
403 0         0 $state->{state} = 'cmd';
404 0 0       0 if ( $fd <= 0 ) {
405 0         0 warn sprintf 'Failed recv fd: %s (%d)', $!, $!;
406 0         0 return;
407             }
408 0         0 my $sockid = $state->{sockid};
409 0         0 my $reqs = $state->{reqs};
410             $self->{sockets}{$sockid} = [
411 0         0 AnyEvent::Util::guard { POSIX::close($fd) },
412 0         0 $fd,
413             AE::now,
414             $reqs,
415             0
416             ]; #guard,fd,time,reqs,state
417 0         0 $self->update_stats();
418             $wait_read{$sockid} = AE::io $fd, 0, sub {
419 0         0 delete $wait_read{$sockid};
420 0         0 $self->queued_send($sockid);
421 0         0 };
422             } # cmd
423             } # AE::io
424 37         905 } # for 1..max_workers
425              
426 12         216 $manager{workers} = \%workers;
427 12         332 $cv->recv;
428             }
429              
430             sub request_worker {
431 71     71 0 2500 my ($self,$app) = @_;
432              
433 71         10154 delete $self->{stats_fh};
434 71         5293 fh_nonblocking($self->{listen_sock},1);
435 71         9439 $self->{fdpass_sock}[WRITER]->close;
436 71         2060 fh_nonblocking($self->{fdpass_sock}[READER],1);
437 71         3641 $self->{workers}{$_}{sock}[READER]->close for 1..$self->{max_workers};
438              
439             # use Parallel::Prefork
440             my %pm_args = (
441             max_workers => $self->{max_workers},
442 71         10498 trap_signals => {
443             TERM => 'TERM',
444             HUP => 'TERM',
445             USR1 => 'USR1',
446             },
447             );
448 71 50       1538 if (defined $self->{err_respawn_interval}) {
449 0         0 $pm_args{err_respawn_interval} = $self->{err_respawn_interval};
450             }
451              
452 71         1502 my $next;
453             $pm_args{on_child_reap} = sub {
454 160     160   68760747 my ( $pm, $exit_pid, $status ) = @_;
455 160         1477 for my $wid (1..$self->{max_workers} ) {
456 435 100 100     4503 if ( $self->{workers}{$wid}{running} && $self->{workers}{$wid}{running} == $exit_pid ) {
457             #warn sprintf "finished wid:%s pid:%s", $next, $exit_pid if DEBUG;
458 160         839 $self->{workers}{$wid}{running} = 0;
459 160         840 last;
460             }
461             }
462 71         9637 };
463             $pm_args{before_fork} = sub {
464 343     343   56983 for my $wid (1..$self->{max_workers} ) {
465 891 100       4139 if ( ! $self->{workers}{$wid}{running} ) {
466 343         2838 $next = $wid;
467 343         1313 last;
468             }
469             }
470              
471 71         4311 };
472             $pm_args{after_fork} = sub {
473 285     285   1172413 my ($pm, $pid) = @_;
474 285 50       4914 if ( defined $next ) {
475             #warn sprintf "assign wid:%s to pid:%s", $next, $pid if DEBUG;
476 285         4936 $self->{workers}{$next}{running} = $pid;
477             }
478             else {
479 0         0 warn "worker start but next is undefined";
480             }
481 71         3365 };
482              
483 71         6665 my $pm = Parallel::Prefork->new(\%pm_args);
484              
485 71         26266 while ($pm->signal_received !~ /^(?:TERM|USR1)$/) {
486             $pm->start(sub {
487 58 50   58   256701 die 'worker start but next is undefined' unless $next;
488 58         3022 for my $wid ( 1..$self->{max_workers} ) {
489 254 100       7238 next if $wid == $next;
490 196         5903 $self->{workers}{$wid}{sock}[WRITER]->close;
491             }
492 58         2750 $self->{mgr_sock} = $self->{workers}{$next}{sock}[WRITER];
493 58         4025 fh_nonblocking($self->{mgr_sock},1);
494              
495             open($self->{stats_fh}, '<', $self->{stats_filename})
496 58 50       15679 or die "could not open stats file: $!";
497              
498 58         2020 $self->{fhlist} = [$self->{listen_sock},$self->{fdpass_sock}[READER]];
499 58         1427 $self->{fhbits} = '';
500 58         478 for ( @{$self->{fhlist}} ) {
  58         646  
501 116         1827 vec($self->{fhbits}, fileno $_, 1) = 1;
502             }
503              
504             my $max_reqs_per_child = $self->_calc_minmax_per_child(
505             $self->{max_reqs_per_child},
506             $self->{min_reqs_per_child}
507 58         8626 );
508             my $max_readahead_reqs = $self->_calc_minmax_per_child(
509             $self->{max_readahead_reqs},
510             $self->{min_readahead_reqs}
511 58         965 );
512              
513 58         687 my $proc_req_count = 0;
514              
515 58         1450 $self->{term_received} = 0;
516 58         898 $self->{stop_accept} = 0;
517             local $SIG{TERM} = sub {
518 35         318 $self->{term_received}++;
519 35 50       698 exit 0 if $self->{term_received} > 1;
520 58         5924 };
521             local $SIG{USR1} = sub {
522 35         937664 $self->{fhlist} = [$self->{fdpass_sock}[READER]];
523 35         185 $self->{fhbits} = '';
524 35         392 vec($self->{fhbits}, fileno($self->{fdpass_sock}[READER]), 1) = 1;
525 35         370 $self->{stop_accept}++;
526 58         1175 };
527              
528 58         1002 local $SIG{PIPE} = 'IGNORE';
529              
530 58         385 my $next_conn;
531 58   100     3116 while ( $next_conn || $self->{stop_accept} || $proc_req_count < $max_reqs_per_child ) {
      100        
532             last if ( $self->{term_received}
533 287 100 66     4092 && !$next_conn );
534 252         676 my $conn;
535 252 50 66     12809 if ( $next_conn && $next_conn->{buf} ) { #read ahead or pipeline
536 0         0 $conn = $next_conn;
537 0         0 $next_conn = undef;
538             }
539             else {
540 252         994 my @rfh = @{$self->{fhlist}};
  252         2647  
541 252         5069 my $rfd = $self->{fhbits};
542 252 100       1023 if ( $next_conn ) {
543 74         178 push @rfh, $next_conn->{fh};
544 74         410 vec($rfd, fileno $next_conn->{fh}, 1) = 1;
545             }
546 252         695 my @can_read;
547 252 100       8154353 if ( select($rfd, undef, undef, 1) > 0 ) {
548 187         2239 for ( my $i = 0; $i <= $#rfh; $i++ ) {
549 447         4469 my $try_read_fd = fileno $rfh[$i];
550 447 100 66     4142 if ( !defined $rfd || vec($rfd, $try_read_fd, 1) ) {
551 200 100 66     1819 if ( $next_conn && fileno $next_conn->{fh} == $try_read_fd ) {
552 74         150 $conn = $next_conn;
553 74         210 last;
554             }
555 126         1112 push @can_read, $self->{fhlist}[$i];
556             }
557             }
558             }
559             #accept or recv
560 252 100       1388 if ( !$conn ) {
561 178         3504 $conn = $self->accept_or_recv( @can_read );
562             }
563             # exists new conn && exists next_conn && next_conn is not ready => keep
564 252 50 100     2205 if ( $conn && $next_conn && $conn != $next_conn ) {
      66        
565 0         0 $self->keep_it($next_conn);
566             }
567             # try to re-read next_conn
568 252 50 66     1914 if ( !$conn && $next_conn ) {
569 0         0 @rfh = ();
570 0         0 next;
571             }
572             #clear next_conn
573 252         666 @rfh = ();
574 252         771 $next_conn = undef;
575             }
576 252 100       1819 next unless $conn;
577              
578             my $env = {
579             SERVER_PORT => $self->{port} || 0,
580             SERVER_NAME => $self->{host} || 0,
581             SCRIPT_NAME => '',
582             REMOTE_ADDR => $conn->{peeraddr},
583             REMOTE_PORT => $conn->{peerport} || 0,
584             'psgi.version' => [ 1, 1 ],
585             'psgi.errors' => *STDERR,
586             'psgi.url_scheme' => 'http',
587             'psgi.run_once' => Plack::Util::FALSE,
588             'psgi.multithread' => Plack::Util::FALSE,
589             'psgi.multiprocess' => Plack::Util::TRUE,
590             'psgi.streaming' => Plack::Util::TRUE,
591             'psgi.nonblocking' => Plack::Util::FALSE,
592             'psgix.input.buffered' => Plack::Util::TRUE,
593             'psgix.io' => $conn->{fh},
594             'psgix.harakiri' => 1,
595             'X_MONOCEROS_WORKER_STATS' => $self->{stats_filename},
596 149   50     8670 };
      100        
      50        
597 149 50       2847 $env->{'X_REMOTE_PID'} = $$ if $ENV{HARNESS_ACTIVE};
598 149         848 $self->{_is_deferred_accept} = 1; # ready to read
599 149         308 my $prebuf;
600 149 50       484 if ( exists $conn->{buf} ) {
601 0         0 $prebuf = delete $conn->{buf};
602             }
603             else {
604             #pre-read
605 149         4265 my $ret = sysread($conn->{fh}, $prebuf, MAX_REQUEST_SIZE);
606 149 50 0     1832 if ( ! defined $ret && ($! == EAGAIN || $! == EWOULDBLOCK || $! == EINTR) ) {
    100 33        
      66        
607 0         0 $self->keep_it($conn);
608 0         0 next;
609             }
610             elsif ( defined $ret && $ret == 0) {
611             #closed?
612             $self->cmd_to_mgr('clos', $conn->{peername}, $conn->{reqs})
613 54 100       272 if !$conn->{direct};
614 54         5872 next;
615             }
616             }
617             # stop keepalive if SIG{TERM} or SIG{USR1}. but go-on if pipline req
618 95         255 my $may_keepalive = 1;
619 95 50 33     777 $may_keepalive = 0 if ($self->{term_received} || $self->{stop_accept});
620 95 100       328 $may_keepalive = 0 if $self->{disable_keepalive};
621 95         411 my $is_keepalive = 1; # to use "keepalive_timeout" in handle_connection,
622             # treat every connection as keepalive
623             my ($keepalive,$pipelined_buf) = $self->handle_connection($env, $conn->{fh}, $app,
624             $may_keepalive, $is_keepalive, $prebuf,
625 95         2193 $conn->{reqs});
626             # harakiri
627 95 100       501 if ($env->{'psgix.harakiri.commit'}) {
628 18         40 $proc_req_count = $max_reqs_per_child + 1;
629             }
630              
631 95         347 ++$proc_req_count;
632 95         473 $conn->{reqs}++;
633 95 100       273 if ( !$keepalive ) {
634             #close
635             $self->cmd_to_mgr('clos', $conn->{peername}, $conn->{reqs})
636 3 50       16 if !$conn->{direct};
637 3         284 next;
638             }
639              
640             # pipeline
641 92 50 33     697 if ( defined $pipelined_buf && length $pipelined_buf ) {
642 0         0 $next_conn = $conn;
643 0         0 $next_conn->{buf} = $pipelined_buf;
644 0         0 next;
645             }
646              
647             # read ahead
648 92 100 66     563 if ( $conn->{reqs} < $max_readahead_reqs && $proc_req_count <= $max_reqs_per_child ) {
649 74         151 $next_conn = $conn;
650 74         1191 next;
651             }
652              
653             # wait
654 18         435 $self->keep_it($conn);
655             }
656 71         5387 }); #start
657             }
658             local $SIG{TERM} = sub {
659 12     12   20081 $pm->signal_all_children('TERM');
660 13         2051747 };
661 13         398 kill 'USR1', getppid();
662 13         273 my $children_left = $pm->wait_all_children;
663 13 50       565 warn "wait_all_children returned unterminated children. This should not happen!" if $children_left;
664 13         14578 exit;
665             }
666              
667             sub cmd_to_mgr {
668 22     22 0 93 my ($self,$cmd,$peername,$reqs) = @_;
669 22         509 my $msg = $cmd . Digest::MD5::md5($peername) . sprintf('%08x',$reqs);
670 22         12555 send($self->{mgr_sock}, $msg, 0);
671             }
672              
673             sub keep_it {
674 18     18 0 86 my ($self,$conn) = @_;
675 18 50       80 if ( $conn->{direct} ) {
676 18         159 $self->cmd_to_mgr("push", $conn->{peername}, $conn->{reqs});
677 18         50 my $ret;
678 18         39 do {
679 18         1539 $ret = IO::FDPass::send(fileno $self->{mgr_sock}, fileno $conn->{fh});
680 18 0 33     1362 die $! if ( !defined $ret && $! != EAGAIN && $! != EWOULDBLOCK && $! != EINTR);
      33        
      0        
681             #need select?
682             } while (!$ret);
683             }
684             else {
685 0         0 $self->cmd_to_mgr("keep", $conn->{peername}, $conn->{reqs});
686             }
687             }
688              
689             sub accept_or_recv {
690 178     178 0 531 my $self = shift;
691 178         561 my @for_read = @_;
692 178         604 my $conn;
693 88     88   55028 use open 'IO' => ':unix';
  88         130345  
  88         546  
694 178         2029 for my $sock ( @for_read ) {
695 113 100       1096 if ( fileno $sock == fileno $self->{listen_sock} ) {
    50          
696 106         298 my ($fh,$peer);
697 106 50       430 if ( $have_accept4 ) {
698 0         0 $peer = Linux::Socket::Accept4::accept4($fh,$self->{listen_sock}, $have_accept4);
699             }
700             else {
701 106         7752 $peer = accept($fh,$self->{listen_sock});
702 106 100       1213 fh_nonblocking($fh,1) if $peer;
703             }
704 106 0 33     2635 if ( !$peer && ($! != EINTR && $! != EAGAIN && $! != EWOULDBLOCK && $! != ESPIPE) ) {
      33        
      33        
      66        
705 0         0 warn sprintf 'failed to accept: %s (%d)', $!, $!;
706 0         0 next;
707             }
708 106 100       728 next unless $peer;
709 71 50       511 if ( !$self->{use_unix_domain} ) {
710 71 50       925 setsockopt($fh, IPPROTO_TCP, TCP_NODELAY, 1)
711             or die "setsockopt(TCP_NODELAY) failed:$!";
712             }
713 71         228 my ($peerport,$peerhost,$peeraddr);
714 71 50       269 if ( $self->{use_unix_domain} ) {
715             }
716             else {
717 71         1176 ($peerport,$peerhost) = unpack_sockaddr_in $peer;
718 71         775 $peeraddr = inet_ntoa($peerhost);
719             }
720 71         7210 $conn = {
721             fh => $fh,
722             peername => $peer,
723             peerport => $peerport,
724             peeraddr => $peeraddr,
725             direct => 1,
726             reqs => 0,
727             };
728 71         326 last;
729             }
730             elsif ( fileno $sock == fileno $self->{fdpass_sock}[READER] ) {
731 7         203 my $fd = IO::FDPass::recv(fileno $self->{fdpass_sock}[READER]);
732 7 0 33     328 if ( $fd < 0 && ($! != EINTR && $! != EAGAIN && $! != EWOULDBLOCK && $! != ESPIPE) ) {
      33        
      33        
      66        
733 0         0 warn sprintf("could not recv fd: %s (%d)", $!, $!);
734             }
735 7 100       34 next if $fd <= 0;
736 4         10 my $peer;
737 4 50       112 if ( _getpeername($fd, $peer) < 0 ) {
738 0         0 next;
739             }
740 4 50       145 open(my $fh, '>>&='.$fd)
741             or die "could not open fd: $!";
742 4         20 my ($peerport,$peerhost,$peeraddr);
743 4 50       37 if ( $self->{use_unix_domain} ) {
744             }
745             else {
746 4         54 ($peerport,$peerhost) = unpack_sockaddr_in $peer;
747 4         36 $peeraddr = inet_ntoa($peerhost);
748             }
749 4         258 $conn = {
750             fh => $fh,
751             peername => $peer,
752             peerport => $peerport,
753             peeraddr => $peeraddr,
754             direct => 0,
755             reqs => 1, #xx
756             };
757 4         22 last;
758             }
759             }
760 178 100       831 return unless $conn;
761 75         254 $conn;
762             }
763              
764             my $bad_response = [ 400, [ 'Content-Type' => 'text/plain', 'Connection' => 'close' ], [ 'Bad Request' ] ];
765             sub handle_connection {
766 95     95 0 1310 my($self, $env, $conn, $app, $use_keepalive, $is_keepalive, $prebuf, $reqs) = @_;
767            
768 95         546 my $buf = '';
769 95         439 my $pipelined_buf='';
770 95         856 my $res = $bad_response;
771              
772 95         250 while (1) {
773 96         239 my $rlen;
774 96 100       345 if ( defined $prebuf ) {
775 95         218 $rlen = length $prebuf;
776 95         289 $buf = $prebuf;
777 95         185 undef $prebuf;
778             }
779             else {
780             $rlen = $self->read_timeout(
781             $conn, \$buf, MAX_REQUEST_SIZE - length($buf), length($buf),
782             $is_keepalive ? $self->{keepalive_timeout} : $self->{timeout},
783 1 50       8 ) or return;
    50          
784             }
785              
786 95         2009 my $reqlen = parse_http_request($buf, $env);
787 95 100       71522 if ($reqlen >= 0) {
788             # handle request
789 94         411 my $protocol = $env->{SERVER_PROTOCOL};
790 94 100       326 if ($use_keepalive) {
791 93 100       342 if ( $protocol eq 'HTTP/1.1' ) {
792 92 100       343 if (my $c = $env->{HTTP_CONNECTION}) {
793 12 50       540 $use_keepalive = undef
794             if $c =~ /^\s*close\s*/i;
795             }
796             }
797             else {
798 1 50       3 if (my $c = $env->{HTTP_CONNECTION}) {
799 0 0       0 $use_keepalive = undef
800             unless $c =~ /^\s*keep-alive\s*/i;
801             } else {
802 1         2 $use_keepalive = undef;
803             }
804             }
805 93 100 100     994 if ( $use_keepalive && $reqs <= 1 ) {
806 57         1918 $use_keepalive = $self->can_keepalive;
807             }
808             }
809 94         326 $buf = substr $buf, $reqlen;
810 88     88   96628 my $chunked = do { no warnings; lc delete $env->{HTTP_TRANSFER_ENCODING} eq 'chunked' };
  88         206  
  88         248271  
  94         165  
  94         558  
811 94 100       858 if (my $cl = $env->{CONTENT_LENGTH}) {
    100          
812 5         221 my $buffer = Plack::TempBuffer->new($cl);
813 5         741 while ($cl > 0) {
814 6         9 my $chunk;
815 6 100       21 if (length $buf) {
816 1         4 $chunk = $buf;
817 1         4 $buf = '';
818             } else {
819             $self->read_timeout(
820             $conn, \$chunk, $cl, 0, $self->{timeout})
821 5 100       111 or return;
822             }
823 5         51 $buffer->print($chunk);
824 5         380 $cl -= length $chunk;
825             }
826 4         37 $env->{'psgi.input'} = $buffer->rewind;
827             }
828             elsif ($chunked) {
829 1         155 my $buffer = Plack::TempBuffer->new;
830 1         346 my $chunk_buffer = '';
831 1         4 my $length;
832 1         2 DECHUNK: while(1) {
833 1         3 my $chunk;
834 1 50       8 if ( length $buf ) {
835 0         0 $chunk = $buf;
836 0         0 $buf = '';
837             }
838             else {
839             $self->read_timeout($conn, \$chunk, CHUNKSIZE, 0, $self->{timeout})
840 1 50       610 or return;
841             }
842              
843 1         29 $chunk_buffer .= $chunk;
844 1         42 while ( $chunk_buffer =~ s/^(([0-9a-fA-F]+).*\015\012)// ) {
845 16         34 my $trailer = $1;
846 16         34 my $chunk_len = hex $2;
847 16 100       44 if ($chunk_len == 0) {
    50          
848 1         6 last DECHUNK;
849             } elsif (length $chunk_buffer < $chunk_len + 2) {
850 0         0 $chunk_buffer = $trailer . $chunk_buffer;
851 0         0 last;
852             }
853 15         85 $buffer->print(substr $chunk_buffer, 0, $chunk_len, '');
854 15         886 $chunk_buffer =~ s/^\015\012//;
855 15         90 $length += $chunk_len;
856             }
857             }
858 1         18 $env->{CONTENT_LENGTH} = $length;
859 1         6 $env->{'psgi.input'} = $buffer->rewind;
860            
861             } else {
862 88 50       2582 if ( $buf =~ m!^(?:GET|HEAD)! ) { #pipeline
863 0         0 $pipelined_buf = $buf;
864 0         0 $use_keepalive = 1;
865             } # else clear buffer
866 88         1187 $env->{'psgi.input'} = $null_io;
867             }
868              
869 93 50       1274 if ( $env->{HTTP_EXPECT} ) {
870 0 0       0 if ( $env->{HTTP_EXPECT} eq '100-continue' ) {
871 0 0       0 $self->write_all($conn, "HTTP/1.1 100 Continue\015\012\015\012")
872             or return;
873             } else {
874 0         0 $res = [417,[ 'Content-Type' => 'text/plain', 'Connection' => 'close' ], [ 'Expectation Failed' ] ];
875 0         0 last;
876             }
877             }
878              
879 93         10207 $res = Plack::Util::run_app $app, $env;
880 93         76005 last;
881             }
882 1 50       5 if ($reqlen == -2) {
    0          
883             # request is incomplete, do nothing
884             } elsif ($reqlen == -1) {
885             # error, close conn
886 0         0 last;
887             }
888             }
889              
890 93 100       660 if (ref $res eq 'ARRAY') {
    50          
891 90         1555 $self->_handle_response($env, $res, $conn, \$use_keepalive);
892             } elsif (ref $res eq 'CODE') {
893             $res->(sub {
894 3     3   166 $self->_handle_response($env, $_[0], $conn, \$use_keepalive);
895 3         60 });
896             } else {
897 0         0 die "Bad response $res";
898             }
899              
900 93         873 return ($use_keepalive, $pipelined_buf);
901             }
902              
903             sub _handle_response {
904 93     93   579 my($self, $env, $res, $conn, $use_keepalive_r) = @_;
905 93         346 my $protocol = $env->{SERVER_PROTOCOL};
906 93         227 my $status_code = $res->[0];
907 93         228 my $headers = $res->[1];
908 93         172 my $body = $res->[2];
909            
910 93         176 my @lines;
911             my %send_headers;
912 93         379 for (my $i = 0; $i < @$headers; $i += 2) {
913 106         222 my $k = $headers->[$i];
914 106         421 my $v = $headers->[$i + 1];
915 106         220 my $lck = lc $k;
916 106 50       308 if ($lck eq 'connection') {
917 0 0 0     0 $$use_keepalive_r = undef
918             if $$use_keepalive_r && lc $v ne 'keep-alive';
919             } else {
920 106         342 push @lines, "$k: $v\015\012";
921 106         1125 $send_headers{$lck} = $v;
922             }
923             }
924 93 50       469 if ( ! exists $send_headers{server} ) {
925 93         419 unshift @lines, "Server: $self->{server_software}\015\012";
926             }
927 93 50       279 if ( ! exists $send_headers{date} ) {
928 93         190 unshift @lines, "Date: @{[HTTP::Date::time2str()]}\015\012";
  93         1446  
929             }
930              
931             # try to set content-length when keepalive can be used, or disable it
932 93         2567 my $use_chunked;
933 93 50       470 if ( $protocol eq 'HTTP/1.0' ) {
    50          
934 0 0       0 if ($$use_keepalive_r) {
935 0 0 0     0 if (defined $send_headers{'content-length'}
    0 0        
936             || defined $send_headers{'transfer-encoding'}) {
937             # ok
938             }
939             elsif ( ! Plack::Util::status_with_no_entity_body($status_code)
940             && defined(my $cl = Plack::Util::content_length($body))) {
941 0         0 push @lines, "Content-Length: $cl\015\012";
942             }
943             else {
944 0         0 $$use_keepalive_r = undef
945             }
946             }
947 0 0       0 push @lines, "Connection: keep-alive\015\012" if $$use_keepalive_r;
948 0 0       0 push @lines, "Connection: close\015\012" if !$$use_keepalive_r; #fmm..
949             }
950             elsif ( $protocol eq 'HTTP/1.1' ) {
951 93 100 66     2569 if (defined $send_headers{'content-length'}
    100          
952             || defined $send_headers{'transfer-encoding'}) {
953             # ok
954             } elsif ( !Plack::Util::status_with_no_entity_body($status_code) ) {
955 88         995 push @lines, "Transfer-Encoding: chunked\015\012";
956 88         201 $use_chunked = 1;
957             }
958 93 100       498 push @lines, "Connection: close\015\012" unless $$use_keepalive_r;
959              
960             }
961              
962 93         292 unshift @lines, "HTTP/1.1 $status_code @{[ HTTP::Status::status_message($status_code) ]}\015\012";
  93         1962  
963 93         1842 push @lines, "\015\012";
964            
965 93 100 100     2244 if (defined $body && ref $body eq 'ARRAY' && @$body == 1
      100        
      100        
966             && length $body->[0] < 8192) {
967             # combine response header and small request body
968 83         217 my $buf = $body->[0];
969 83 100       234 if ($use_chunked ) {
970 82         201 my $len = length $buf;
971 82         482 $buf = sprintf("%x",$len) . "\015\012" . $buf . "\015\012" . '0' . "\015\012\015\012";
972             }
973             my $len = $self->write_all(
974             $conn, join('', @lines, $buf), $self->{timeout},
975 83         4523 );
976 83 50       1117 warn $! unless $len;
977 83         642 return;
978             }
979              
980 10 0 33     71 if ( $have_sendfile && !$use_chunked
      33        
      0        
      0        
981             && defined $body && ref $body ne 'ARRAY'
982             && fileno($body) ) {
983 0   0     0 my $cl = $send_headers{'content-length'} || -s $body;
984             # sendfile
985 0         0 my $use_cork = 0;
986 0 0 0     0 if ( $^O eq 'linux' && !$self->{use_unix_domain} ) {
987 0 0       0 setsockopt($conn, IPPROTO_TCP, 3, 1)
988             and $use_cork = 1;
989             }
990             $self->write_all($conn, join('', @lines), $self->{timeout})
991 0 0       0 or return;
992 0         0 my $len = $self->sendfile_all($conn, $body, $cl, $self->{timeout});
993             #warn sprintf('%d:%s',$!, $!) unless $len;
994 0 0 0     0 if ( $use_cork && $$use_keepalive_r && !$self->{use_unix_domain} ) {
      0        
995 0         0 setsockopt($conn, IPPROTO_TCP, 3, 0);
996             }
997 0         0 return;
998             }
999              
1000             $self->write_all($conn, join('', @lines), $self->{timeout})
1001 10 50       1072 or return;
1002              
1003 10 100       127 if (defined $body) {
1004 8         19 my $failed;
1005             my $completed;
1006 8 100       31 my $body_count = (ref $body eq 'ARRAY') ? $#{$body} + 1 : -1;
  3         273  
1007             Plack::Util::foreach(
1008             $body,
1009             sub {
1010 10 50   10   535 unless ($failed) {
1011 10         24 my $buf = $_[0];
1012 10         18 --$body_count;
1013 10 100       28 if ( $use_chunked ) {
1014 7         15 my $len = length $buf;
1015 7 50       34 return unless $len;
1016 7         206 $buf = sprintf("%x",$len) . "\015\012" . $buf . "\015\012";
1017 7 100       23 if ( $body_count == 0 ) {
1018 2         8 $buf .= '0' . "\015\012\015\012";
1019 2         6 $completed = 1;
1020             }
1021             }
1022             $self->write_all($conn, $buf, $self->{timeout})
1023 10 50       46 or $failed = 1;
1024             }
1025             },
1026 8         163 );
1027 8 100 100     4713 $self->write_all($conn, '0' . "\015\012\015\012", $self->{timeout}) if $use_chunked && !$completed;
1028             } else {
1029             return Plack::Util::inline_object
1030             write => sub {
1031 5     5   450 my $buf = $_[0];
1032 5 50       15 if ( $use_chunked ) {
1033 5         12 my $len = length $buf;
1034 5 100       14 return unless $len;
1035 4         108 $buf = sprintf("%x",$len) . "\015\012" . $buf . "\015\012"
1036             }
1037             $self->write_all($conn, $buf, $self->{timeout})
1038 4         21 },
1039             close => sub {
1040 2 50   2   102 $self->write_all($conn, '0' . "\015\012\015\012", $self->{timeout}) if $use_chunked;
1041 2         125 };
1042             }
1043             }
1044              
1045             sub _calc_minmax_per_child {
1046 116     116   1280 my $self = shift;
1047 116         1140 my ($max,$min) = @_;
1048 116 50       1641 if (defined $min) {
1049 0         0 srand((rand() * 2 ** 30) ^ $$ ^ time);
1050 0         0 return $max - int(($max - $min + 1) * rand);
1051             } else {
1052 116         838 return $max;
1053             }
1054             }
1055              
1056             # returns value returned by $cb, or undef on timeout or network error
1057             sub do_io {
1058 118     118 0 3968 my ($self, $is_write, $sock, $buf, $len, $off, $timeout) = @_;
1059 118         234 my $ret;
1060 118 100 100     444 unless ($is_write || delete $self->{_is_deferred_accept}) {
1061 1         6 goto DO_SELECT;
1062             }
1063             DO_READWRITE:
1064             # try to do the IO
1065 121 100 66     1061 if ($is_write && $is_write == 1) {
    50 33        
1066 111 50       19842 $ret = syswrite $sock, $buf, $len, $off
1067             and return $ret;
1068             } elsif ($is_write && $is_write == 2) {
1069 0 0       0 $ret = Sys::Sendfile::sendfile($sock, $buf, $len)
1070             and return $ret;
1071 0 0 0     0 $ret = undef if defined $ret && $ret == 0 && $! == EAGAIN; #hmm
      0        
1072             } else {
1073 10 100       1681 $ret = sysread $sock, $$buf, $len, $off
1074             and return $ret;
1075             }
1076 5 100 33     213 unless ((! defined($ret)
      66        
1077             && ($! == EINTR || $! == EAGAIN || $! == EWOULDBLOCK))) {
1078 2         91 return;
1079             }
1080             # wait for data
1081             DO_SELECT:
1082 4         10 while (1) {
1083 4         22 my ($rfd, $wfd);
1084 4         27 my $efd = '';
1085 4         32 vec($efd, fileno($sock), 1) = 1;
1086 4 50       14 if ($is_write) {
1087 0         0 ($rfd, $wfd) = ('', $efd);
1088             } else {
1089 4         263 ($rfd, $wfd) = ($efd, '');
1090             }
1091 4         50 my $start_at = time;
1092 4         123211 my $nfound = select($rfd, $wfd, $efd, $timeout);
1093 4         56 $timeout -= (time - $start_at);
1094 4 50       30 last if $nfound;
1095 0 0       0 return if $timeout <= 0;
1096             }
1097 4         24 goto DO_READWRITE;
1098             }
1099              
1100             sub sendfile_timeout {
1101 0     0 0   my ($self, $sock, $fh, $len, $off, $timeout) = @_;
1102 0           $self->do_io(2, $sock, $fh, $len, $off, $timeout);
1103             }
1104              
1105             sub sendfile_all {
1106 0     0 0   my ($self, $sock, $fh, $cl, $timeout) = @_;
1107 0           my $off = 0;
1108 0           while (my $len = $cl - $off) {
1109 0 0         my $ret = $self->sendfile_timeout($sock, $fh, $len, $off, $timeout)
1110             or return;
1111 0           $off += $ret;
1112 0 0         seek($fh, $off, 0) if $cl != $off;
1113             }
1114 0           return $cl;
1115             }
1116              
1117              
1118             1;