File Coverage

blib/lib/Plack/App/Proxy/WebSocket.pm
Criterion Covered Total %
statement 30 93 32.2
branch 0 32 0.0
condition 0 9 0.0
subroutine 10 13 76.9
pod 2 2 100.0
total 42 149 28.1


line stmt bran cond sub pod time code
1             package Plack::App::Proxy::WebSocket;
2             {
3             $Plack::App::Proxy::WebSocket::VERSION = '0.03';
4             }
5             # ABSTRACT: proxy HTTP and WebSocket connections
6              
7 1     1   24850 use warnings FATAL => 'all';
  1         2  
  1         45  
8 1     1   5 use strict;
  1         2  
  1         37  
9              
10 1     1   1205 use AnyEvent::Handle;
  1         32951  
  1         40  
11 1     1   1252 use AnyEvent::Socket;
  1         18939  
  1         156  
12 1     1   1098 use HTTP::Headers;
  1         10138  
  1         38  
13 1     1   893 use HTTP::Request;
  1         15034  
  1         42  
14 1     1   807 use HTTP::Parser::XS qw/parse_http_response HEADERS_AS_HASHREF/;
  1         940  
  1         72  
15 1     1   721 use Plack::Request;
  1         45916  
  1         33  
16 1     1   9 use URI;
  1         2  
  1         19  
17              
18 1     1   5 use parent 'Plack::App::Proxy';
  1         2  
  1         11  
19              
20              
21             sub call {
22 0     0 1   my ($self, $env) = @_;
23 0           my $req = Plack::Request->new($env);
24              
25             # detect a protocol upgrade handshake or just proxy as usual
26 0 0         my $upgrade = $req->header('Upgrade') or return $self->SUPER::call($env);
27              
28 0 0         $env->{'psgi.streaming'} or die "Plack server support for psgi.streaming is required";
29 0 0         my $client_fh = $env->{'psgix.io'} or die "Plack server support for the psgix.io extension is required";
30              
31 0 0         my $url = $self->build_url_from_env($env) or return [502, [], ["Bad Gateway"]];
32 0           my $uri = URI->new($url);
33              
34             sub {
35 0     0     my $res = shift;
36              
37             # set up an event loop if the server is blocking
38 0           my $cv;
39 0 0         unless ($env->{'psgi.nonblocking'}) {
40 0           $env->{'psgi.errors'}->print("Plack server support for psgi.nonblocking is highly recommended.\n");
41 0           $cv = AE::cv;
42             }
43              
44             tcp_connect $uri->host, $uri->port, sub {
45 0           my $server_fh = shift;
46              
47             # return 502 if connection to server fails
48 0 0         unless ($server_fh) {
49 0           $res->([502, [], ["Bad Gateway"]]);
50 0 0         $cv->send if $cv;
51 0           return;
52             }
53              
54 0           my $client = AnyEvent::Handle->new(fh => $client_fh);
55 0           my $server = AnyEvent::Handle->new(fh => $server_fh);
56              
57             # forward request from the client
58 0           my $headers = $self->build_headers_from_env($env, $req, $uri);
59 0           $headers->{Upgrade} = $upgrade;
60 0           $headers->{Connection} = 'Upgrade';
61 0           my $hs = HTTP::Request->new('GET', $uri->path, HTTP::Headers->new(%$headers));
62 0           $hs->protocol($req->protocol);
63 0           $server->push_write($hs->as_string);
64              
65 0           my $buffer = "";
66 0           my $writer;
67              
68             # buffer the exchange between the client and server
69             $client->on_read(sub {
70 0           my $hdl = shift;
71 0           my $buf = delete $hdl->{rbuf};
72 0           $server->push_write($buf);
73 0           });
74             $server->on_read(sub {
75 0           my $hdl = shift;
76 0           my $buf = delete $hdl->{rbuf};
77              
78 0 0         return $writer->write($buf) if $writer;
79 0           $buffer .= $buf;
80              
81 0           my ($ret, $http_version, $status, $message, $headers) =
82             parse_http_response($buffer, HEADERS_AS_HASHREF);
83 0 0         $server->push_shutdown if $ret == -2;
84 0 0         return if $ret < 0;
85              
86 0 0         $headers = [$self->response_headers(HTTP::Headers->new(%$headers))] unless $status == 101;
87 0           $writer = $res->([$status, $headers]);
88 0           $writer->write(substr($buffer, $ret));
89 0           $buffer = undef;
90 0           });
91              
92             # shut down the sockets and exit the loop if an error occurs
93             $client->on_error(sub {
94 0           $client->destroy;
95 0           $server->push_shutdown;
96 0 0         $cv->send if $cv;
97 0 0         $writer->close if $writer;
98 0           });
99             $server->on_error(sub {
100 0           $server->destroy;
101             # get the client handle's attention
102 0           $client->push_shutdown;
103 0           });
104 0           };
105              
106 0 0         $cv->recv if $cv;
107 0           };
108             }
109              
110              
111             sub build_headers_from_env {
112 0     0 1   my ($self, $env, $req, $uri) = @_;
113              
114 0           my $headers = $self->SUPER::build_headers_from_env($env, $req);
115              
116             # if x-forwarded-for already existed, append the remote address; the super
117             # method fails to maintain a list of multiple proxies
118 0 0         if (my $forwarded_for = $env->{HTTP_X_FORWARDED_FOR}) {
119 0           $headers->{'X-Forwarded-For'} = "$forwarded_for, $env->{REMOTE_ADDR}";
120             }
121              
122             # the super method depends on the user agent to add the host header if it
123             # is missing, so set the host if it needs to be set
124 0 0 0       if ($uri && !$headers->{'Host'}) {
125 0           $headers->{'Host'} = $uri->host_port;
126             }
127              
128 0   0       $headers->{'X-Forwarded-Proto'} ||= $env->{'psgi.url_scheme'};
129 0   0       $headers->{'X-Real-IP'} ||= $env->{REMOTE_ADDR};
130              
131 0           $headers;
132             }
133              
134             1;
135              
136             __END__