File Coverage

blib/lib/Monoceros/Server.pm
Criterion Covered Total %
statement 143 599 23.8
branch 14 274 5.1
condition 13 169 7.6
subroutine 37 59 62.7
pod 0 13 0.0
total 207 1114 18.5


line stmt bran cond sub pod time code
1             package Monoceros::Server;
2              
3 14     14   800 use strict;
  14         27  
  14         438  
4 14     14   56 use warnings;
  14         26  
  14         711  
5 14     14   58 use base qw/Plack::Handler::Starlet/;
  14         11  
  14         6977  
6 14     14   765918 use IO::Socket;
  14         23  
  14         107  
7 14     14   14312 use IO::FDPass;
  14         3511  
  14         409  
8 14     14   75 use Parallel::Prefork;
  14         28  
  14         99  
9 14     14   12821 use AnyEvent;
  14         62272  
  14         481  
10 14     14   7188 use AnyEvent::Util qw(fh_nonblocking portable_socketpair);
  14         64819  
  14         1273  
11 14     14   101 use Time::HiRes qw/time/;
  14         19  
  14         113  
12 14     14   1506 use Plack::TempBuffer;
  14         20  
  14         252  
13 14     14   52 use Plack::Util;
  14         20  
  14         274  
14 14     14   48 use Plack::HTTPParser qw( parse_http_request );
  14         19  
  14         575  
15 14     14   62 use POSIX qw(EINTR EAGAIN EWOULDBLOCK ESPIPE ENOBUFS :sys_wait_h);
  14         12  
  14         91  
16 14     14   8379 use POSIX::getpeername qw/_getpeername/;
  14         6267  
  14         753  
17 14     14   5482 use POSIX::Socket;
  14         5749  
  14         1340  
18 14     14   77 use Socket qw(IPPROTO_TCP TCP_NODELAY);
  14         19  
  14         635  
19 14     14   59 use File::Temp qw/tempfile/;
  14         17  
  14         683  
20 14     14   61 use Digest::MD5 qw/md5/;
  14         27  
  14         720  
21              
22 14     14   66 use constant WRITER => 0;
  14         14  
  14         878  
23 14     14   59 use constant READER => 1;
  14         14  
  14         547  
24              
25 14     14   67 use constant S_GD => 0;
  14         17  
  14         565  
26 14     14   58 use constant S_FD => 1;
  14         20  
  14         551  
27 14     14   50 use constant S_TIME => 2;
  14         12  
  14         511  
28 14     14   49 use constant S_REQS => 3;
  14         11  
  14         497  
29 14     14   44 use constant S_STATE => 4; # 0:idle 1:queue
  14         16  
  14         491  
30              
31 14     14   46 use constant MAX_REQUEST_SIZE => 131072;
  14         16  
  14         528  
32 14     14   46 use constant CHUNKSIZE => 64 * 1024;
  14         14  
  14         631  
33 14   50 14   58 use constant DEBUG => $ENV{MONOCEROS_DEBUG} || 0;
  14         101  
  14         55209  
