File Coverage

blib/lib/Twiggy/Server.pm
Criterion Covered Total %
statement 198 362 54.7
branch 41 130 31.5
condition 14 75 18.6
subroutine 43 64 67.1
pod 0 4 0.0
total 296 635 46.6


line stmt bran cond sub pod time code
1             package Twiggy::Server;
2 1     1   6 use strict;
  1         10  
  1         43  
3 1     1   7 use warnings;
  1         2  
  1         102  
4              
5 1     1   7 use Scalar::Util qw(blessed weaken);
  1         2  
  1         242  
6 1     1   8 use Try::Tiny;
  1         10  
  1         112  
7 1     1   9 use Carp;
  1         12  
  1         163  
8              
9 1     1   8 use Socket qw(IPPROTO_TCP TCP_NODELAY);
  1         4  
  1         170  
10 1     1   9 use Errno qw(EAGAIN EINTR);
  1         8  
  1         136  
11 1     1   7 use IO::Handle;
  1         2  
  1         113  
12              
13 1     1   8 use AnyEvent;
  1         41  
  1         83  
14 1     1   37 use AnyEvent::Handle;
  1         4  
  1         57  
15 1     1   734 use AnyEvent::Socket;
  1         16938  
  1         261  
16 1     1   9 use AnyEvent::Util qw(WSAEWOULDBLOCK);
  1         3  
  1         48  
17              
18 1     1   549 use HTTP::Status;
  1         4891  
  1         256  
19 1     1   494 use Plack::HTTPParser qw(parse_http_request);
  1         4336  
  1         88  
20 1     1   9 use Plack::Util;
  1         2  
  1         51  
21              
22 1     1   8 use constant DEBUG => $ENV{TWIGGY_DEBUG};
  1         3  
  1         305  
23             use constant HAS_AIO => !$ENV{PLACK_NO_SENDFILE} && try {
24             require AnyEvent::AIO;
25             require IO::AIO;
26             1;
27 1   33 1   8 };
  1         2  
  1         24  
28              
29 1     1   8 open my $null_io, '<', \'';
  1         1  
  1         36  
