File Coverage

blib/lib/Twiggy/Server.pm
Criterion Covered Total %
statement 191 357 53.5
branch 38 126 30.1
condition 14 75 18.6
subroutine 43 64 67.1
pod 0 4 0.0
total 286 626 45.6


line stmt bran cond sub pod time code
1             package Twiggy::Server;
2 1     1   14 use strict;
  1         2  
  1         37  
3 1     1   6 use warnings;
  1         10  
  1         130  
4              
5 1     1   8 use Scalar::Util qw(blessed weaken);
  1         1  
  1         249  
6 1     1   6 use Try::Tiny;
  1         2  
  1         109  
7 1     1   8 use Carp;
  1         2  
  1         135  
8              
9 1     1   5 use Socket qw(IPPROTO_TCP TCP_NODELAY);
  1         2  
  1         1243  
10 1     1   77 use Errno qw(EAGAIN EINTR);
  1         3  
  1         120  
11 1     1   6 use IO::Handle;
  1         1  
  1         47  
12              
13 1     1   6 use AnyEvent;
  1         2  
  1         102  
14 1     1   5 use AnyEvent::Handle;
  1         3  
  1         37  
15 1     1   1641 use AnyEvent::Socket;
  1         23432  
  1         197  
16 1     1   14 use AnyEvent::Util qw(WSAEWOULDBLOCK);
  1         2  
  1         49  
17              
18 1     1   2141 use HTTP::Status;
  1         4798  
  1         361  
19 1     1   894 use Plack::HTTPParser qw(parse_http_request);
  1         5906  
  1         72  
20 1     1   11 use Plack::Util;
  1         2  
  1         33  
21              
22 1     1   5 use constant DEBUG => $ENV{TWIGGY_DEBUG};
  1         2  
  1         289  
23             use constant HAS_AIO => !$ENV{PLACK_NO_SENDFILE} && try {
24             require AnyEvent::AIO;
25             require IO::AIO;
26             1;
27 1   33 1   8 };
  1         112  
  1         25  
28              
29 1     1   6 open my $null_io, '<', \'';
  1         2  
  1         47  