34              
35             my $null_io = do { open my $io, "<", \""; $io };
36              
37             sub new {
38 9     9 0 9170 my $class = shift;
39 9         76 my %args = @_;
40              
41             # setup before instantiation
42 9         31 my $listen_sock;
43 9 50       78 if (defined $ENV{SERVER_STARTER_PORT}) {
44 0         0 my ($hostport, $fd) = %{Server::Starter::server_ports()};
  0         0  
45 0 0       0 if ($hostport =~ /(.*):(\d+)/) {
46 0         0 $args{host} = $1;
47 0         0 $args{port} = $2;
48             } else {
49 0         0 $args{port} = $hostport;
50             }
51 0 0       0 $listen_sock = IO::Socket::INET->new(
52             Proto => 'tcp',
53             ) or die "failed to create socket:$!";
54 0 0       0 $listen_sock->fdopen($fd, 'w')
55             or die "failed to bind to listening socket:$!";
56             }
57 9         32 my $max_workers = 5;
58 9         38 for (qw(max_workers workers)) {
59             $max_workers = delete $args{$_}
60 18 100       118 if defined $args{$_};
61             }
62              
63 9   50     35 my $open_max = eval { POSIX::sysconf (POSIX::_SC_OPEN_MAX ()) - 1 } || 1023;
64              
65             my $self = bless {
66             host => $args{host} || 0,
67             port => $args{port} || 8080,
68             max_workers => $max_workers,
69             timeout => $args{timeout} || 300,
70             keepalive_timeout => $args{keepalive_timeout} || 10,
71             max_keepalive_connection => $args{max_keepalive_connection} || int($open_max/2),
72             max_readahead_reqs => (
73             defined $args{max_readahead_reqs}
74             ? $args{max_readahead_reqs} : 100
75             ),
76             min_readahead_reqs => (
77             defined $args{min_readahead_reqs}
78             ? $args{min_readahead_reqs} : undef,
79             ),
80             server_software => $args{server_software} || $class,
81       7     server_ready => $args{server_ready} || sub {},
82             min_reqs_per_child => (
83             defined $args{min_reqs_per_child}
84             ? $args{min_reqs_per_child} : undef,
85             ),
86             max_reqs_per_child => (
87             $args{max_reqs_per_child} || $args{max_requests} || 100,
88             ),
89             err_respawn_interval => (
90             defined $args{err_respawn_interval}
91             ? $args{err_respawn_interval} : undef,
92 9 50 100     769 ),
    50 50        
    50 50        
    50 50        
    50 33        
      33        
      100        
      50        
93             _using_defer_accept => 1,
94             listen_sock => ( defined $listen_sock ? $listen_sock : undef),
95             }, $class;
96              
97 9         101 $self;
98             }
99              
100             sub run {
101 9     9 0 330 my ($self, $app) = @_;
102 9         215 $self->setup_listener();
103 9         1157 $self->setup_sockpair();
104 9         82 $self->run_workers($app);
105             }
106              
107             sub setup_sockpair {
108 9     9 0 24 my $self = shift;
109              
110 9         18 my %workers;
111 9         52 for my $wid ( 1..$self->{max_workers} ) {
112 37 50       144 my @pair = portable_socketpair()
113             or die "failed to create socketpair: $!";
114 37         1174 $workers{$wid} = {
115             running => 0,
116             sock => \@pair
117             };
118             }
119 9         31 $self->{workers} = \%workers;
120              
121 9 50       48 my @fdpass_sock = portable_socketpair()
122             or die "failed to create socketpair: $!";
123 9         202 $self->{fdpass_sock} = \@fdpass_sock;
124              
125 9         145 my ($fh, $filename) = tempfile('monoceros_stats_XXXXXX',UNLINK => 0, SUFFIX => '.dat', TMPDIR => 1);
126 9         6054 $self->{stats_fh} = $fh;
127 9         23 $self->{stats_filename} = $filename;
128 9         108 $self->update_stats();
129              
130 9         15 1;
131             }
132              
133             sub run_workers {
134 9     9 0 18 my ($self,$app) = @_;
135 9         177 local $SIG{PIPE} = 'IGNORE';
136 9         9547 my $pid = fork;
137 9 100       499 if ( $pid ) {
    50          
138             #parent
139 3         171 $self->connection_manager($pid);
140 0         0 delete $self->{stats_fh};
141 0         0 unlink $self->{stats_filename};
142             }
143             elsif ( defined $pid ) {
144 6         512 $self->request_worker($app);
145 0         0 exit;
146             }
147             else {
148 0         0 die "failed fork:$!";
149             }
150             }
151              
152             sub queued_send {
153 0     0 0 0 my $self = shift;
154 0         0 my $sockid = shift;
155              
156 0 0       0 if ( ! exists $self->{sockets}{$sockid} ) {
157 0         0 return;
158             }
159 0         0 $self->{sockets}{$sockid}[S_STATE] = 1;
160              
161 0         0 push @{$self->{fdsend_queue}}, $sockid;
  0         0  
162             $self->{fdsend_worker} ||= AE::io $self->{fdpass_sock}[WRITER], 1, sub {
163 0     0   0 while ( my $sockid = shift @{$self->{fdsend_queue}} ) {
  0         0  
164 0 0       0 if ( ! exists $self->{sockets}{$sockid} ) {
165 0         0 next;
166             }
167 0 0       0 if ( _getpeername($self->{sockets}{$sockid}[S_FD], my $addr) < 0 ) {
168 0         0 delete $self->{sockets}{$sockid};
169 0         0 next;
170             }
171             my $ret = IO::FDPass::send(
172             fileno $self->{fdpass_sock}[WRITER],
173 0         0 $self->{sockets}{$sockid}[S_FD]
174             );
175 0 0       0 if ( !$ret ) {
176 0 0 0     0 if ( $! == EAGAIN || $! == EWOULDBLOCK || $! == EINTR ) {
      0        
177 0         0 unshift @{$self->{fdsend_queue}}, $sockid;
  0         0  
178 0         0 return;
179             }
180 0         0 die "unable to pass queue: $!";
181 0         0 undef $self->{fdsend_worker};
182             }
183             }
184 0         0 undef $self->{fdsend_worker};
185 0   0     0 };
186 0         0 1;
187             }
188              
189             my $prev_stats = '';
190             sub update_stats {
191 9     9 0 17 my $self = shift;
192 9         16 my $total = scalar keys %{$self->{sockets}};
  9         47  
193 9         19 my $processing = scalar grep { !$self->{sockets}{$_}[S_STATE] == 1 } keys %{$self->{sockets}};
  0         0  
  9         32  
194 9         24 my $idle = scalar grep { $self->{sockets}{$_}[S_STATE] == 0 } keys %{$self->{sockets}};
  0         0  
  9         20  
195              
196 9         30 my $stats = "total=$total&";
197 9         54 $stats .= "waiting=$idle&";
198 9         35 $stats .= "processing=$processing&";
199 9         28 $stats .= "max_workers=".$self->{max_workers}."&";
200 9 50 33     53 return if $stats eq $prev_stats && @_;
201 9         30 $prev_stats = $stats;
202 9         67 seek($self->{stats_fh},0,0);
203 9         357 syswrite($self->{stats_fh}, $stats);
204             }
205              
206             sub can_keepalive {
207 0     0 0 0 my $self = shift;
208 0         0 seek($self->{stats_fh},0,0);
209 0         0 sysread($self->{stats_fh},my $buf, 1024);
210 0 0       0 return 1 unless $buf;
211 0 0       0 if ( $buf =~ m!total=(\d+)&! ){
212 0 0       0 return if $1 >= $self->{max_keepalive_connection};
213             }
214 0         0 return 1;
215             }
216              
217             sub connection_manager {
218 3     3 0 33 my ($self, $worker_pid) = @_;
219              
220 3         229 $self->{workers}{$_}{sock}[WRITER]->close for 1..$self->{max_workers};
221 3         172 $self->{fdpass_sock}[READER]->close;
222 3         68 fh_nonblocking $self->{fdpass_sock}[WRITER], 1;
223 3         51 fh_nonblocking $self->{listen_sock}, 1;
224              
225 0         0 my %manager;
226             my %hash2fd;
227 0         0 my %wait_read;
228 0         0 my $term_received = 0;
229 0         0 $self->{sockets} = {};
230 0         0 $self->{fdsend_queue} = [];
231              
232 0         0 warn sprintf "Set max_keepalive_connection to %s", $self->{max_keepalive_connection} if DEBUG;
233              
234 0         0 my $cv = AE::cv;
235 0         0 my $close_all = 0;
236 0         0 my $sig2;$sig2 = AE::signal 'USR1', sub {
237 0     0   0 my $t;$t = AE::timer 0, 1, sub {
238 0 0       0 return unless $close_all;
239 0         0 undef $t;
240 0         0 kill 'TERM', $worker_pid;
241 0         0 my $t2;$t2 = AE::timer 0, 1, sub {
242 0         0 my $kid = waitpid($worker_pid, WNOHANG);
243 0 0       0 return if $kid >= 0;
244 0         0 undef $t2;
245 0         0 $cv->send;
246 0         0 };
247 0         0 };
248 0         0 };
249 0         0 my $sig;$sig = AE::signal 'TERM', sub {
250 0     0   0 $term_received++;
251 0         0 kill 'USR1', $worker_pid; #stop accept
252 0         0 my $t;$t = AE::timer 0, 1, sub {
253 0         0 my $time = time;
254 0 0       0 return if keys %{$self->{sockets}};
  0         0  
255 0         0 undef $t;
256 0         0 $close_all=1;
257 0         0 };
258 0         0 };
259              
260             $manager{disconnect_keepalive_timeout} = AE::timer 0, 1, sub {
261 0     0   0 my $time = time;
262 0         0 if ( DEBUG ) {
263             my $total = scalar keys %{$self->{sockets}};
264             my $processing = scalar grep { $self->{sockets}{$_}[S_STATE] == 1} keys %{$self->{sockets}};
265             my $idle = scalar grep { $self->{sockets}{$_}[S_STATE] == 0} keys %{$self->{sockets}};
266             warn "working: $processing | total: $total | idle: $idle";
267             }
268 0         0 for my $key ( keys %{$self->{sockets}} ) { #key = fd
  0         0  
269 0 0 0     0 if ( $self->{sockets}{$key}[S_STATE] == 0 && $self->{sockets}{$key}[S_REQS] == 0
    0 0        
      0        
      0        
270             && $time - $self->{sockets}{$key}[S_TIME] > $self->{timeout} ) { #idle && first req
271 0         0 delete $wait_read{$key};
272 0         0 delete $self->{sockets}{$key};
273            
274             }
275             elsif ( $self->{sockets}{$key}[S_STATE] == 0 && $self->{sockets}{$key}[S_REQS] > 0 &&
276             $time - $self->{sockets}{$key}[S_TIME] > $self->{keepalive_timeout} ) { #idle && keepalivew
277 0         0 delete $wait_read{$key};
278 0         0 delete $self->{sockets}{$key};
279             }
280             }
281 0         0 $self->update_stats(1);
282 0         0 };
283              
284 0         0 my %m_state;
285             my %workers;
286 0         0 for my $wid ( 1..$self->{max_workers} ) {
287            
288 0         0 my $sock = $self->{workers}{$wid}{sock}[READER];
289 0         0 fh_nonblocking($sock,1);
290 0 0       0 return unless $sock;
291              
292 0         0 $m_state{$wid} = {};
293 0         0 my $state = $m_state{$wid};
294 0         0 $state->{buf} = '';
295 0         0 $state->{state} = 'cmd';
296 0         0 $state->{sockid} = '';
297 0         0 $state->{reqs} = 0;
298            
299             $workers{$wid} = AE::io fileno $sock, 0, sub {
300 0 0   0   0 if ( $state->{state} eq 'cmd' ) {
301 0         0 my $ret = _recv(fileno($sock), my $buf, 28 - length($state->{buf}), 0);
302 0 0 0     0 if ( !defined $ret && ($! == EINTR || $! == EAGAIN || $! == EWOULDBLOCK) ) {
      0        
303 0         0 return;
304             }
305 0 0       0 if ( !defined $ret ) {
306 0         0 warn "failed to recv from sock: $!";
307 0         0 return;
308             }
309 0 0 0     0 if ( defined $buf && length $buf == 0) {
310 0         0 return;
311             }
312 0         0 $state->{buf} .= $buf;
313 0 0       0 return if length $state->{buf} < 28;
314 0         0 my $msg = substr $state->{buf}, 0, 28, '';
315 0         0 my $method = substr($msg, 0, 4,'');
316 0         0 my $sockid = substr($msg, 0, 16, '');
317 0         0 my $reqs = hex($msg);
318              
319 0 0       0 if ( $method eq 'push' ) {
    0          
    0          
320 0         0 $state->{state} = 'recv_fd';
321 0         0 $state->{sockid} = $sockid;
322 0         0 $state->{reqs} = $reqs;
323             }
324             elsif ( $method eq 'keep' ) {
325 0 0       0 if ( exists $self->{sockets}{$sockid} ) {
326 0         0 $self->{sockets}{$sockid}[S_TIME] = AE::now;
327 0         0 $self->{sockets}{$sockid}[S_REQS] += $reqs;
328 0         0 $self->{sockets}{$sockid}[S_STATE] = 0;
329             $wait_read{$sockid} = AE::io $self->{sockets}{$sockid}[S_FD], 0, sub {
330 0         0 delete $wait_read{$sockid};
331 0         0 $self->queued_send($sockid);
332 0         0 };
333             }
334             }
335             elsif ( $method eq 'clos' ) {
336 0         0 delete $self->{sockets}{$sockid};
337 0         0 $self->update_stats();
338             }
339             }
340              
341 0 0       0 if ( $state->{state} eq 'recv_fd' ) {
342 0         0 my $fd = IO::FDPass::recv(fileno $sock);
343 0 0 0     0 if ( $fd < 0 && ($! == EINTR || $! == EAGAIN || $! == EWOULDBLOCK) ) {
      0        
344 0         0 return;
345             }
346 0         0 $state->{state} = 'cmd';
347 0 0       0 if ( $fd <= 0 ) {
348 0         0 warn sprintf 'Failed recv fd: %s (%d)', $!, $!;
349 0         0 return;
350             }
351 0         0 my $sockid = $state->{sockid};
352 0         0 my $reqs = $state->{reqs};
353             $self->{sockets}{$sockid} = [
354 0         0 AnyEvent::Util::guard { _close($fd) },
355 0         0 $fd,
356             AE::now,
357             $reqs,
358             0
359             ]; #guard,fd,time,reqs,state
360 0         0 $self->update_stats();
361             $wait_read{$sockid} = AE::io $fd, 0, sub {
362 0         0 delete $wait_read{$sockid};
363 0         0 $self->queued_send($sockid);
364 0         0 };
365             } # cmd
366             } # AE::io
367 0         0 } # for 1..max_workers
368 0         0 $manager{workers} = \%workers;
369 0         0 $cv->recv;
370             }
371              
372             sub request_worker {
373 6     6 0 62 my ($self,$app) = @_;
374              
375 6         364 delete $self->{stats_fh};
376 6         163 fh_nonblocking($self->{listen_sock},1);
377 0           $self->{fdpass_sock}[WRITER]->close;
378 0           fh_nonblocking($self->{fdpass_sock}[READER],1);
379 0           $self->{workers}{$_}{sock}[READER]->close for 1..$self->{max_workers};
380              
381             # use Parallel::Prefork
382             my %pm_args = (
383             max_workers => $self->{max_workers},
384 0           trap_signals => {
385             TERM => 'TERM',
386             HUP => 'TERM',
387             USR1 => 'USR1',
388             },
389             );
390 0 0         if (defined $self->{err_respawn_interval}) {
391 0           $pm_args{err_respawn_interval} = $self->{err_respawn_interval};
392             }
393              
394 0           my $next;
395             $pm_args{on_child_reap} = sub {
396 0     0     my ( $pm, $exit_pid, $status ) = @_;
397 0           for my $wid (1..$self->{max_workers} ) {
398 0 0 0       if ( $self->{workers}{$wid}{running} && $self->{workers}{$wid}{running} == $exit_pid ) {
399             #warn sprintf "finished wid:%s pid:%s", $next, $exit_pid if DEBUG;
400 0           $self->{workers}{$wid}{running} = 0;
401 0           last;
402             }
403             }
404 0           };
405             $pm_args{before_fork} = sub {
406 0     0     for my $wid (1..$self->{max_workers} ) {
407 0 0         if ( ! $self->{workers}{$wid}{running} ) {
408 0           $next = $wid;
409 0           last;
410             }
411             }
412              
413 0           };
414             $pm_args{after_fork} = sub {
415 0     0     my ($pm, $pid) = @_;
416 0 0         if ( defined $next ) {
417             #warn sprintf "assign wid:%s to pid:%s", $next, $pid if DEBUG;
418 0           $self->{workers}{$next}{running} = $pid;
419             }
420             else {
421 0           warn "worker start but next is undefined";
422             }
423 0           };
424              
425 0           my $pm = Parallel::Prefork->new(\%pm_args);
426              
427 0           while ($pm->signal_received !~ /^(?:TERM|USR1)$/) {
428             $pm->start(sub {
429 0 0   0     die 'worker start but next is undefined' unless $next;
430 0           for my $wid ( 1..$self->{max_workers} ) {
431 0 0         next if $wid == $next;
432 0           $self->{workers}{$wid}{sock}[WRITER]->close;
433             }
434 0           $self->{mgr_sock} = $self->{workers}{$next}{sock}[WRITER];
435 0           fh_nonblocking($self->{mgr_sock},1);
436              
437             open($self->{stats_fh}, '<', $self->{stats_filename})
438 0 0         or die "could not open stats file: $!";
439              
440 0           $self->{fhlist} = [$self->{listen_sock},$self->{fdpass_sock}[READER]];
441 0           $self->{fhbits} = '';
442 0           for ( @{$self->{fhlist}} ) {
  0            
443 0           vec($self->{fhbits}, fileno $_, 1) = 1;
444             }
445            
446             my $max_reqs_per_child = $self->_calc_minmax_per_child(
447             $self->{max_reqs_per_child},
448             $self->{min_reqs_per_child}
449 0           );
450             my $max_readahead_reqs = $self->_calc_minmax_per_child(
451             $self->{max_readahead_reqs},
452             $self->{min_readahead_reqs}
453 0           );
454              
455 0           my $proc_req_count = 0;
456            
457 0           $self->{term_received} = 0;
458 0           $self->{stop_accept} = 0;
459             local $SIG{TERM} = sub {
460 0           $self->{term_received}++;
461 0 0         exit 0 if $self->{term_received} > 1;
462 0           };
463             local $SIG{USR1} = sub {
464 0           $self->{fhlist} = [$self->{fdpass_sock}[READER]];
465 0           $self->{fhbits} = '';
466 0           vec($self->{fhbits}, fileno($self->{fdpass_sock}[READER]), 1) = 1;
467 0           $self->{stop_accept}++;
468 0           };
469              
470 0           local $SIG{PIPE} = 'IGNORE';
471              
472 0           my $next_conn;
473              
474 0   0       while ( $next_conn || $self->{stop_accept} || $proc_req_count < $max_reqs_per_child ) {
      0        
475             last if ( $self->{term_received}
476 0 0 0       && !$next_conn );
477            
478 0           my $conn;
479 0 0 0       if ( $next_conn && $next_conn->{buf} ) { #read ahead or pipeline
480 0           $conn = $next_conn;
481 0           $next_conn = undef;
482             }
483             else {
484 0           my @rfh = @{$self->{fhlist}};
  0            
485 0           my $rfd = $self->{fhbits};
486 0 0         if ( $next_conn ) {
487 0           push @rfh, $next_conn->{fh};
488 0           vec($rfd, fileno $next_conn->{fh}, 1) = 1;
489             }
490 0           my @can_read;
491 0 0         if ( select($rfd, undef, undef, 1) > 0 ) {
492 0           for ( my $i = 0; $i <= $#rfh; $i++ ) {
493 0           my $try_read_fd = fileno $rfh[$i];
494 0 0 0       if ( !defined $rfd || vec($rfd, $try_read_fd, 1) ) {
495 0 0 0       if ( $next_conn && fileno $next_conn->{fh} == $try_read_fd ) {
496 0           $conn = $next_conn;
497 0           last;
498             }
499 0           push @can_read, $self->{fhlist}[$i];
500             }
501             }
502             }
503             #accept or recv
504 0 0         if ( !$conn ) {
505 0           $conn = $self->accept_or_recv( @can_read );
506             }
507             # exists new conn && exists next_conn && next_conn is not ready => keep
508 0 0 0       if ( $conn && $next_conn && $conn != $next_conn ) {
      0        
509 0           $self->keep_it($next_conn);
510             }
511             # try to re-read next_conn
512 0 0 0       if ( !$conn && $next_conn ) {
513 0           $conn = $next_conn;
514             }
515             #clear next_conn
516 0           @rfh = ();
517 0           $next_conn = undef;
518             }
519 0 0         next unless $conn;
520            
521 0           ++$proc_req_count;
522             my $env = {
523             SERVER_PORT => $self->{port},
524             SERVER_NAME => $self->{host},
525             SCRIPT_NAME => '',
526             REMOTE_ADDR => $conn->{peeraddr},
527             REMOTE_PORT => $conn->{peerport},
528             'psgi.version' => [ 1, 1 ],
529             'psgi.errors' => *STDERR,
530             'psgi.url_scheme' => 'http',
531             'psgi.run_once' => Plack::Util::FALSE,
532             'psgi.multithread' => Plack::Util::FALSE,
533             'psgi.multiprocess' => Plack::Util::TRUE,
534             'psgi.streaming' => Plack::Util::TRUE,
535             'psgi.nonblocking' => Plack::Util::FALSE,
536             'psgix.input.buffered' => Plack::Util::TRUE,
537             'psgix.io' => $conn->{fh},
538             'X_MONOCEROS_WORKER_STATS' => $self->{stats_filename},
539 0           };
540 0           $self->{_is_deferred_accept} = 1; #ready to read
541 0           my $prebuf;
542 0 0         if ( exists $conn->{buf} ) {
543 0           $prebuf = delete $conn->{buf};
544             }
545             else {
546             #pre-read
547 0           my $ret = sysread($conn->{fh}, $prebuf, MAX_REQUEST_SIZE);
548 0 0 0       if ( ! defined $ret && ($! == EAGAIN || $! == EWOULDBLOCK || $! == EINTR) ) {
    0 0        
      0        
549 0           $self->keep_it($conn);
550 0           next;
551             }
552             elsif ( defined $ret && $ret == 0) {
553             #closed?
554             $self->cmd_to_mgr('clos', $conn->{peername}, $conn->{reqs})
555 0 0         if !$conn->{direct};
556 0           next;
557             }
558             }
559             # stop keepalive if SIG{TERM} or SIG{USR1}. but go-on if pipline req
560 0           my $may_keepalive = 1;
561 0 0 0       $may_keepalive = 0 if ($self->{term_received} || $self->{stop_accept});
562 0           my $is_keepalive = 1; # to use "keepalive_timeout" in handle_connection,
563             # treat every connection as keepalive
564             my ($keepalive,$pipelined_buf) = $self->handle_connection($env, $conn->{fh}, $app,
565             $may_keepalive, $is_keepalive, $prebuf,
566 0           $conn->{reqs});
567 0           $conn->{reqs}++;
568 0 0         if ( !$keepalive ) {
569             #close
570             $self->cmd_to_mgr('clos', $conn->{peername}, $conn->{reqs})
571 0 0         if !$conn->{direct};
572 0           next;
573             }
574              
575             # pipeline
576 0 0 0       if ( defined $pipelined_buf && length $pipelined_buf ) {
577 0           $next_conn = $conn;
578 0           $next_conn->{buf} = $pipelined_buf;
579 0           next;
580             }
581              
582             # read ahead
583 0 0 0       if ( $conn->{reqs} < $max_readahead_reqs && $proc_req_count < $max_reqs_per_child ) {
584 0           $next_conn = $conn;
585 0           next;
586             }
587              
588             # wait
589 0           $self->keep_it($conn);
590             }
591 0           }); #start
592             }
593             local $SIG{TERM} = sub {
594 0     0     $pm->signal_all_children('TERM');
595 0           };
596 0           kill 'USR1', getppid();
597 0           $pm->wait_all_children;
598 0           exit;
599             }
600              
601             sub cmd_to_mgr {
602 0     0 0   my ($self,$cmd,$peername,$reqs) = @_;
603 0           my $msg = $cmd . Digest::MD5::md5($peername) . sprintf('%08x',$reqs);
604 0           _sendn(fileno($self->{mgr_sock}), $msg, 0);
605             }
606              
607             sub keep_it {
608 0     0 0   my ($self,$conn) = @_;
609 0 0         if ( $conn->{direct} ) {
610 0           $self->cmd_to_mgr("push", $conn->{peername}, $conn->{reqs});
611 0           my $ret;
612 0           do {
613 0           $ret = IO::FDPass::send(fileno $self->{mgr_sock}, fileno $conn->{fh});
614 0 0 0       die $! if ( !defined $ret && $! != EAGAIN && $! != EWOULDBLOCK && $! != EINTR);
      0        
      0        
615             #need select?
616             } while (!$ret);
617             }
618             else {
619 0           $self->cmd_to_mgr("keep", $conn->{peername}, $conn->{reqs});
620             }
621             }
622              
623             sub accept_or_recv {
624 0     0 0   my $self = shift;
625 0           my @for_read = @_;
626 0           my $conn;
627 0           for my $sock ( @for_read ) {
628 0 0         if ( fileno $sock == fileno $self->{listen_sock} ) {
    0          
629 0           my $peer = accept(my $fh, $self->{listen_sock});
630 0 0 0       if ( !$peer && ($! != EINTR && $! != EAGAIN && $! != EWOULDBLOCK && $! != ESPIPE) ) {
      0        
      0        
      0        
631 0           warn sprintf 'failed to accept: %s (%d)', $!, $!;
632 0           next;
633             }
634 0 0         next unless $peer;
635 0           fh_nonblocking($fh,1);
636 0 0         setsockopt($fh, IPPROTO_TCP, TCP_NODELAY, 1)
637             or die "setsockopt(TCP_NODELAY) failed:$!";
638 0           my ($peerport,$peerhost) = unpack_sockaddr_in $peer;
639 0           my $peeraddr = inet_ntoa($peerhost);
640 0           $conn = {
641             fh => $fh,
642             peername => $peer,
643             peerport => $peerport,
644             peeraddr => $peeraddr,
645             direct => 1,
646             reqs => 0,
647             };
648 0           last;
649             }
650             elsif ( fileno $sock == fileno $self->{fdpass_sock}[READER] ) {
651 0           my $fd = IO::FDPass::recv(fileno $self->{fdpass_sock}[READER]);
652 0 0 0       if ( $fd < 0 && ($! != EINTR && $! != EAGAIN && $! != EWOULDBLOCK && $! != ESPIPE) ) {
      0        
      0        
      0        
653 0           warn sprintf("could not recv fd: %s (%d)", $!, $!);
654             }
655 0 0         next if $fd <= 0;
656 0           my $peer;
657 0 0         if ( _getpeername($fd, $peer) < 0 ) {
658 0           next;
659             }
660 0 0         open(my $fh, '<&='.$fd)
661             or die "could not open fd: $!";
662 0           my ($peerport,$peerhost) = unpack_sockaddr_in $peer;
663 0           my $peeraddr = inet_ntoa($peerhost);
664 0           $conn = {
665             fh => $fh,
666             peername => $peer,
667             peerport => $peerport,
668             peeraddr => $peeraddr,
669             direct => 0,
670             reqs => 1, #xx
671             };
672 0           last;
673             }
674             }
675 0 0         return unless $conn;
676 0           $conn;
677             }
678              
679             my $bad_response = [ 400, [ 'Content-Type' => 'text/plain', 'Connection' => 'close' ], [ 'Bad Request' ] ];
680             sub handle_connection {
681 0     0 0   my($self, $env, $conn, $app, $use_keepalive, $is_keepalive, $prebuf, $reqs) = @_;
682            
683 0           my $buf = '';
684 0           my $pipelined_buf='';
685 0           my $res = $bad_response;
686              
687 0           while (1) {
688 0           my $rlen;
689 0 0         if ( defined $prebuf ) {
690 0           $rlen = length $prebuf;
691 0           $buf = $prebuf;
692 0           undef $prebuf;
693             }
694             else {
695             $rlen = $self->read_timeout(
696             $conn, \$buf, MAX_REQUEST_SIZE - length($buf), length($buf),
697             $is_keepalive ? $self->{keepalive_timeout} : $self->{timeout},
698 0 0         ) or return;
    0          
699             }
700              
701 0           my $reqlen = parse_http_request($buf, $env);
702 0 0         if ($reqlen >= 0) {
703             # handle request
704 0           my $protocol = $env->{SERVER_PROTOCOL};
705 0 0         if ($use_keepalive) {
706 0 0         if ( $protocol eq 'HTTP/1.1' ) {
707 0 0         if (my $c = $env->{HTTP_CONNECTION}) {
708 0 0         $use_keepalive = undef
709             if $c =~ /^\s*close\s*/i;
710             }
711             }
712             else {
713 0 0         if (my $c = $env->{HTTP_CONNECTION}) {
714 0 0         $use_keepalive = undef
715             unless $c =~ /^\s*keep-alive\s*/i;
716             } else {
717 0           $use_keepalive = undef;
718             }
719             }
720 0 0 0       if ( $use_keepalive && $reqs <= 1 ) {
721 0           $use_keepalive = $self->can_keepalive;
722             }
723             }
724 0           $buf = substr $buf, $reqlen;
725 14     14   106 my $chunked = do { no warnings; lc delete $env->{HTTP_TRANSFER_ENCODING} eq 'chunked' };
  14         16  
  14         19502  
  0            
  0            
726 0 0         if (my $cl = $env->{CONTENT_LENGTH}) {
    0          
727 0           my $buffer = Plack::TempBuffer->new($cl);
728 0           while ($cl > 0) {
729 0           my $chunk;
730 0 0         if (length $buf) {
731 0           $chunk = $buf;
732 0           $buf = '';
733             } else {
734             $self->read_timeout(
735             $conn, \$chunk, $cl, 0, $self->{timeout})
736 0 0         or return;
737             }
738 0           $buffer->print($chunk);
739 0           $cl -= length $chunk;
740             }
741 0           $env->{'psgi.input'} = $buffer->rewind;
742             }
743             elsif ($chunked) {
744 0           my $buffer = Plack::TempBuffer->new;
745 0           my $chunk_buffer = '';
746 0           my $length;
747 0           DECHUNK: while(1) {
748 0           my $chunk;
749 0 0         if ( length $buf ) {
750 0           $chunk = $buf;
751 0           $buf = '';
752             }
753             else {
754             $self->read_timeout($conn, \$chunk, CHUNKSIZE, 0, $self->{timeout})
755 0 0         or return;
756             }
757              
758 0           $chunk_buffer .= $chunk;
759 0           while ( $chunk_buffer =~ s/^(([0-9a-fA-F]+).*\015\012)// ) {
760 0           my $trailer = $1;
761 0           my $chunk_len = hex $2;
762 0 0         if ($chunk_len == 0) {
    0          
763 0           last DECHUNK;
764             } elsif (length $chunk_buffer < $chunk_len + 2) {
765 0           $chunk_buffer = $trailer . $chunk_buffer;
766 0           last;
767             }
768 0           $buffer->print(substr $chunk_buffer, 0, $chunk_len, '');
769 0           $chunk_buffer =~ s/^\015\012//;
770 0           $length += $chunk_len;
771             }
772             }
773 0           $env->{CONTENT_LENGTH} = $length;
774 0           $env->{'psgi.input'} = $buffer->rewind;
775            
776             } else {
777 0 0         if ( $buf =~ m!^(?:GET|HEAD)! ) { #pipeline
778 0           $pipelined_buf = $buf;
779 0           $use_keepalive = 1;
780             } # else clear buffer
781 0           $env->{'psgi.input'} = $null_io;
782             }
783              
784 0 0         if ( $env->{HTTP_EXPECT} ) {
785 0 0         if ( $env->{HTTP_EXPECT} eq '100-continue' ) {
786 0 0         $self->write_all($conn, "HTTP/1.1 100 Continue\015\012\015\012")
787             or return;
788             } else {
789 0           $res = [417,[ 'Content-Type' => 'text/plain', 'Connection' => 'close' ], [ 'Expectation Failed' ] ];
790 0           last;
791             }
792             }
793              
794 0           $res = Plack::Util::run_app $app, $env;
795 0           last;
796             }
797 0 0         if ($reqlen == -2) {
    0          
798             # request is incomplete, do nothing
799             } elsif ($reqlen == -1) {
800             # error, close conn
801 0           last;
802             }
803             }
804              
805 0 0         if (ref $res eq 'ARRAY') {
    0          
806 0           $self->_handle_response($env, $res, $conn, \$use_keepalive);
807             } elsif (ref $res eq 'CODE') {
808             $res->(sub {
809 0     0     $self->_handle_response($env, $_[0], $conn, \$use_keepalive);
810 0           });
811             } else {
812 0           die "Bad response $res";
813             }
814              
815 0           return ($use_keepalive, $pipelined_buf);
816             }
817              
818             sub _handle_response {
819 0     0     my($self, $env, $res, $conn, $use_keepalive_r) = @_;
820 0           my $protocol = $env->{SERVER_PROTOCOL};
821 0           my $status_code = $res->[0];
822 0           my $headers = $res->[1];
823 0           my $body = $res->[2];
824            
825 0           my @lines;
826             my %send_headers;
827 0           for (my $i = 0; $i < @$headers; $i += 2) {
828 0           my $k = $headers->[$i];
829 0           my $v = $headers->[$i + 1];
830 0           my $lck = lc $k;
831 0 0         if ($lck eq 'connection') {
832 0 0 0       $$use_keepalive_r = undef
833             if $$use_keepalive_r && lc $v ne 'keep-alive';
834             } else {
835 0           push @lines, "$k: $v\015\012";
836 0           $send_headers{$lck} = $v;
837             }
838             }
839 0 0         if ( ! exists $send_headers{server} ) {
840 0           unshift @lines, "Server: $self->{server_software}\015\012";
841             }
842 0 0         if ( ! exists $send_headers{date} ) {
843 0           unshift @lines, "Date: @{[HTTP::Date::time2str()]}\015\012";
  0            
844             }
845              
846             # try to set content-length when keepalive can be used, or disable it
847 0           my $use_chunked;
848 0 0         if ( $protocol eq 'HTTP/1.0' ) {
    0          
849 0 0         if ($$use_keepalive_r) {
850 0 0 0       if (defined $send_headers{'content-length'}
    0 0        
851             || defined $send_headers{'transfer-encoding'}) {
852             # ok
853             }
854             elsif ( ! Plack::Util::status_with_no_entity_body($status_code)
855             && defined(my $cl = Plack::Util::content_length($body))) {
856 0           push @lines, "Content-Length: $cl\015\012";
857             }
858             else {
859 0           $$use_keepalive_r = undef
860             }
861             }
862 0 0         push @lines, "Connection: keep-alive\015\012" if $$use_keepalive_r;
863 0 0         push @lines, "Connection: close\015\012" if !$$use_keepalive_r; #fmm..
864             }
865             elsif ( $protocol eq 'HTTP/1.1' ) {
866 0 0 0       if (defined $send_headers{'content-length'}
    0          
867             || defined $send_headers{'transfer-encoding'}) {
868             # ok
869             } elsif ( !Plack::Util::status_with_no_entity_body($status_code) ) {
870 0           push @lines, "Transfer-Encoding: chunked\015\012";
871 0           $use_chunked = 1;
872             }
873 0 0         push @lines, "Connection: close\015\012" unless $$use_keepalive_r;
874              
875             }
876              
877 0           unshift @lines, "HTTP/1.1 $status_code @{[ HTTP::Status::status_message($status_code) ]}\015\012";
  0            
878 0           push @lines, "\015\012";
879            
880 0 0 0       if (defined $body && ref $body eq 'ARRAY' && @$body == 1
      0        
      0        
881             && length $body->[0] < 8192) {
882             # combine response header and small request body
883 0           my $buf = $body->[0];
884 0 0         if ($use_chunked ) {
885 0           my $len = length $buf;
886 0           $buf = sprintf("%x",$len) . "\015\012" . $buf . "\015\012" . '0' . "\015\012\015\012";
887             }
888             my $len = $self->write_all(
889             $conn, join('', @lines, $buf), $self->{timeout},
890 0           );
891 0 0         warn $! unless $len;
892 0           return;
893             }
894             $self->write_all($conn, join('', @lines), $self->{timeout})
895 0 0         or return;
896              
897 0 0         if (defined $body) {
898 0           my $failed;
899             my $completed;
900 0 0         my $body_count = (ref $body eq 'ARRAY') ? $#{$body} + 1 : -1;
  0            
901             Plack::Util::foreach(
902             $body,
903             sub {
904 0 0   0     unless ($failed) {
905 0           my $buf = $_[0];
906 0           --$body_count;
907 0 0         if ( $use_chunked ) {
908 0           my $len = length $buf;
909 0 0         return unless $len;
910 0           $buf = sprintf("%x",$len) . "\015\012" . $buf . "\015\012";
911 0 0         if ( $body_count == 0 ) {
912 0           $buf .= '0' . "\015\012\015\012";
913 0           $completed = 1;
914             }
915             }
916             $self->write_all($conn, $buf, $self->{timeout})
917 0 0         or $failed = 1;
918             }
919             },
920 0           );
921 0 0 0       $self->write_all($conn, '0' . "\015\012\015\012", $self->{timeout}) if $use_chunked && !$completed;
922             } else {
923             return Plack::Util::inline_object
924             write => sub {
925 0     0     my $buf = $_[0];
926 0 0         if ( $use_chunked ) {
927 0           my $len = length $buf;
928 0 0         return unless $len;
929 0           $buf = sprintf("%x",$len) . "\015\012" . $buf . "\015\012"
930             }
931             $self->write_all($conn, $buf, $self->{timeout})
932 0           },
933             close => sub {
934 0 0   0     $self->write_all($conn, '0' . "\015\012\015\012", $self->{timeout}) if $use_chunked;
935 0           };
936             }
937             }
938              
939             sub _calc_minmax_per_child {
940 0     0     my $self = shift;
941 0           my ($max,$min) = @_;
942 0 0         if (defined $min) {
943 0           srand((rand() * 2 ** 30) ^ $$ ^ time);
944 0           return $max - int(($max - $min + 1) * rand);
945             } else {
946 0           return $max;
947             }
948             }
949              
950             1;