File Coverage

blib/lib/Geest.pm
Criterion Covered Total %
statement 38 130 29.2
branch 0 12 0.0
condition 0 4 0.0
subroutine 13 23 56.5
pod 0 6 0.0
total 51 175 29.1


line stmt bran cond sub pod time code
1             package Geest;
2 1     1   3532 use Moo;
  1         27874  
  1         7  
3 1     1   4618 use AnyEvent;
  1         8077  
  1         49  
4 1     1   1589 use AnyEvent::HTTP ();
  1         37421  
  1         42  
5 1     1   1185 use AnyEvent::Socket ();
  1         32619  
  1         46  
6 1     1   1556 use HTTP::Message::PSGI ();
  1         3039  
  1         30  
7 1     1   10 use HTTP::Request;
  1         1  
  1         32  
8 1     1   6 use HTTP::Response;
  1         2  
  1         26  
9 1     1   1278 use Log::Minimal;
  1         34058  
  1         9  
10 1     1   1593 use Plack::Request;
  1         38462  
  1         50  
11 1     1   1468 use Geest::Backend;
  1         3  
  1         44  
12 1     1   9 use constant DEBUG => !!$ENV{GEEST_DEBUG};
  1         2  
  1         163  
13             our $VERSION = '0.02';
14             BEGIN {
15 1     1   3 $Log::Minimal::ENV_DEBUG = "GEEST_DEBUG";
16             $Log::Minimal::PRINT = sub {
17 0         0 my ($time, $type, $message) = @_;
18 0         0 print STDERR "$time [$type] $message\n";
19 1         1570 };
20             }
21              
22             has master => (
23             is => 'rw'
24             );
25              
26             has callbacks => (
27             is => 'lazy',
28             default => sub { +{} }
29             );
30              
31             has backends => (
32             is => 'lazy',
33             default => sub { +{} }
34             );
35              
36             sub on {
37 0     0 0   my ($self, $name, $cb) = @_;
38 0   0       my $list = $self->callbacks->{$name} ||= [];
39 0           push @$list, $cb;
40             }
41              
42             sub fire {
43 0     0 0   my ($self, $name, @args) = @_;
44              
45 0   0       my $list = $self->callbacks->{$name} || [];
46 0           my @ret;
47 0           foreach my $cb (@$list) {
48 0           if (DEBUG) {
49             debugf("Firing callback %s for hook '%s'", $cb, $name);
50             }
51 0           @ret = $cb->(@args);
52             }
53 0           return @ret;
54             }
55              
56             sub add_master {
57 0     0 0   my ($self, $name, %args) = @_;
58 0           $self->master($name);
59 0           $self->add_backend($name, %args);
60             }
61              
62             sub add_backend {
63 0     0 0   my ($self, $name, %args) = @_;
64 0           $self->backends->{$name} = Geest::Backend->new(name => $name, %args);
65             }
66              
67             sub psgi_app {
68 0     0 0   my $self = shift;
69              
70             # Do some sanity checks...
71             # Make sure that we have backends registered
72             {
73 0           my $backends = $self->backends;
  0            
74 0 0         if (keys %$backends < 1) {
75 0           Carp::croak("No backends registered! Can't proceed");
76             }
77              
78 0           my $master = $self->master;
79 0 0         if (! $master) {
80 0           Carp::croak("No master backend registered! Can't proceed");
81             }
82             }
83              
84             return sub {
85 0     0     my $env = shift;
86              
87             # Do away with stuff you don't really need to postpone.
88 0           my $preq = Plack::Request->new($env);
89 0           my $hreq = HTTP::Request->new($preq->method, $preq->uri);
90             $preq->headers->scan(sub {
91 0           my ($k, $v) = @_;
92 0           $hreq->headers->push_header($k, $v);
93 0           });
94 0           $hreq->content($preq->content);
95              
96             return sub {
97 0           my $responder = shift;
98              
99 0           my $backends = $self->backends;
100 0           my ($backend_names) = $self->fire('select_backend', $hreq);
101 0 0         if (! $backend_names) {
102             # You didn't specify any backends for me? hmmm...
103             # Well, then let's just get all the backends...
104 0           $backend_names = [ keys %$backends ];
105             }
106              
107 0           if (DEBUG) {
108             local $Log::Minimal::AUTODUMP = 1;
109             debugf("Backend names: %s", $backend_names);
110             }
111              
112             # Keep track of this request
113 0           my %state = (
114             sent_reply => 0,
115             reply_on_master =>
116 0           !!(grep { $_ eq $self->master } @$backend_names),
117             );
118              
119             # Check at which point we should reply to the client.
120             # If the list of backends contains the master backend,
121             # we honor that. Otherwise, just reply when the earliest
122             # reply comes in
123             # This is where we hold the responses
124 0           my %responses;
125              
126             # When all sub-requests are done, fire the backend_finished
127             # hook. Note that this will most likely fire AFTER the
128             # client has received a response. See $respond_cv below
129             my $main_cv = AE::cv {
130 0           if (DEBUG) {
131             debugf("Received all responses");
132             }
133 0           $self->fire(backend_finished => \%responses);
134              
135             # Explicitly free the response so we don't possibly
136             # hog all the memory
137 0           undef %responses;
138 0           undef %state;
139 0           };
140              
141             # When the master server responds, we reply to the client.
142             # Waiting for all the backends would be silly.
143             my $respond_cv = AE::cv {
144 0           $responder->($_[0]->recv);
145 0           };
146              
147 0           foreach my $backend (map { $backends->{$_} } @$backend_names ) {
  0            
148 0           my %response = (
149             backend => $backend,
150             request => $hreq,
151             response => undef
152             );
153 0           $responses{$backend->name} = \%response;
154              
155 0           $main_cv->begin;
156 0           my $cv = $self->send_backend($backend, $hreq->clone);
157             $cv->cb(sub {
158 0           $response{response} = HTTP::Message::PSGI::res_from_psgi($cv->recv);
159 0           $main_cv->end;
160 0 0         if ($state{sent_reply}) {
161             # We hav ealready sent a reply. short-circuit.
162 0           return;
163             }
164              
165 0 0         if ($state{reply_on_master}) {
166             # check if this is master
167 0 0         if ($backend->name eq $self->master) {
168 0           if (DEBUG) {
169             debugf("Received response from '%s' (master), replying to client", $backend->name);
170             }
171 0           $respond_cv->send($cv->recv);
172 0           $state{sent_reply}++;
173             }
174             } else {
175             # Nothing specified, just reply
176 0           if (DEBUG) {
177             debugf("Received response from '%s', replying to client", $backend->name);
178             }
179 0           $respond_cv->send($cv->recv);
180 0           $state{sent_reply}++;
181             }
182 0           });
183             }
184 0           };
185             }
186 0           }
187              
188             sub send_backend {
189 0     0 0   my ($self, $backend, $req) = @_;
190              
191 0           if (DEBUG) {
192             debugf("Sending %s '%s %s'", $backend->name, $req->method, $req->uri);
193             }
194              
195 0           $self->fire(munge_request => ($backend, $req));
196              
197 0           my %headers;
198             $req->headers->scan(sub {
199 0     0     my ($key, $value) = @_;
200 0           $headers{$key} = $value;
201 0           });
202              
203 0           my $cv = AE::cv;
204 0           my $guard; $guard = AnyEvent::HTTP::http_request(
205             $req->method,
206             $req->uri,
207             headers => \%headers,
208             recurse => 0,
209             persistent => 0,
210             tcp_connect => sub {
211             # Override tcp_connect so we connect to the specified
212             # backend instead of the request url
213 0     0     my ($host, $port, $connect_cb, $prepare_cb) = @_;
214 0           if (DEBUG) {
215             debugf("Connecting to %s => %s:%s", $backend->name, $backend->host, $backend->port);
216             }
217             return AnyEvent::Socket::tcp_connect(
218             $backend->host,
219             $backend->port,
220             sub {
221 0           if (DEBUG) {
222             debugf("Connected to %s => %s:%s", $backend->name, $backend->host, $backend->port);
223             }
224 0           $connect_cb->(@_),
225             },
226 0           $prepare_cb
227             );
228             },
229             sub {
230             # Free me, baby.
231 0     0     undef $guard;
232              
233 0           if (DEBUG) {
234             debugf("Received response from %s => code = %s, message = %s", $backend->name, $_[1]->{Status}, $_[1]->{Reason});
235             }
236             # Remove these pseudo-headers from AE::HTTP
237 0           delete $_[1]->{URL};
238 0           delete $_[1]->{HTTPVersion};
239 0           delete $_[1]->{Reason};
240              
241             # Notify the condvar with a PSGI response.
242 0           $cv->send([
243             delete $_[1]->{Status},
244 0           [ %{$_[1]} ],
245             [ $_[0] ]
246             ]);
247             }
248 0           );
249 0           return $cv;
250             }
251              
252              
253 1     1   10 no Moo;
  1         3  
  1         7  
254              
255             1;
256              
257             __END__