30              
31             sub new {
32 1     1 0 6 my($class, @args) = @_;
33              
34 1         7 return bless {
35             no_delay => 1,
36             timeout => 300,
37             read_chunk_size => 4096,
38             @args,
39             }, $class;
40             }
41              
42             sub start_listen {
43 1     1 0 3 my ($self, $app) = @_;
44 1 50 50     2 my @listen = @{$self->{listen} || [ ($self->{host} || '') . ":" . ($self->{port} || 0) ]};
  1   50     15  
45 1         5 for my $listen (@listen) {
46 1         3 push @{$self->{listen_guards}}, $self->_create_tcp_server($listen, $app);
  1         3  
47             }
48             }
49              
50             sub register_service {
51 1     1 0 2 my($self, $app) = @_;
52              
53 1         3 $self->start_listen($app);
54              
55             $self->{exit_guard} = AE::cv {
56             # Make sure that we are not listening on a socket anymore, while
57             # other events are being flushed
58 1     1   63 delete $self->{listen_guards};
59 1         5662 };
60 1         223 $self->{exit_guard}->begin;
61             }
62              
63             sub _create_tcp_server {
64 1     1   3 my ( $self, $listen, $app ) = @_;
65              
66 1         2 my($host, $port, $is_tcp);
67 1 50       9 if ($listen =~ /:\d+$/) {
68 1         4 ($host, $port) = split /:/, $listen;
69 1 50       4 $host = undef if $host eq '';
70 1 50       7 $port = undef if $port == 0;
71 1         2 $is_tcp = 1;
72             } else {
73 0         0 $host = "unix/";
74 0         0 $port = $listen;
75             }
76              
77 1         3 my($listen_host, $listen_port);
78              
79 1         5 return tcp_server $host, $port, $self->_accept_handler($app, $is_tcp, \$listen_host, \$listen_port),
80             $self->_accept_prepare_handler(\$listen_host, \$listen_port);
81             }
82              
83             sub _accept_prepare_handler {
84 1     1   2 my($self, $listen_host_r, $listen_port_r) = @_;
85              
86             return sub {
87 1     1   233 my ( $fh, $host, $port ) = @_;
88 1         2 DEBUG && warn "Listening on $host:$port\n";
89 1         3 $$listen_host_r = $host;
90 1         1 $$listen_port_r = $port;
91             $self->{server_ready}->({
92             host => $host,
93             port => $port,
94             server_software => 'Twiggy',
95 1 50       4 }) if $self->{server_ready};
96              
97 1   50     6 return $self->{backlog} || 0;
98 1         6 };
99             }
100              
101             sub _accept_handler {
102 1     1   2 my ( $self, $app, $is_tcp, $listen_host_r, $listen_port_r ) = @_;
103              
104             return sub {
105 2     2   22730 my ( $sock, $peer_host, $peer_port ) = @_;
106              
107 2         4 DEBUG && warn "$sock Accepted connection from $peer_host:$peer_port\n";
108 2 50       7 return unless $sock;
109 2         12 $self->{exit_guard}->begin;
110              
111 2 50 33     21 if ( $is_tcp && $self->{no_delay} ) {
112 2 50       22 setsockopt($sock, IPPROTO_TCP, TCP_NODELAY, 1)
113             or die "setsockopt(TCP_NODELAY) failed:$!";
114             }
115              
116 2         6 my $headers = "";
117             my $try_parse = sub {
118 2 50       7 if ( $self->_try_read_headers($sock, $headers) ) {
119 1         33 my $env = {
120             SERVER_NAME => $$listen_host_r,
121             SERVER_PORT => $$listen_port_r,
122             SCRIPT_NAME => '',
123             REMOTE_ADDR => $peer_host,
124             'psgi.version' => [ 1, 0 ],
125             'psgi.errors' => *STDERR,
126             'psgi.url_scheme' => 'http',
127             'psgi.nonblocking' => Plack::Util::TRUE,
128             'psgi.streaming' => Plack::Util::TRUE,
129             'psgi.run_once' => Plack::Util::FALSE,
130             'psgi.multithread' => Plack::Util::FALSE,
131             'psgi.multiprocess' => Plack::Util::FALSE,
132             'psgi.input' => undef, # will be set by _run_app()
133             'psgix.io' => $sock,
134             'psgix.input.buffered' => Plack::Util::TRUE,
135             };
136              
137 1         7 my $reqlen = parse_http_request($headers, $env);
138 1         249 DEBUG && warn "$sock Parsed HTTP headers: request length=$reqlen\n";
139              
140 1 50       4 if ( $reqlen < 0 ) {
141 0         0 die "bad request";
142             } else {
143 1         6 return $env;
144             }
145             }
146              
147 0         0 return;
148 2         14 };
149              
150 2         6 local $@;
151 2 100       4 unless ( eval {
152 2 50       5 if ( my $env = $try_parse->() ) {
153             # the request data is already available, no need to parse more
154 1         4 $self->_run_app($app, $env, $sock);
155             } else {
156             # there's not yet enough data to parse the request,
157             # set up a watcher
158 0         0 $self->_create_req_parsing_watcher( $sock, $try_parse, $app );
159             };
160              
161 1         16 1;
162             }) {
163 1         6 my $disconnected = ($@ =~ /^client disconnected/);
164 1         4 $self->_bad_request($sock, $disconnected);
165             }
166 1         7 };
167             }
168              
169             # returns a closure that tries to parse
170             # this is not a method because it needs a buffer per socket
171             sub _try_read_headers {
172 2     2   5 my ( $self, $sock, undef ) = @_;
173              
174             # FIXME add a timer to manage read timeouts
175 2         11 local $/ = "\012";
176              
177 2         5 read_more: for my $headers ( $_[2] ) {
178 5 100 0     51 if ( defined(my $line = <$sock>) ) {
    50 0        
    50 33        
179 4         10 $headers .= $line;
180              
181 4 100 66     14 if ( $line eq "\015\012" or $line eq "\012" ) {
182             # got an empty line, we're done reading the headers
183 1         7 return 1;
184             } else {
185             # try to read more lines using buffered IO
186 3         6 redo read_more;
187             }
188             } elsif ($! and $! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK ) {
189 0         0 die $!;
190             } elsif (!$!) {
191 1         16 die "client disconnected";
192             }
193             }
194              
195 0         0 DEBUG && warn "$sock did not read to end of req, wait for more data to arrive\n";
196 0         0 return;
197             }
198              
199             sub _create_req_parsing_watcher {
200 0     0   0 my ( $self, $sock, $try_parse, $app ) = @_;
201              
202 0         0 my $headers_io_watcher;
203              
204             my $timeout_timer = AE::timer $self->{timeout}, 0, sub {
205 0     0   0 DEBUG && warn "$sock Timeout\n";
206 0         0 undef $headers_io_watcher;
207 0         0 undef $try_parse;
208 0         0 undef $sock;
209 0 0       0 } if $self->{timeout};
210              
211             $headers_io_watcher = AE::io $sock, 0, sub {
212             try {
213 0 0       0 if ( my $env = $try_parse->() ) {
214 0         0 undef $headers_io_watcher;
215 0         0 undef $timeout_timer;
216 0         0 $self->_run_app($app, $env, $sock);
217             }
218             } catch {
219 0         0 undef $headers_io_watcher;
220 0         0 undef $timeout_timer;
221 0         0 my $disconnected = /^client disconnected/;
222 0         0 $self->_bad_request($sock, $disconnected);
223             }
224 0     0   0 };
  0         0  
225             }
226              
227             sub _bad_request {
228 1     1   3 my ( $self, $sock, $disconnected ) = @_;
229              
230 1 50 33     7 return unless defined $sock and defined fileno $sock;
231              
232 1         4 my $response = [
233             400,
234             [ 'Content-Type' => 'text/plain' ],
235             [ ],
236             ];
237              
238             # if client is already gone, don't try to write to it
239 1 50       4 $response = [] if $disconnected;
240              
241 1         11 $self->_write_psgi_response($sock, $response);
242              
243 1         84 return;
244             }
245              
246             sub _read_chunk {
247 0     0   0 my ($self, $sock, $remaining, $cb) = @_;
248              
249 0         0 my $data = '';
250 0         0 my $read_chunk_size = $self->{read_chunk_size};
251              
252             my $try_read = sub {
253             READ_MORE: {
254 0 0   0   0 my $read_size = $remaining > $read_chunk_size ? $read_chunk_size : $remaining;
  0         0  
255 0         0 my $rlen = read($sock, $data, $read_size, length($data));
256              
257 0 0 0     0 if (defined $rlen and $rlen > 0) {
    0 0        
    0 0        
    0 0        
258 0         0 $remaining -= $rlen;
259              
260 0 0       0 if ($remaining <= 0) {
261 0         0 $cb->($data);
262 0         0 return 1;
263             } else {
264 0         0 redo READ_MORE;
265             }
266             } elsif (defined $rlen) {
267 0         0 $cb->($data);
268 0         0 return 1;
269             } elsif ($! and $! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK) {
270 0         0 die $!;
271             } elsif (!$!) {
272 0         0 die "client disconnected";
273             }
274             }
275              
276 0         0 return;
277 0         0 };
278              
279 0 0       0 unless ($try_read->()) {
280 0         0 my $rw; $rw = AE::io($sock, 0, sub {
281             try {
282 0 0       0 if ($try_read->()) {
283 0         0 undef $rw;
284             }
285             } catch {
286 0         0 undef $rw;
287 0         0 $self->_bad_request($sock);
288 0     0   0 };
289 0         0 });
290             }
291             }
292              
293             sub _run_app {
294 1     1   3 my($self, $app, $env, $sock) = @_;
295              
296 1 50       18 unless ($env->{'psgi.input'}) {
297 1 50       5 if ($env->{CONTENT_LENGTH}) {
298             $self->_read_chunk($sock, $env->{CONTENT_LENGTH}, sub {
299 0     0   0 my ($data) = @_;
300 0         0 open my $input, '<', \$data;
301 0         0 $env->{'psgi.input'} = $input;
302 0         0 $self->_run_app($app, $env, $sock);
303 0         0 });
304 0         0 return;
305             } else {
306 1         3 $env->{'psgi.input'} = $null_io;
307             }
308             }
309              
310 1         20 my $res = Plack::Util::run_app $app, $env;
311              
312 1 50 33     106 if ( ref $res eq 'ARRAY' ) {
    50          
    50          
313 0         0 $self->_write_psgi_response($sock, $res);
314             } elsif ( blessed($res) and $res->isa("AnyEvent::CondVar") ) {
315 0         0 Carp::carp("Returning AnyEvent condvar is deprecated and will be removed in the next release of Twiggy. Use the streaming callback interface intstead.");
316 0     0   0 $res->cb(sub { $self->_write_psgi_response($sock, shift->recv) });
  0         0  
317             } elsif ( ref $res eq 'CODE' ) {
318 1         3 my $created_writer;
319              
320             $res->(
321             sub {
322 1     1   1421 my $res = shift;
323              
324 1 50       7 if ( @$res < 2 ) {
    50          
325 0         0 croak "Insufficient arguments";
326             } elsif ( @$res == 2 ) {
327 0         0 my ( $status, $headers ) = @$res;
328              
329 0         0 $self->_flush($sock);
330              
331 0         0 my $writer = Twiggy::Writer->new($sock, $self->{exit_guard});
332 0         0 $created_writer = 1;
333              
334 0         0 my $buf = $self->_format_headers($status, $headers);
335 0         0 $writer->write($$buf);
336              
337 0         0 return $writer;
338             } else {
339 1         4 my ( $status, $headers, $body, $post ) = @$res;
340 1         4 my $cv = $self->_write_psgi_response($sock, [ $status, $headers, $body ]);
341 1 50       9 $cv->cb(sub { $post->() }) if $post;
  0         0  
342             }
343             },
344 1         25 $sock,
345             );
346              
347 1 50       401 if($created_writer) {
348 0         0 $self->{exit_guard}->end; # normally _write_psgi_response calls this, but it doesn't get called when we use a writer!
349             }
350             } else {
351 0         0 croak("Unknown response type: $res");
352             }
353             }
354              
355             sub _write_psgi_response {
356 2     2   6 my ( $self, $sock, $res ) = @_;
357              
358 2 50       7 if ( ref $res eq 'ARRAY' ) {
359 2 100       7 if ( scalar @$res == 0 ) {
360             # no response
361 1         62 $self->{exit_guard}->end;
362 1         8 return;
363             }
364              
365 1         3 my ( $status, $headers, $body ) = @$res;
366              
367 1         31 my $cv = AE::cv;
368              
369             $self->_write_headers( $sock, $status, $headers )->cb(sub {
370 1     1   9 local $@;
371 1 50       2 if ( eval { $_[0]->recv; 1 } ) {
  1         5  
  1         11  
372             $self->_write_body($sock, $body)->cb(sub {
373 1         18 shutdown $sock, 1;
374 1         44 close $sock;
375 1         8 $self->{exit_guard}->end;
376 1         62 local $@;
377 1 50       3 eval { $cv->send($_[0]->recv); 1 } or $cv->croak($@);
  1         4  
  0         0  
378 1         5 });
379             } else {
380 0         0 $self->{exit_guard}->end;
381 0 0       0 eval { $cv->send($_[0]->recv); 1 } or $cv->croak($@);
  0         0  
  0         0  
382             }
383 1         12 });
384              
385 1         574 return $cv;
386             } else {
387 1     1   2622 no warnings 'uninitialized';
  1         2  
  1         437  
388 0         0 warn "Unknown response type: $res";
389 0         0 return $self->_write_psgi_response($sock, [ 204, [], [] ]);
390             }
391             }
392              
393             sub _write_headers {
394 1     1   5 my ( $self, $sock, $status, $headers ) = @_;
395              
396 1         4 $self->_write_buf( $sock, $self->_format_headers($status, $headers) );
397             }
398              
399             sub _format_headers {
400 1     1   3 my ( $self, $status, $headers ) = @_;
401              
402 1         8 my $hdr = sprintf "HTTP/1.0 %d %s\015\012", $status, HTTP::Status::status_message($status);
403              
404 1         23 my $i = 0;
405              
406 1         6 my @delim = ("\015\012", ": ");
407              
408 1         4 foreach my $str ( @$headers ) {
409 4         2424 $hdr .= $str . $delim[++$i % 2];
410             }
411              
412 1         37 $hdr .= "\015\012";
413              
414 1         9 return \$hdr;
415             }
416              
417             # this flushes just the output buffer, not the input buffer (unlike
418             # $handle->flush)
419             sub _flush {
420 0     0   0 my ( $self, $sock ) = @_;
421              
422 0         0 local $| = 1;
423 0         0 print $sock '';
424             }
425              
426             # helper routine, similar to push write, but respects buffering, and refcounts
427             # itself
428             sub _write_buf {
429 2     2   5 my($self, $socket, $data) = @_;
430              
431 1     1   10 no warnings 'uninitialized';
  1         1  
  1         602  
432              
433             # try writing immediately
434 2 100       447 if ( (my $written = syswrite($socket, $$data)) < length($$data) ) {
    50          
435 1   33     41 my $done = defined(wantarray) && AE::cv;
436              
437             # either the write failed or was incomplete
438              
439 1 50 33     41 if ( !defined($written) and $! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK) {
      33        
      33        
440             # a real write error occured, like EPIPE
441 1 50       14 $done->croak($!) if $done;
442 1         25 return $done;
443             }
444              
445             # the write was either incomplete or a non fatal error occured, so we
446             # need to set up an IO watcher to wait until we can properly write
447              
448 0         0 my $length = length($$data);
449              
450 0         0 my $write_watcher;
451             $write_watcher = AE::io $socket, 1, sub {
452             write_more: {
453 0     0   0 my $out = syswrite($socket, $$data, $length - $written, $written);
  0         0  
454              
455 0 0 0     0 if ( defined($out) ) {
    0 0        
456 0         0 $written += $out;
457              
458 0 0       0 if ( $written == $length ) {
459 0         0 undef $write_watcher;
460 0 0       0 $done->send(1) if $done;
461             } else {
462 0         0 redo write_more;
463             }
464             } elsif ($! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK) {
465 0 0       0 $done->croak($!) if $done;
466 0         0 undef $write_watcher;
467             }
468             }
469 0         0 };
470              
471 0         0 return $done;
472             } elsif ( defined wantarray ) {
473 1         33 my $done = AE::cv;
474 1         13 $done->send(1);
475 1         26 return $done;
476             }
477             }
478              
479             sub _write_body {
480 1     1   4 my ( $self, $sock, $body ) = @_;
481              
482 1 50 0     5 if ( ref $body eq 'ARRAY' ) {
    0          
    0          
483 1         4 my $buf = join "", @$body;
484 1         4 return $self->_write_buf($sock, \$buf);
485             } elsif ( Plack::Util::is_real_fh($body) ) {
486             # real handles use nonblocking IO
487             # either AIO or using watchers, with sendfile or with copying IO
488 0         0 return $self->_write_real_fh($sock, $body);
489             } elsif ( blessed($body) and $body->can("string_ref") ) {
490             # optimize IO::String to not use its incredibly slow getline
491 0 0       0 if ( my $pos = $body->tell ) {
492 0         0 my $str = substr ${ $body->string_ref }, $pos;
  0         0  
493 0         0 return $self->_write_buf($sock, \$str);
494             } else {
495 0         0 return $self->_write_buf($sock, $body->string_ref);
496             }
497             } else {
498 0         0 return $self->_write_fh($sock, $body);
499             }
500             }
501              
502             # like Plack::Util::foreach, but nonblocking on the output
503             # handle
504             sub _write_fh {
505 0     0   0 my ( $self, $sock, $body ) = @_;
506              
507 0         0 my $handle = AnyEvent::Handle->new( fh => $sock );
508 0         0 my $ret = AE::cv;
509              
510             $handle->on_error(sub {
511 0     0   0 my $err = $_[2];
512 0         0 $handle->destroy;
513 0         0 $ret->send($err);
514 0         0 });
515              
516 1     1   8 no warnings 'recursion';
  1         1  
  1         630  
517 0         0 $handle->on_drain( $self->_drain($body, $ret) );
518              
519 0         0 return $ret;
520             }
521              
522             sub _drain {
523 0     0   0 my ($self, $body, $ret) = @_;
524             return sub {
525 0     0   0 my $handle = shift;
526 0         0 local $/ = \ $self->{read_chunk_size};
527 0 0       0 if ( defined( my $buf = $body->getline ) ) {
    0          
528 0 0       0 if (length($buf)) {
529 0         0 $handle->push_write($buf);
530             }
531             else {
532             # if on_drain is called and we don't do any
533             # push_write, anyevent::handle thinks we are done.
534             # this fails for the deflater mw, since one 4096 chunk
535             # of getline might not generate a deflated packet yet,
536             # which gets us an empty string here.
537 0         0 return $self->_drain($body, $ret)->($handle);
538             }
539             } elsif ( $! ) {
540 0         0 $ret->croak($!);
541 0         0 $handle->destroy;
542             } else {
543 0         0 $body->close;
544             $handle->on_drain(sub {
545 0         0 shutdown $handle->fh, 1;
546 0         0 $handle->destroy;
547 0         0 $ret->send(1);
548 0         0 });
549             }
550              
551             }
552 0         0 }
553              
554             # when the body handle is a real filehandle we use this routine, which is more
555             # careful not to block when reading the response too
556              
557             # FIXME support only reading $length bytes from $body, instead of until EOF
558             # FIXME use len = 0 param to sendfile
559             # FIXME use Sys::Sendfile in nonblocking mode if AIO is not available
560             # FIXME test sendfile on non file backed handles
561             sub _write_real_fh {
562 0     0   0 my ( $self, $sock, $body ) = @_;
563              
564 0 0 0     0 if ( HAS_AIO and -s $body ) {
565 0         0 my $cv = AE::cv;
566 0         0 my $offset = 0;
567 0         0 my $length = -s $body;
568 0         0 $sock->blocking(1);
569 0         0 my $sendfile; $sendfile = sub {
570             IO::AIO::aio_sendfile( $sock, $body, $offset, $length - $offset, sub {
571 0         0 my $ret = shift;
572 0 0       0 $offset += $ret if $ret > 0;
573 0 0 0     0 if ($offset >= $length || ($ret == -1 && ! ($! == EAGAIN || $! == EINTR))) {
      0        
      0        
574 0 0       0 if ( $ret == -1 ) {
575 0         0 $cv->croak($!);
576             } else {
577 0         0 $cv->send(1);
578             }
579              
580 0         0 undef $sendfile;
581 0         0 undef $sock;
582             } else {
583 0         0 $sendfile->();
584             }
585 0     0   0 });
586 0         0 };
587 0         0 $sendfile->();
588 0         0 return $cv;
589             } else {
590 0         0 return $self->_write_fh($sock, $body);
591             }
592             }
593              
594             sub run {
595 1     1 0 3 my $self = shift;
596 1         7 $self->register_service(@_);
597              
598 1     1   40 my $w; $w = AE::signal QUIT => sub { $self->{exit_guard}->end; undef $w };
  1         13  
  1         57  
  1         18  
599 1         9 $self->{exit_guard}->recv;
600             }
601              
602             package Twiggy::Writer;
603 1     1   9 use AnyEvent::Handle;
  1         1  
  1         283  
604              
605             sub new {
606 0     0     my ( $class, $socket, $exit ) = @_;
607              
608 0 0         $exit->begin if $exit;
609              
610 0           bless { handle => AnyEvent::Handle->new( fh => $socket ), exit_guard => $exit }, $class;
611             }
612              
613 0     0     sub write { $_[0]{handle}->push_write($_[1]) }
614              
615             sub close {
616 0     0     my $self = shift;
617              
618 0           my $exit_guard = delete $self->{exit_guard};
619 0 0         $exit_guard->end if $exit_guard;
620              
621 0           my $handle = delete $self->{handle};
622 0 0         if ($handle) {
623 0           $handle->on_drain;
624 0           $handle->on_error;
625              
626             $handle->on_drain(sub {
627 0     0     shutdown $_[0]->fh, 1;
628 0           $_[0]->destroy;
629 0           undef $handle;
630 0           });
631             }
632             }
633              
634 0     0     sub DESTROY { $_[0]->close }
635              
636             package Twiggy::Server;
637              
638             1;
639             __END__