30              
31             sub new {
32 1     1 0 4 my($class, @args) = @_;
33              
34 1         10 return bless {
35             no_delay => 1,
36             timeout => 300,
37             read_chunk_size => 4096,
38             @args,
39             }, $class;
40             }
41              
42             sub start_listen {
43 1     1 0 2 my ($self, $app) = @_;
44 1 50 50     2 my @listen = @{$self->{listen} || [ ($self->{host} || '') . ":" . ($self->{port} || 0) ]};
  1   50     24  
45 1         4 for my $listen (@listen) {
46 1         2 push @{$self->{listen_guards}}, $self->_create_tcp_server($listen, $app);
  1         5  
47             }
48             }
49              
50             sub register_service {
51 1     1 0 2 my($self, $app) = @_;
52              
53 1         5 $self->start_listen($app);
54              
55             $self->{exit_guard} = AE::cv {
56             # Make sure that we are not listening on a socket anymore, while
57             # other events are being flushed
58 1     1   39 delete $self->{listen_guards};
59 1         7125 };
60 1         205 $self->{exit_guard}->begin;
61             }
62              
63             sub _create_tcp_server {
64 1     1   3 my ( $self, $listen, $app ) = @_;
65              
66 1         3 my($host, $port, $is_tcp);
67 1 50       13 if ($listen =~ /:\d+$/) {
68 1         4 ($host, $port) = split /:/, $listen;
69 1 50       5 $host = undef if $host eq '';
70 1 50       4 $port = undef if $port == 0;
71 1         2 $is_tcp = 1;
72             } else {
73 0         0 $host = "unix/";
74 0         0 $port = $listen;
75             }
76              
77 1         1 my($listen_host, $listen_port);
78              
79 1         6 return tcp_server $host, $port, $self->_accept_handler($app, $is_tcp, \$listen_host, \$listen_port),
80             $self->_accept_prepare_handler(\$listen_host, \$listen_port);
81             }
82              
83             sub _accept_prepare_handler {
84 1     1   1 my($self, $listen_host_r, $listen_port_r) = @_;
85              
86             return sub {
87 1     1   245 my ( $fh, $host, $port ) = @_;
88 1         2 DEBUG && warn "Listening on $host:$port\n";
89 1         3 $$listen_host_r = $host;
90 1         2 $$listen_port_r = $port;
91 1 50       4 $self->{server_ready}->({
92             host => $host,
93             port => $port,
94             server_software => 'Twiggy',
95             }) if $self->{server_ready};
96              
97 1   50     9 return $self->{backlog} || 0;
98 1         11 };
99             }
100              
101             sub _accept_handler {
102 1     1   2 my ( $self, $app, $is_tcp, $listen_host_r, $listen_port_r ) = @_;
103              
104             return sub {
105 2     2   6168 my ( $sock, $peer_host, $peer_port ) = @_;
106              
107 2         4 DEBUG && warn "$sock Accepted connection from $peer_host:$peer_port\n";
108 2 50       11 return unless $sock;
109 2         12 $self->{exit_guard}->begin;
110              
111 2 50 33     118 if ( $is_tcp && $self->{no_delay} ) {
112 2 50       19 setsockopt($sock, IPPROTO_TCP, TCP_NODELAY, 1)
113             or die "setsockopt(TCP_NODELAY) failed:$!";
114             }
115              
116 2         3 my $headers = "";
117             my $try_parse = sub {
118 2 50       9 if ( $self->_try_read_headers($sock, $headers) ) {
119 1         52 my $env = {
120             SERVER_NAME => $$listen_host_r,
121             SERVER_PORT => $$listen_port_r,
122             SCRIPT_NAME => '',
123             REMOTE_ADDR => $peer_host,
124             'psgi.version' => [ 1, 0 ],
125             'psgi.errors' => *STDERR,
126             'psgi.url_scheme' => 'http',
127             'psgi.nonblocking' => Plack::Util::TRUE,
128             'psgi.streaming' => Plack::Util::TRUE,
129             'psgi.run_once' => Plack::Util::FALSE,
130             'psgi.multithread' => Plack::Util::FALSE,
131             'psgi.multiprocess' => Plack::Util::FALSE,
132             'psgi.input' => undef, # will be set by _run_app()
133             'psgix.io' => $sock,
134             'psgix.input.buffered' => Plack::Util::TRUE,
135             };
136              
137 1         9 my $reqlen = parse_http_request($headers, $env);
138 1         316 DEBUG && warn "$sock Parsed HTTP headers: request length=$reqlen\n";
139              
140 1 50       4 if ( $reqlen < 0 ) {
141 0         0 die "bad request";
142             } else {
143 1         5 return $env;
144             }
145             }
146              
147 0         0 return;
148 2         15 };
149              
150 2         5 local $@;
151 2 100       5 unless ( eval {
152 2 50       6 if ( my $env = $try_parse->() ) {
153             # the request data is already available, no need to parse more
154 1         5 $self->_run_app($app, $env, $sock);
155             } else {
156             # there's not yet enough data to parse the request,
157             # set up a watcher
158 0         0 $self->_create_req_parsing_watcher( $sock, $try_parse, $app );
159             };
160              
161 1         737 1;
162             }) {
163 1         9 my $disconnected = ($@ =~ /^client disconnected/);
164 1         6 $self->_bad_request($sock, $disconnected);
165             }
166 1         9 };
167             }
168              
169             # returns a closure that tries to parse
170             # this is not a method because it needs a buffer per socket
171             sub _try_read_headers {
172 2     2   4 my ( $self, $sock, undef ) = @_;
173              
174             # FIXME add a timer to manage read timeouts
175 2         11 local $/ = "\012";
176              
177 2         5 read_more: for my $headers ( $_[2] ) {
178 5 100 0     55 if ( defined(my $line = <$sock>) ) {
    50 0        
    50 33        
179 4         118 $headers .= $line;
180              
181 4 100 66     25 if ( $line eq "\015\012" or $line eq "\012" ) {
182             # got an empty line, we're done reading the headers
183 1         9 return 1;
184             } else {
185             # try to read more lines using buffered IO
186 3         6 redo read_more;
187             }
188             } elsif ($! and $! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK ) {
189 0         0 die $!;
190             } elsif (!$!) {
191 1         17 die "client disconnected";
192             }
193             }
194              
195 0         0 DEBUG && warn "$sock did not read to end of req, wait for more data to arrive\n";
196 0         0 return;
197             }
198              
199             sub _create_req_parsing_watcher {
200 0     0   0 my ( $self, $sock, $try_parse, $app ) = @_;
201              
202 0         0 my $headers_io_watcher;
203              
204             my $timeout_timer = AE::timer $self->{timeout}, 0, sub {
205 0     0   0 DEBUG && warn "$sock Timeout\n";
206 0         0 undef $headers_io_watcher;
207 0         0 undef $try_parse;
208 0         0 undef $sock;
209 0 0       0 } if $self->{timeout};
210              
211             $headers_io_watcher = AE::io $sock, 0, sub {
212             try {
213 0 0       0 if ( my $env = $try_parse->() ) {
214 0         0 undef $headers_io_watcher;
215 0         0 undef $timeout_timer;
216 0         0 $self->_run_app($app, $env, $sock);
217             }
218             } catch {
219 0         0 undef $headers_io_watcher;
220 0         0 undef $timeout_timer;
221 0         0 my $disconnected = /^client disconnected/;
222 0         0 $self->_bad_request($sock, $disconnected);
223             }
224 0     0   0 };
  0         0  
225             }
226              
227             sub _bad_request {
228 1     1   2 my ( $self, $sock, $disconnected ) = @_;
229              
230 1 50 33     10 return unless defined $sock and defined fileno $sock;
231              
232 1         5 my $response = [
233             400,
234             [ 'Content-Type' => 'text/plain' ],
235             [ ],
236             ];
237              
238             # if client is already gone, don't try to write to it
239 1 50       10 $response = [] if $disconnected;
240              
241 1         7 $self->_write_psgi_response($sock, $response);
242              
243 1         177 return;
244             }
245              
246             sub _read_chunk {
247 0     0   0 my ($self, $sock, $remaining, $cb) = @_;
248              
249 0         0 my $data = '';
250 0         0 my $read_chunk_size = $self->{read_chunk_size};
251              
252             my $try_read = sub {
253 0 0       0 READ_MORE: {
254 0     0   0 my $read_size = $remaining > $read_chunk_size ? $read_chunk_size : $remaining;
255 0         0 my $rlen = read($sock, $data, $read_size, length($data));
256              
257 0 0 0     0 if (defined $rlen and $rlen > 0) {
    0 0        
    0 0        
    0 0        
258 0         0 $remaining -= $rlen;
259              
260 0 0       0 if ($remaining <= 0) {
261 0         0 $cb->($data);
262 0         0 return 1;
263             } else {
264 0         0 redo READ_MORE;
265             }
266             } elsif (defined $rlen) {
267 0         0 $cb->($data);
268 0         0 return 1;
269             } elsif ($! and $! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK) {
270 0         0 die $!;
271             } elsif (!$!) {
272 0         0 die "client disconnected";
273             }
274             }
275              
276 0         0 return;
277 0         0 };
278              
279 0 0       0 unless ($try_read->()) {
280 0         0 my $rw; $rw = AE::io($sock, 0, sub {
281             try {
282 0 0       0 if ($try_read->()) {
283 0         0 undef $rw;
284             }
285             } catch {
286 0         0 undef $rw;
287 0         0 $self->_bad_request($sock);
288 0     0   0 };
289 0         0 });
290             }
291             }
292              
293             sub _run_app {
294 1     1   6 my($self, $app, $env, $sock) = @_;
295              
296 1 50       13 unless ($env->{'psgi.input'}) {
297 1 50       5 if ($env->{CONTENT_LENGTH}) {
298             $self->_read_chunk($sock, $env->{CONTENT_LENGTH}, sub {
299 0     0   0 my ($data) = @_;
300 0         0 open my $input, '<', \$data;
301 0         0 $env->{'psgi.input'} = $input;
302 0         0 $self->_run_app($app, $env, $sock);
303 0         0 });
304 0         0 return;
305             } else {
306 1         4 $env->{'psgi.input'} = $null_io;
307             }
308             }
309              
310 1         14 my $res = Plack::Util::run_app $app, $env;
311              
312 1 50 33     63 if ( ref $res eq 'ARRAY' ) {
    50          
    50          
313 0         0 $self->_write_psgi_response($sock, $res);
314             } elsif ( blessed($res) and $res->isa("AnyEvent::CondVar") ) {
315 0         0 Carp::carp("Returning AnyEvent condvar is deprecated and will be removed in the next release of Twiggy. Use the streaming callback interface intstead.");
316 0     0   0 $res->cb(sub { $self->_write_psgi_response($sock, shift->recv) });
  0         0  
317             } elsif ( ref $res eq 'CODE' ) {
318             $res->(
319             sub {
320 1     1   3155 my $res = shift;
321              
322 1 50       14 if ( @$res < 2 ) {
    50          
323 0         0 croak "Insufficient arguments";
324             } elsif ( @$res == 2 ) {
325 0         0 my ( $status, $headers ) = @$res;
326              
327 0         0 $self->_flush($sock);
328              
329 0         0 my $writer = Twiggy::Writer->new($sock, $self->{exit_guard});
330              
331 0         0 my $buf = $self->_format_headers($status, $headers);
332 0         0 $writer->write($$buf);
333              
334 0         0 return $writer;
335             } else {
336 1         5 my ( $status, $headers, $body, $post ) = @$res;
337 1         10 my $cv = $self->_write_psgi_response($sock, [ $status, $headers, $body ]);
338 1 50       8 $cv->cb(sub { $post->() }) if $post;
  0         0  
339             }
340             },
341 1         9 $sock,
342             );
343             } else {
344 0         0 croak("Unknown response type: $res");
345             }
346             }
347              
348             sub _write_psgi_response {
349 2     2   6 my ( $self, $sock, $res ) = @_;
350              
351 2 50       13 if ( ref $res eq 'ARRAY' ) {
352 2 100       9 if ( scalar @$res == 0 ) {
353             # no response
354 1         15 $self->{exit_guard}->end;
355 1         6 return;
356             }
357              
358 1         3 my ( $status, $headers, $body ) = @$res;
359              
360 1         63 my $cv = AE::cv;
361              
362             $self->_write_headers( $sock, $status, $headers )->cb(sub {
363 1     1   19 local $@;
364 1 50       3 if ( eval { $_[0]->recv; 1 } ) {
  1         6  
  0         0  
365             $self->_write_body($sock, $body)->cb(sub {
366 0         0 shutdown $sock, 1;
367 0         0 close $sock;
368 0         0 $self->{exit_guard}->end;
369 0         0 local $@;
370 0 0       0 eval { $cv->send($_[0]->recv); 1 } or $cv->croak($@);
  0         0  
  0         0  
371 0         0 });
372             } else {
373 1         602 $self->{exit_guard}->end;
374 1 50       67 eval { $cv->send($_[0]->recv); 1 } or $cv->croak($@);
  1         11  
  0         0  
375             }
376 1         14 });
377              
378 1         13 return $cv;
379             } else {
380 1     1   2901 no warnings 'uninitialized';
  1         2  
  1         384  
381 0         0 warn "Unknown response type: $res";
382 0         0 return $self->_write_psgi_response($sock, [ 204, [], [] ]);
383             }
384             }
385              
386             sub _write_headers {
387 1     1   4 my ( $self, $sock, $status, $headers ) = @_;
388              
389 1         14 $self->_write_buf( $sock, $self->_format_headers($status, $headers) );
390             }
391              
392             sub _format_headers {
393 1     1   3 my ( $self, $status, $headers ) = @_;
394              
395 1         7 my $hdr = sprintf "HTTP/1.0 %d %s\015\012", $status, HTTP::Status::status_message($status);
396              
397 1         14 my $i = 0;
398              
399 1         3 my @delim = ("\015\012", ": ");
400              
401 1         3 foreach my $str ( @$headers ) {
402 4         5114 $hdr .= $str . $delim[++$i % 2];
403             }
404              
405 1         9 $hdr .= "\015\012";
406              
407 1         26 return \$hdr;
408             }
409              
410             # this flushes just the output buffer, not the input buffer (unlike
411             # $handle->flush)
412             sub _flush {
413 0     0   0 my ( $self, $sock ) = @_;
414              
415 0         0 local $| = 1;
416 0         0 print $sock '';
417             }
418              
419             # helper routine, similar to push write, but respects buffering, and refcounts
420             # itself
421             sub _write_buf {
422 1     1   5 my($self, $socket, $data) = @_;
423              
424 1     1   7 no warnings 'uninitialized';
  1         2  
  1         750  
425              
426             # try writing immediately
427 1 50       268 if ( (my $written = syswrite($socket, $$data)) < length($$data) ) {
    0          
428 1   33     223 my $done = defined(wantarray) && AE::cv;
429              
430             # either the write failed or was incomplete
431              
432 1 0 0     20 if ( !defined($written) and $! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK) {
      0        
      33        
433             # a real write error occured, like EPIPE
434 0 0       0 $done->croak($!) if $done;
435 0         0 return $done;
436             }
437              
438             # the write was either incomplete or a non fatal error occured, so we
439             # need to set up an IO watcher to wait until we can properly write
440              
441 1         3 my $length = length($$data);
442              
443 1         1 my $write_watcher;
444             $write_watcher = AE::io $socket, 1, sub {
445 1         28 write_more: {
446 1     1   503 my $out = syswrite($socket, $$data, $length - $written, $written);
447              
448 1 50 33     22 if ( defined($out) ) {
    50 33        
449 0         0 $written += $out;
450              
451 0 0       0 if ( $written == $length ) {
452 0         0 undef $write_watcher;
453 0 0       0 $done->send(1) if $done;
454             } else {
455 0         0 redo write_more;
456             }
457             } elsif ($! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK) {
458 1 50       134 $done->croak($!) if $done;
459 1         515 undef $write_watcher;
460             }
461             }
462 1         16 };
463              
464 1         14 return $done;
465             } elsif ( defined wantarray ) {
466 0         0 my $done = AE::cv;
467 0         0 $done->send(1);
468 0         0 return $done;
469             }
470             }
471              
472             sub _write_body {
473 0     0   0 my ( $self, $sock, $body ) = @_;
474              
475 0 0 0     0 if ( ref $body eq 'ARRAY' ) {
    0          
    0          
476 0         0 my $buf = join "", @$body;
477 0         0 return $self->_write_buf($sock, \$buf);
478             } elsif ( Plack::Util::is_real_fh($body) ) {
479             # real handles use nonblocking IO
480             # either AIO or using watchers, with sendfile or with copying IO
481 0         0 return $self->_write_real_fh($sock, $body);
482             } elsif ( blessed($body) and $body->can("string_ref") ) {
483             # optimize IO::String to not use its incredibly slow getline
484 0 0       0 if ( my $pos = $body->tell ) {
485 0         0 my $str = substr ${ $body->string_ref }, $pos;
  0         0  
486 0         0 return $self->_write_buf($sock, \$str);
487             } else {
488 0         0 return $self->_write_buf($sock, $body->string_ref);
489             }
490             } else {
491 0         0 return $self->_write_fh($sock, $body);
492             }
493             }
494              
495             # like Plack::Util::foreach, but nonblocking on the output
496             # handle
497             sub _write_fh {
498 0     0   0 my ( $self, $sock, $body ) = @_;
499              
500 0         0 my $handle = AnyEvent::Handle->new( fh => $sock );
501 0         0 my $ret = AE::cv;
502              
503             $handle->on_error(sub {
504 0     0   0 my $err = $_[2];
505 0         0 $handle->destroy;
506 0         0 $ret->send($err);
507 0         0 });
508              
509 1     1   6 no warnings 'recursion';
  1         2  
  1         739  
510 0         0 $handle->on_drain( $self->_drain($body, $ret) );
511              
512 0         0 return $ret;
513             }
514              
515             sub _drain {
516 0     0   0 my ($self, $body, $ret) = @_;
517             return sub {
518 0     0   0 my $handle = shift;
519 0         0 local $/ = \ $self->{read_chunk_size};
520 0 0       0 if ( defined( my $buf = $body->getline ) ) {
    0          
521 0 0       0 if (length($buf)) {
522 0         0 $handle->push_write($buf);
523             }
524             else {
525             # if on_drain is called and we don't do any
526             # push_write, anyevent::handle thinks we are done.
527             # this fails for the deflater mw, since one 4096 chunk
528             # of getline might not generate a deflated packet yet,
529             # which gets us an empty string here.
530 0         0 return $self->_drain($body, $ret)->($handle);
531             }
532             } elsif ( $! ) {
533 0         0 $ret->croak($!);
534 0         0 $handle->destroy;
535             } else {
536 0         0 $body->close;
537             $handle->on_drain(sub {
538 0         0 shutdown $handle->fh, 1;
539 0         0 $handle->destroy;
540 0         0 $ret->send(1);
541 0         0 });
542             }
543              
544             }
545 0         0 }
546              
547             # when the body handle is a real filehandle we use this routine, which is more
548             # careful not to block when reading the response too
549              
550             # FIXME support only reading $length bytes from $body, instead of until EOF
551             # FIXME use len = 0 param to sendfile
552             # FIXME use Sys::Sendfile in nonblocking mode if AIO is not available
553             # FIXME test sendfile on non file backed handles
554             sub _write_real_fh {
555 0     0   0 my ( $self, $sock, $body ) = @_;
556              
557 0 0 0     0 if ( HAS_AIO and -s $body ) {
558 0         0 my $cv = AE::cv;
559 0         0 my $offset = 0;
560 0         0 my $length = -s $body;
561 0         0 $sock->blocking(1);
562 0         0 my $sendfile; $sendfile = sub {
563             IO::AIO::aio_sendfile( $sock, $body, $offset, $length - $offset, sub {
564 0         0 my $ret = shift;
565 0 0       0 $offset += $ret if $ret > 0;
566 0 0 0     0 if ($offset >= $length || ($ret == -1 && ! ($! == EAGAIN || $! == EINTR))) {
      0        
      0        
567 0 0       0 if ( $ret == -1 ) {
568 0         0 $cv->croak($!);
569             } else {
570 0         0 $cv->send(1);
571             }
572              
573 0         0 undef $sendfile;
574 0         0 undef $sock;
575             } else {
576 0         0 $sendfile->();
577             }
578 0     0   0 });
579 0         0 };
580 0         0 $sendfile->();
581 0         0 return $cv;
582             } else {
583 0         0 return $self->_write_fh($sock, $body);
584             }
585             }
586              
587             sub run {
588 1     1 0 2 my $self = shift;
589 1         5 $self->register_service(@_);
590              
591 1     1   134 my $w; $w = AE::signal QUIT => sub { $self->{exit_guard}->end; undef $w };
  1         13  
  1         62  
  1         20  
592 1         12 $self->{exit_guard}->recv;
593             }
594              
595             package Twiggy::Writer;
596 1     1   7 use AnyEvent::Handle;
  1         1  
  1         305  
597              
598             sub new {
599 0     0     my ( $class, $socket, $exit ) = @_;
600              
601 0           bless { handle => AnyEvent::Handle->new( fh => $socket ), exit_guard => $exit }, $class;
602             }
603              
604 0     0     sub write { $_[0]{handle}->push_write($_[1]) }
605              
606             sub close {
607 0     0     my $self = shift;
608              
609 0           my $exit_guard = delete $self->{exit_guard};
610 0 0         $exit_guard->end if $exit_guard;
611              
612 0           my $handle = delete $self->{handle};
613 0 0         if ($handle) {
614 0           $handle->on_drain;
615 0           $handle->on_error;
616              
617             $handle->on_drain(sub {
618 0     0     shutdown $_[0]->fh, 1;
619 0           $_[0]->destroy;
620 0           undef $handle;
621 0           });
622             }
623             }
624              
625 0     0     sub DESTROY { $_[0]->close }
626              
627             package Twiggy::Server;
628              
629             1;
630             __END__