File Coverage

blib/lib/Mojo/WebSocketProxy/Dispatcher.pm
Criterion Covered Total %
statement 133 143 93.0
branch 25 36 69.4
condition 10 18 55.5
subroutine 31 32 96.8
pod 7 7 100.0
total 206 236 87.2


line stmt bran cond sub pod time code
1             package Mojo::WebSocketProxy::Dispatcher;
2              
3 17     17   101 use strict;
  17         32  
  17         561  
4 17     17   93 use warnings;
  17         32  
  17         476  
5              
6 17     17   80 use Mojo::Base 'Mojolicious::Controller';
  17         27  
  17         86  
7 17     17   9417 use Mojo::WebSocketProxy::Parser;
  17         38  
  17         609  
8 17     17   99 use Mojo::WebSocketProxy::Config;
  17         29  
  17         80  
9              
10 17     17   6502 use Class::Method::Modifiers;
  17         23106  
  17         1023  
11              
12 17     17   107 use JSON::MaybeUTF8 qw(:v1);
  17         30  
  17         1997  
13 17     17   100 use Unicode::Normalize ();
  17         30  
  17         336  
14 17     17   6166 use Future::Mojo 0.004; # ->new_timeout
  17         58253  
  17         533  
15 17     17   6427 use Future::Utils qw(fmap);
  17         30488  
  17         966  
16 17     17   107 use Scalar::Util qw(blessed);
  17         35  
  17         668  
17 17     17   92 use Encode;
  17         48  
  17         1246  
18 17     17   6148 use DataDog::DogStatsd::Helper qw(stats_inc);
  17         39354  
  17         1299  
19              
20 17   100 17   114 use constant TIMEOUT => $ENV{MOJO_WEBSOCKETPROXY_TIMEOUT} || 15;
  17         31  
  17         34020  
