File Coverage

blib/lib/Plack/Handler/Gazelle.pm
Criterion Covered Total %
statement 168 183 91.8
branch 53 78 67.9
condition 19 30 63.3
subroutine 29 29 100.0
pod 0 3 0.0
total 269 323 83.2


line stmt bran cond sub pod time code
1             package Plack::Handler::Gazelle;
2              
3 186     186   201250500 use 5.008001;
  186         1268  
4 186     186   1339 use strict;
  186         897  
  186         9186  
5 186     186   1993 use warnings;
  186         658  
  186         15722  
6 186     186   2771 use IO::Socket::INET;
  186         31994  
  186         10172  
7 186     186   193244 use Plack::Util;
  186         3072  
  186         6362  
8 186     186   89815 use Stream::Buffered;
  186         1064441  
  186         7573  
9 186     186   1705 use POSIX qw(EINTR EAGAIN EWOULDBLOCK);
  186         4549  
  186         5132  
10 186     186   28768 use Socket qw(IPPROTO_TCP TCP_NODELAY);
  186         372  
  186         12189  
11 186     186   93716 use Parallel::Prefork;
  186         883270  
  186         1740  
12 186     186   108040 use Server::Starter ();
  186         1100954  
  186         5947  
13 186     186   2087 use Try::Tiny;
  186         1407  
  186         14155  
14 186     186   97181 use Guard;
  186         103533  
  186         13806  
15              
16             our $VERSION = "0.46";
17              
18 186     186   982 use XSLoader;
  186         256  
  186         6703  
19             XSLoader::load(__PACKAGE__, $VERSION);
20              
21 186     186   724 use constant MAX_REQUEST_SIZE => 131072;
  186         270  
  186         23812  
22 186     186   861 use constant CHUNKSIZE => 64 * 1024;
  186         314  
  186         158852  
23              
24 186     186   1165 my $null_io = do { open my $io, "<", \""; $io };
  186         243  
  186         4810  
