File Coverage

blib/lib/Plack/Handler/Gazelle.pm
Criterion Covered Total %
statement 127 181 70.1
branch 30 80 37.5
condition 17 32 53.1
subroutine 24 27 88.8
pod 0 3 0.0
total 198 323 61.3


line stmt bran cond sub pod time code
1             package Plack::Handler::Gazelle;
2              
3 70     70   158646089 use 5.008001;
  70         2352  
4 70     70   2637 use strict;
  70         1806  
  70         7961  
5 70     70   1434 use warnings;
  70         1049  
  70         25248  
6 70     70   2118 use IO::Socket::INET;
  70         28357  
  70         8854  
7 70     70   164221 use Plack::Util;
  70         4770  
  70         4229  
8 70     70   61576 use Stream::Buffered;
  70         816877  
  70         6184  
9 70     70   1574 use POSIX qw(EINTR EAGAIN EWOULDBLOCK);
  70         9785  
  70         4647  
10 70     70   29619 use Socket qw(IPPROTO_TCP TCP_NODELAY);
  70         267  
  70         9148  
11 70     70   48629 use Parallel::Prefork;
  70         587164  
  70         1464  
12 70     70   55722 use Server::Starter ();
  70         762249  
  70         2957  
13 70     70   41539 use Guard;
  70         79981  
  70         7890  
14              
15             our $VERSION = "0.50";
16              
17 70     70   650 use XSLoader;
  70         190  
  70         5967  
18             XSLoader::load(__PACKAGE__, $VERSION);
19              
20 70     70   601 use constant MAX_REQUEST_SIZE => 131072;
  70         182  
  70         19378  
21 70     70   585 use constant CHUNKSIZE => 64 * 1024;
  70         173  
  70         128714  