21              
22             our $VERSION = '0.13'; ## VERSION
23             around 'send' => sub {
24             my ($orig, $c, $api_response, $req_storage) = @_;
25              
26             my $config = $c->wsp_config->{config};
27              
28             my $max_response_size = $config->{max_response_size};
29             if ($max_response_size && length(encode_json_utf8($api_response)) > $max_response_size) {
30             $api_response->{json} = $c->wsp_error('error', 'ResponseTooLarge', 'Response too large.');
31             }
32              
33             my $before_send_api_response = $config->{before_send_api_response};
34             $_->($c, $req_storage, $api_response->{json})
35             for grep { $_ } (ref $before_send_api_response eq 'ARRAY' ? @{$before_send_api_response} : $before_send_api_response);
36              
37             my $ret = $orig->($c, $api_response);
38              
39             my $after_sent_api_response = $config->{after_sent_api_response};
40             $_->($c, $req_storage) for grep { $_ } (ref $after_sent_api_response eq 'ARRAY' ? @{$after_sent_api_response} : $after_sent_api_response);
41              
42             return $ret;
43             };
44              
45             sub ok {
46 23     23 1 327406 return 1;
47             }
48              
49             sub open_connection {
50 23     23 1 4695 my ($c) = @_;
51              
52 23         132 my $log = $c->app->log;
53 23         201 $log->debug("accepting a websocket connection from " . $c->tx->remote_address);
54              
55             # Enable permessage-deflate
56 23         854 $c->tx->with_compression;
57              
58 23         774 my $config = $c->wsp_config->{config};
59              
60 23 50       107 Mojo::IOLoop->singleton->stream($c->tx->connection)->timeout($config->{stream_timeout}) if $config->{stream_timeout};
61 23 50       96 Mojo::IOLoop->singleton->max_connections($config->{max_connections}) if $config->{max_connections};
62              
63 23 50       72 $config->{opened_connection}->($c) if $config->{opened_connection};
64              
65             $c->on(
66             text => sub {
67 34     34   132754 my ($c, $msg) = @_;
68              
69 34         86 my $original = "$msg";
70             # Incoming data will be JSON-formatted text, as a Unicode string.
71             # We normalize the entire string before decoding.
72              
73 34 50       62 my $decoded = eval { Encode::decode_utf8($msg, Encode::FB_CROAK) } or do {
  34         360  
74 0         0 $c->tx->emit(
75             encoding_error => _get_error_details(
76             code => 'INVALID_UTF8',
77             reason => 'Malformed UTF-8 data',
78             message => $msg
79             ));
80 0         0 return;
81             };
82              
83             # The Unicode::Normalize::NFC check is added as a safety net. However, the error is not triggered so far.
84 34 50       286 my $normalized_msg = eval { Unicode::Normalize::NFC($decoded) } or do {
  34         447  
85 0         0 $c->tx->emit(
86             encoding_error => _get_error_details(
87             code => 'INVALID_UNICODE',
88             reason => 'Malformed Unicode data',
89             message => $msg
90             ));
91 0         0 return;
92             };
93              
94 34 100       71 my $args = eval { decode_json_text($normalized_msg); } or do {
  34         141  
95 1         52 $c->tx->emit(
96             encoding_error => _get_error_details(
97             code => 'INVALID_JSON',
98             reason => 'Malformed JSON data',
99             message => $msg
100             ));
101 1         10 return;
102             };
103              
104 33         1001 on_message($c, $args);
105 23         247 });
106              
107             $c->on(
108             binary => sub {
109 1     1   3850 my ($d, $bytes) = @_;
110 1 50 33     11 $config->{binary_frame}(@_) if $bytes and exists($config->{binary_frame});
111 23         5151 });
112              
113 23 50       1401 $c->on(finish => $config->{finish_connection}) if $config->{finish_connection};
114              
115 23         142 return;
116             }
117              
118             sub on_message {
119 33     33 1 91 my ($c, $args) = @_;
120              
121 33         117 my $config = $c->wsp_config->{config};
122              
123 33         86 my $req_storage = {};
124 33         96 $req_storage->{args} = $args;
125              
126             # We still want to run any hooks even for invalid requests.
127 33 50       147 if (my $err = Mojo::WebSocketProxy::Parser::parse_req($c, $req_storage)) {
128 0         0 $c->send({json => $err}, $req_storage);
129 0   0     0 return $c->_run_hooks($config->{after_dispatch} || [])->retain;
130             }
131              
132 33 100       155 my $action = $c->dispatch($args) or do {
133 2         22 my $err = $c->wsp_error('error', UnrecognisedRequest => 'Unrecognised request');
134 2         69 $c->send({json => $err}, $req_storage);
135 2   50     18 return $c->_run_hooks($config->{after_dispatch} || [])->retain;
136             };
137              
138 31         143 @{$req_storage}{keys %$action} = (values %$action);
  31         134  
139 31         86 $req_storage->{method} = $req_storage->{name};
140              
141             # main processing pipeline
142             my $f = $c->before_forward($req_storage)->transform(
143             done => sub {
144             # Note that we completely ignore the return value of ->before_forward here.
145 23 100   23   1421 return $req_storage->{instead_of_forward}->($c, $req_storage) if $req_storage->{instead_of_forward};
146 21         68 return $c->forward($req_storage);
147             }
148             )->then(
149             sub {
150 23     23   3678 my $result = shift;
151 23         90 return $c->after_forward($result, $req_storage)->transform(done => sub { $result });
  23         1301  
152             },
153             sub {
154 7     7   101484 my $result = shift;
155 7         28 Future->done($result);
156 31         118 });
157              
158             return Future->wait_any(
159             Future::Mojo->new_timeout(TIMEOUT)->else(
160             sub {
161 1     1   1001923 return Future->done($c->wsp_error('error', Timeout => 'Timeout'));
162             }
163             ),
164             $f
165             )->then(
166             sub {
167 31     31   14050 my ($result) = @_;
168 31 100       371 $c->send({json => $result}, $req_storage) if $result;
169 31   50     208 return $c->_run_hooks($config->{after_dispatch} || []);
170             }
171             )->on_fail(
172             sub {
173 0     0   0 $c->app->log->error("An error occurred handling on_message. Error @_");
174 31         2155 })->retain;
175             }
176              
177             sub before_forward {
178 31     31 1 75 my ($c, $req_storage) = @_;
179              
180 31         222 my $config = $c->wsp_config->{config};
181              
182 31         76 my $before_forward_hooks = [];
183              
184             # Global hooks are always first
185 31         84 for ($config, $req_storage) {
186 62 100       211 push @$before_forward_hooks, ref($_->{before_forward}) eq 'ARRAY' ? @{$_->{before_forward}} : $_->{before_forward};
  7         20  
187             }
188              
189             # We always want to clear these after every request.
190 31         58 delete $req_storage->{before_forward};
191              
192 31         96 return $c->_run_hooks($before_forward_hooks, $req_storage);
193             }
194              
195             sub after_forward {
196 23     23 1 56 my ($c, $result, $req_storage) = @_;
197              
198 23         115 my $config = $c->wsp_config->{config};
199 23   50     159 return $c->_run_hooks($config->{after_forward} || [], $result, $req_storage);
200             }
201              
202             sub _run_hooks {
203 87     87   191 my @hook_params = @_;
204 87         136 my $c = shift @hook_params;
205 87         128 my $hooks = shift @hook_params;
206              
207             my $result_f = fmap {
208 19     19   1011 my $hook = shift;
209 19 100       54 my $result = $hook->($c, @hook_params) or return Future->done;
210 9 100 66     253 return $result if blessed($result) && $result->isa('Future');
211 5         29 return Future->fail($result);
212             }
213 87         634 foreach => [grep { defined } @$hooks],
  62         229  
214             concurrent => 1;
215 87         21017 return $result_f;
216             }
217              
218             sub dispatch {
219 33     33 1 83 my ($c, $args) = @_;
220              
221 33         103 my $log = $c->app->log;
222 33         365 $log->debug("websocket got json " . $c->dumper($args));
223              
224             my ($action) =
225 0         0 sort { $a->{order} <=> $b->{order} }
226 33         126 grep { defined }
227 33         4010 map { $c->wsp_config->{actions}->{$_} } keys %$args;
  33         99  
228              
229 33         145 return $action;
230             }
231              
232             sub forward {
233 23     23 1 59 my ($c, $req_storage) = @_;
234              
235 23         70 my $config = $c->wsp_config->{config};
236              
237 23         56 for my $hook (qw/ before_call before_get_rpc_response after_got_rpc_response /) {
238             $req_storage->{$hook} = [
239 71         150 grep { $_ } (ref $config->{$hook} eq 'ARRAY' ? @{$config->{$hook}} : $config->{$hook}),
  0         0  
240 69 50       260 grep { $_ } (ref $req_storage->{$hook} eq 'ARRAY' ? @{$req_storage->{$hook}} : $req_storage->{$hook}),
  69 50       118  
  0         0  
241             ];
242             }
243              
244 23   100     116 my $backend_name = $req_storage->{backend} // "default";
245 23 50       69 my $backend = $c->wsp_config->{backends}{$backend_name}
246             or die "Cannot dispatch request - no backend named '$backend_name'";
247              
248 23         125 $backend->call_rpc($c, $req_storage);
249              
250 23         83 return;
251             }
252              
253             sub _get_error_details {
254 1     1   10 my (%args) = @_;
255              
256             return {
257             error => 'Error Processing Request',
258             details => {
259             error_code => $args{code},
260             reason => $args{reason},
261             request_body => $args{message},
262             },
263 1         10 };
264             }
265              
266             1;
267              
268             __END__