25             my $bad_response = [ 400, [ 'Content-Type' => 'text/plain', 'Connection' => 'close' ], [ 'Bad Request' ] ];
26              
27             sub new {
28 185     185 0 15498 my($class, %args) = @_;
29              
30             # setup before instantiation
31 185         356 my $listen_sock;
32 185 50       1575 if (defined $ENV{SERVER_STARTER_PORT}) {
33 0         0 my ($hostport, $fd) = %{Server::Starter::server_ports()};
  0         0  
34 0 0       0 if ($hostport =~ /(.*):(\d+)/) {
35 0         0 $args{host} = $1;
36 0         0 $args{port} = $2;
37             } else {
38 0         0 $args{port} = $hostport;
39             }
40 0 0       0 $listen_sock = IO::Socket::INET->new(
41             Proto => 'tcp',
42             ) or die "failed to create socket:$!";
43 0 0       0 $listen_sock->fdopen($fd, 'w')
44             or die "failed to bind to listening socket:$!";
45             }
46              
47 185         265 my $max_workers = 10;
48 185         469 for (qw(max_workers workers)) {
49             $max_workers = delete $args{$_}
50 370 100       1109 if defined $args{$_};
51             }
52              
53 185 100       1666 if ($args{child_exit}) {
54 2 50       112 $args{child_exit} = eval $args{child_exit} unless ref($args{child_exit});
55 2 50       6 die "child_exit is defined but not a code block" if ref($args{child_exit}) ne 'CODE';
56             }
57              
58             my $self = bless {
59             server_software => $args{server_software} || $class,
60       183     server_ready => $args{server_ready} || sub {},
61             listen_sock => $listen_sock,
62             host => $args{host} || 0,
63             port => $args{port} || 8080,
64             timeout => $args{timeout} || 300,
65             max_workers => $max_workers,
66       165     child_exit => $args{child_exit} || sub {},
67             min_reqs_per_child => (
68             defined $args{min_reqs_per_child}
69             ? $args{min_reqs_per_child} : undef,
70             ),
71             max_reqs_per_child => (
72             $args{max_reqs_per_child} || $args{max_requests} || 1000,
73             ),
74             spawn_interval => $args{spawn_interval} || 0,
75             err_respawn_interval => (
76             defined $args{err_respawn_interval}
77             ? $args{err_respawn_interval} : undef,
78 185 50 33     8877 ),
    50 100        
      100        
      50        
      50        
      100        
      50        
      50        
79             }, $class;
80              
81 185         1336 $self;
82             }
83              
84             sub setup_listener {
85 185     185 0 288 my $self = shift;
86             $self->{listen_sock} ||= IO::Socket::INET->new(
87             Listen => SOMAXCONN,
88             LocalPort => $self->{port},
89             LocalAddr => $self->{host},
90 185 50 33     4363 Proto => 'tcp',
91             ReuseAddr => 1,
92             ) or die "failed to listen to port $self->{port}:$!";
93              
94 185         77043 my $family = Socket::sockaddr_family(getsockname($self->{listen_sock}));
95 185         531 $self->{_listen_sock_is_tcp} = $family != AF_UNIX;
96              
97             # set defer accept
98 185 50 33     1402 if ($^O eq 'linux' && $self->{_listen_sock_is_tcp}) {
99 185         660 setsockopt($self->{listen_sock}, IPPROTO_TCP, 9, 1);
100             }
101 185         567 $self->{server_ready}->($self);
102             }
103              
104              
105             sub run {
106 185     185 0 7781 my($self, $app) = @_;
107 185         457 $self->setup_listener();
108             # use Parallel::Prefork
109             my %pm_args = (
110             max_workers => $self->{max_workers},
111 185         822 trap_signals => {
112             HUP => 'TERM',
113             },
114             );
115 185 50       781 if (defined $self->{spawn_interval}) {
116 185         471 $pm_args{trap_signals}{USR1} = [ 'TERM', $self->{spawn_interval} ];
117 185         278 $pm_args{spawn_interval} = $self->{spawn_interval};
118             }
119 185 50       657 if (defined $self->{err_respawn_interval}) {
120 0         0 $pm_args{err_respawn_interval} = $self->{err_respawn_interval};
121             }
122 185         1201 my $pm = Parallel::Prefork->new(\%pm_args);
123              
124             local $SIG{TERM} = sub {
125             #tell the socket we're done reading (stops new connections, existing will continue)
126             $self->{listen_sock}->shutdown(0)
127 18 50   18   4912424 if not defined $ENV{SERVER_STARTER_PORT};
128              
129 18         25742 $pm->signal_received('TERM');
130 18         445 $pm->signal_all_children('TERM');
131 185         13497 };
132 185         702 while ($pm->signal_received !~ /^(TERM|USR1)$/) {
133             $pm->start(sub{
134 167     167   12271240 srand((rand() * 2 ** 30) ^ $$ ^ time);
135              
136             my $max_reqs_per_child = $self->_calc_minmax_per_child(
137             $self->{max_reqs_per_child},
138             $self->{min_reqs_per_child}
139 167         4211 );
140              
141 167         993 my $proc_req_count = 0;
142 167         2676 $self->{term_received} = 0;
143             local $SIG{TERM} = sub {
144 143         1953 $self->{term_received}++;
145 167         7888 };
146 167         1745 local $SIG{PIPE} = 'IGNORE';
147             PROC_LOOP:
148 167         1472 while ( $proc_req_count < $max_reqs_per_child) {
149 7512 100       34976 if ( $self->{term_received} ) {
150 143         1787 $self->{child_exit}->($self, $app);
151 143         22225 exit 0;
152             }
153 7369 100 100     21341928 if ( my ($conn, $buf, $env) = accept_psgi(
      50        
154             fileno($self->{listen_sock}), $self->{timeout}, $self->{_listen_sock_is_tcp},
155             $self->{host} || 0, $self->{port} || 0
156             ) ) {
157 146         3063 my $guard = guard { close_client($conn) };
  146         1023721  
158 146         355 ++$proc_req_count;
159              
160 146         577 my $res = $bad_response;
161 186     186   1174 my $chunked = do { no warnings; lc delete $env->{HTTP_TRANSFER_ENCODING} eq 'chunked' };
  186         457  
  186         169725  
  146         280  
  146         835  
162 146 100       1226 if (my $cl = $env->{CONTENT_LENGTH}) {
    100          
163 9         372 my $buffer = Stream::Buffered->new($cl);
164 9         515 while ($cl > 0) {
165 29         48 my $chunk = "";
166 29 100       85 if (length $buf) {
167 1         1 $chunk = $buf;
168 1         1 $buf = '';
169             } else {
170             read_timeout(
171             $conn, \$chunk, $cl, 0, $self->{timeout})
172 28 50       418 or next PROC_LOOP;
173             }
174 29         94 $buffer->print($chunk);
175 29         691 $cl -= length $chunk;
176             }
177 9         49 $env->{'psgi.input'} = $buffer->rewind;
178             } elsif ( $chunked ) {
179 1         39 my $buffer = Stream::Buffered->new($cl);
180 1         112 my $chunk_buffer = '';
181 1         2 my $length;
182 1         1 DECHUNK: while(1) {
183 6         6 my $chunk = "";
184 6 50       12 if ( length $buf ) {
185 0         0 $chunk = $buf;
186 0         0 $buf = '';
187             }
188             else {
189             read_timeout(
190             $conn, \$chunk, 16384, 0, $self->{timeout})
191 6 50       71 or next PROC_LOOP;
192             }
193              
194 6         18 $chunk_buffer .= $chunk;
195 6         64 while ( $chunk_buffer =~ s/^(([0-9a-fA-F]+).*\015\012)// ) {
196 16         26 my $trailer = $1;
197 16         26 my $chunk_len = hex $2;
198 16 100       47 if ($chunk_len == 0) {
    50          
199 1         12 last DECHUNK;
200             } elsif (length $chunk_buffer < $chunk_len + 2) {
201 0         0 $chunk_buffer = $trailer . $chunk_buffer;
202 0         0 last;
203             }
204 15         39 $buffer->print(substr $chunk_buffer, 0, $chunk_len, '');
205 15         181 $chunk_buffer =~ s/^\015\012//;
206 15         43 $length += $chunk_len;
207             }
208             }
209 1         6 $env->{CONTENT_LENGTH} = $length;
210 1         6 $env->{'psgi.input'} = $buffer->rewind;
211             } else {
212 136         466 $env->{'psgi.input'} = $null_io;
213             }
214 146         3165 $res = Plack::Util::run_app $app, $env;
215 145 100       23868 my $use_chunked = $env->{"SERVER_PROTOCOL"} eq 'HTTP/1.1' ? 1 : 0;
216 145 100       1232 if (ref $res eq 'ARRAY') {
    50          
217 133         790 $self->_handle_response($res, $conn, $use_chunked);
218             } elsif (ref $res eq 'CODE') {
219             $res->(sub {
220 12         480 $self->_handle_response($_[0], $conn, $use_chunked);
221 12         174 });
222             } else {
223 0         0 die "Bad response $res";
224             }
225 145 100       3884 if ($env->{'psgix.harakiri.commit'}) {
226 23         152 $self->{child_exit}->($self, $app);
227 23         1882 exit 0;
228             }
229             }
230             }
231 185         3223 });
232             }
233 18         3296 while ($pm->wait_all_children(1)) {
234 1         1019002 $pm->signal_all_children('TERM');
235             }
236             }
237              
238             sub _calc_minmax_per_child {
239 167     167   1211 my $self = shift;
240 167         929 my ($max,$min) = @_;
241 167 50       2150 if (defined $min) {
242 0         0 return $max - int(($max - $min + 1) * rand);
243             } else {
244 167         868 return $max;
245             }
246             }
247              
248             sub _handle_response {
249 145     145   495 my($self, $res, $conn, $use_chunked) = @_;
250 145         341 my $status_code = $res->[0];
251 145         270 my $headers = $res->[1];
252 145         237 my $body = $res->[2];
253              
254 145 100 100     1646 if (defined $body && ref $body eq 'ARRAY' ) {
255 129         37182 write_psgi_response($conn, $self->{timeout}, $status_code, $headers , $body, $use_chunked);
256 129         465 return;
257             }
258              
259 16 50       2256 write_psgi_response_header($conn, $self->{timeout}, $status_code, $headers, [], $use_chunked) or return;
260              
261 16 100       96 if (defined $body) {
262 10         16 my $failed;
263             Plack::Util::foreach(
264             $body,
265             sub {
266 16 50   16   1495 return if $failed;
267 16         15 my $ret;
268 16 100       53 if ( $use_chunked ) {
269 5         153 $ret = write_chunk($conn, $_[0], 0, $self->{timeout});
270             }
271             else {
272 11         370 $ret = write_all($conn, $_[0], 0, $self->{timeout});
273             }
274 16 50       250 $failed = 1 if ! defined $ret;
275             },
276 10         132 );
277 10 100       8438 write_all($conn, "0\015\012\015\012", 0, $self->{timeout}) if $use_chunked;
278             } else {
279             return Plack::Util::inline_object
280             write => sub {
281 14 100   14   622 if ( $use_chunked ) {
282 7         142 write_chunk($conn, $_[0], 0, $self->{timeout});
283             }
284             else {
285 7         184 write_all($conn, $_[0], 0, $self->{timeout});
286             }
287             },
288             close => sub {
289 6 100   6   516 write_all($conn, "0\015\012\015\012", 0, $self->{timeout}) if $use_chunked;
290 6         216 };
291             }
292             }
293              
294             1;
295              
296