22              
23             my $null_io = do { open my $io, "<", \""; $io };
24             my $bad_response = [ 400, [ 'Content-Type' => 'text/plain', 'Connection' => 'close' ], [ 'Bad Request' ] ];
25              
26             sub new {
27 69     69 0 4946 my($class, %args) = @_;
28              
29             # setup before instantiation
30 69 50       577 if ($args{listen_sock}) {
    50          
31 0         0 $args{host} = $args{listen_sock}->sockhost;
32 0         0 $args{port} = $args{listen_sock}->sockport;
33             }
34             elsif (defined $ENV{SERVER_STARTER_PORT}) {
35 0         0 my ($hostport, $fd) = %{Server::Starter::server_ports()};
  0         0  
36 0 0       0 if ($hostport =~ /(.*):(\d+)/) {
37 0         0 $args{host} = $1;
38 0         0 $args{port} = $2;
39             } else {
40 0         0 $args{port} = $hostport;
41             }
42 0 0       0 $args{listen_sock} = IO::Socket::INET->new(
43             Proto => 'tcp',
44             ) or die "failed to create socket:$!";
45 0 0       0 $args{listen_sock}->fdopen($fd, 'w')
46             or die "failed to bind to listening socket:$!";
47             }
48              
49 69         175 my $max_workers = 10;
50 69         213 for (qw(max_workers workers)) {
51             $max_workers = delete $args{$_}
52 138 100       606 if defined $args{$_};
53             }
54              
55 69 100       254 if ($args{child_exit}) {
56 2 50       308 $args{child_exit} = eval $args{child_exit} unless ref($args{child_exit});
57 2 50       12 die "child_exit is defined but not a code block" if ref($args{child_exit}) ne 'CODE';
58             }
59              
60             my $self = bless {
61             server_software => $args{server_software} || $class,
62       67     server_ready => $args{server_ready} || sub {},
63             listen_sock => $args{listen_sock},
64             host => $args{host} || 0,
65             port => $args{port} || 8080,
66             timeout => $args{timeout} || 300,
67             max_workers => $max_workers,
68       47     child_exit => $args{child_exit} || sub {},
69             min_reqs_per_child => (
70             defined $args{min_reqs_per_child}
71             ? $args{min_reqs_per_child} : undef,
72             ),
73             max_reqs_per_child => (
74             $args{max_reqs_per_child} || $args{max_requests} || 1000,
75             ),
76             spawn_interval => $args{spawn_interval} || 0,
77             err_respawn_interval => (
78             defined $args{err_respawn_interval}
79             ? $args{err_respawn_interval} : undef,
80 69 50 33     3122 ),
    50 66        
      100        
      50        
      50        
      66        
      50        
      50        
81             }, $class;
82              
83 69         769 $self;
84             }
85              
86             sub setup_listener {
87 69     69 0 170 my $self = shift;
88             $self->{listen_sock} ||= IO::Socket::INET->new(
89             Listen => SOMAXCONN,
90             LocalPort => $self->{port},
91             LocalAddr => $self->{host},
92 69 50 33     2859 Proto => 'tcp',
93             ReuseAddr => 1,
94             ) or die "failed to listen to port $self->{port}:$!";
95              
96 69         58607 my $family = Socket::sockaddr_family(getsockname($self->{listen_sock}));
97 69         322 $self->{_listen_sock_is_tcp} = $family != AF_UNIX;
98              
99             # set defer accept
100 69 50 33     1190 if ($^O eq 'linux' && $self->{_listen_sock_is_tcp}) {
101 69         547 setsockopt($self->{listen_sock}, IPPROTO_TCP, 9, 1);
102             }
103 69         313 $self->{server_ready}->($self);
104             }
105              
106              
107             sub run {
108 69     69 0 3679 my($self, $app) = @_;
109 69         455 $self->setup_listener();
110             # use Parallel::Prefork
111             my %pm_args = (
112             max_workers => $self->{max_workers},
113 69         1037 trap_signals => {
114             HUP => 'TERM',
115             },
116             );
117 69 50       309 if (defined $self->{spawn_interval}) {
118 69         318 $pm_args{trap_signals}{USR1} = [ 'TERM', $self->{spawn_interval} ];
119 69         224 $pm_args{spawn_interval} = $self->{spawn_interval};
120             }
121 69 50       398 if (defined $self->{err_respawn_interval}) {
122 0         0 $pm_args{err_respawn_interval} = $self->{err_respawn_interval};
123             }
124 69         930 my $pm = Parallel::Prefork->new(\%pm_args);
125              
126             local $SIG{TERM} = sub {
127             #tell the socket we're done reading (stops new connections, existing will continue)
128             $self->{listen_sock}->shutdown(0)
129 20 50   20   7660533 if not defined $ENV{SERVER_STARTER_PORT};
130              
131 20         3537 $pm->signal_received('TERM');
132 20         1133 $pm->signal_all_children('TERM');
133 69         6362 };
134 69         354 while ($pm->signal_received !~ /^(TERM|USR1)$/) {
135             $pm->start(sub{
136 49     49   33865364 srand((rand() * 2 ** 30) ^ $$ ^ time);
137              
138             my $max_reqs_per_child = $self->_calc_minmax_per_child(
139             $self->{max_reqs_per_child},
140             $self->{min_reqs_per_child}
141 49         8387 );
142              
143 49         2460 my $proc_req_count = 0;
144 49         2431 $self->{term_received} = 0;
145             local $SIG{TERM} = sub {
146 29         690 $self->{term_received}++;
147 49         11317 };
148 49         5580 local $SIG{PIPE} = 'IGNORE';
149             PROC_LOOP:
150 49         2218 while ( $proc_req_count < $max_reqs_per_child) {
151 13557 100       163180 if ( $self->{term_received} ) {
152 29         1184 $self->{child_exit}->($self, $app);
153 29         16591 exit 0;
154             }
155 13528 100 100     5426164 if ( my ($conn, $buf, $env) = accept_psgi(
      50        
156             fileno($self->{listen_sock}), $self->{timeout}, $self->{_listen_sock_is_tcp},
157             $self->{host} || 0, $self->{port} || 0
158             ) ) {
159 39         2077 my $guard = guard { close_client($conn) };
  39         1023321  
160 39         163 ++$proc_req_count;
161              
162 39         137 my $res = $bad_response;
163 70     70   1089 my $chunked = do { no warnings; lc delete $env->{HTTP_TRANSFER_ENCODING} eq 'chunked' };
  70         293  
  70         122036  
  39         238  
  39         383  
164             # RFC 7230 ยง3.3.3: Transfer-Encoding takes precedence over
165             # Content-Length. Requests carrying both are rejected upstream
166             # in the XS layer; this ordering is defence-in-depth.
167 39 50       1242 if ( $chunked ) {
    100          
168 0         0 my $buffer = Stream::Buffered->new;
169 0         0 my $chunk_buffer = '';
170 0         0 my $length = 0;
171 0         0 DECHUNK: while(1) {
172 0         0 my $chunk = "";
173 0 0       0 if ( length $buf ) {
174 0         0 $chunk = $buf;
175 0         0 $buf = '';
176             }
177             else {
178             read_timeout(
179             $conn, \$chunk, 16384, 0, $self->{timeout})
180 0 0       0 or next PROC_LOOP;
181             }
182              
183 0         0 $chunk_buffer .= $chunk;
184 0         0 while ( $chunk_buffer =~ s/^(([0-9a-fA-F]+).*\015\012)// ) {
185 0         0 my $trailer = $1;
186 0         0 my $chunk_len = hex $2;
187 0 0       0 if ($chunk_len == 0) {
    0          
188 0         0 last DECHUNK;
189             } elsif (length $chunk_buffer < $chunk_len + 2) {
190 0         0 $chunk_buffer = $trailer . $chunk_buffer;
191 0         0 last;
192             }
193 0         0 $buffer->print(substr $chunk_buffer, 0, $chunk_len, '');
194 0         0 $chunk_buffer =~ s/^\015\012//;
195 0         0 $length += $chunk_len;
196             }
197             }
198 0         0 $env->{CONTENT_LENGTH} = $length;
199 0         0 $env->{'psgi.input'} = $buffer->rewind;
200             } elsif (my $cl = $env->{CONTENT_LENGTH}) {
201 1         77 my $buffer = Stream::Buffered->new($cl);
202 1         174 while ($cl > 0) {
203 1         4 my $chunk = "";
204 1 50       4 if (length $buf) {
205 1         2 $chunk = $buf;
206 1         3 $buf = '';
207             } else {
208             read_timeout(
209             $conn, \$chunk, $cl, 0, $self->{timeout})
210 0 0       0 or next PROC_LOOP;
211             }
212 1         23 $buffer->print($chunk);
213 1         38 $cl -= length $chunk;
214             }
215 1         6 $env->{'psgi.input'} = $buffer->rewind;
216             } else {
217 38         622 $env->{'psgi.input'} = $null_io;
218             }
219             $env->{'psgix.informational'} = sub {
220 1         90 my ($status,$headers) = @_;
221 1         110 write_informational_response($conn, $self->{timeout}, $status, $headers);
222 39         1081 };
223 39         2520 $res = Plack::Util::run_app $app, $env;
224 38 100       5137 my $use_chunked = $env->{"SERVER_PROTOCOL"} eq 'HTTP/1.1' ? 1 : 0;
225 38 50       425 if (ref $res eq 'ARRAY') {
    0          
226 38         36880 $self->_handle_response($res, $conn, $use_chunked);
227             } elsif (ref $res eq 'CODE') {
228             $res->(sub {
229 0         0 $self->_handle_response($_[0], $conn, $use_chunked);
230 0         0 });
231             } else {
232 0         0 die "Bad response $res";
233             }
234 38 100       2066 if ($env->{'psgix.harakiri.commit'}) {
235 19         169 $self->{child_exit}->($self, $app);
236 19         16187 exit 0;
237             }
238             }
239             }
240 69         2464 });
241             }
242 20         7410 while ($pm->wait_all_children(1)) {
243 16         17241546 $pm->signal_all_children('TERM');
244             }
245             }
246              
247             sub _calc_minmax_per_child {
248 49     49   1781 my $self = shift;
249 49         1237 my ($max,$min) = @_;
250 49 50       2575 if (defined $min) {
251 0         0 return $max - int(($max - $min + 1) * rand);
252             } else {
253 49         784 return $max;
254             }
255             }
256              
257             sub _handle_response {
258 38     38   216 my($self, $res, $conn, $use_chunked) = @_;
259 38         234 my $status_code = $res->[0];
260 38         93 my $headers = $res->[1];
261 38         358 my $body = $res->[2];
262              
263 38 50 33     1151 if (defined $body && ref $body eq 'ARRAY' ) {
264 38         48926 write_psgi_response($conn, $self->{timeout}, $status_code, $headers , $body, $use_chunked);
265 38         212 return;
266             }
267              
268 0 0         write_psgi_response_header($conn, $self->{timeout}, $status_code, $headers, [], $use_chunked) or return;
269              
270 0 0         if (defined $body) {
271 0           my $failed;
272             Plack::Util::foreach(
273             $body,
274             sub {
275 0 0   0     return if $failed;
276 0           my $ret;
277 0 0         if ( $use_chunked ) {
278 0           $ret = write_chunk($conn, $_[0], 0, $self->{timeout});
279             }
280             else {
281 0           $ret = write_all($conn, $_[0], 0, $self->{timeout});
282             }
283 0 0         $failed = 1 if ! defined $ret;
284             },
285 0           );
286 0 0         write_all($conn, "0\015\012\015\012", 0, $self->{timeout}) if $use_chunked;
287             } else {
288             return Plack::Util::inline_object
289             write => sub {
290 0 0   0     if ( $use_chunked ) {
291 0           write_chunk($conn, $_[0], 0, $self->{timeout});
292             }
293             else {
294 0           write_all($conn, $_[0], 0, $self->{timeout});
295             }
296             },
297             close => sub {
298 0 0   0     write_all($conn, "0\015\012\015\012", 0, $self->{timeout}) if $use_chunked;
299 0           };
300             }
301             }
302              
303             1;