File Coverage

blib/lib/Mojo/WebSocketProxy/Dispatcher.pm
Criterion Covered Total %
statement 133 143 93.0
branch 25 36 69.4
condition 10 18 55.5
subroutine 31 32 96.8
pod 7 7 100.0
total 206 236 87.2


line stmt bran cond sub pod time code
1             package Mojo::WebSocketProxy::Dispatcher;
2              
3 17     17   98 use strict;
  17         32  
  17         436  
4 17     17   78 use warnings;
  17         27  
  17         387  
5              
6 17     17   71 use Mojo::Base 'Mojolicious::Controller';
  17         42  
  17         81  
7 17     17   8700 use Mojo::WebSocketProxy::Parser;
  17         40  
  17         527  
8 17     17   97 use Mojo::WebSocketProxy::Config;
  17         33  
  17         84  
9              
10 17     17   6581 use Class::Method::Modifiers;
  17         21877  
  17         970  
11              
12 17     17   108 use JSON::MaybeUTF8 qw(:v1);
  17         32  
  17         2089  
13 17     17   109 use Unicode::Normalize ();
  17         31  
  17         346  
14 17     17   6274 use Future::Mojo 0.004; # ->new_timeout
  17         58004  
  17         533  
15 17     17   6519 use Future::Utils qw(fmap);
  17         30789  
  17         1026  
16 17     17   110 use Scalar::Util qw(blessed);
  17         33  
  17         647  
17 17     17   98 use Encode;
  17         49  
  17         1213  
18 17     17   6150 use DataDog::DogStatsd::Helper qw(stats_inc);
  17         40218  
  17         1236  
19              
20 17   100 17   121 use constant TIMEOUT => $ENV{MOJO_WEBSOCKETPROXY_TIMEOUT} || 15;
  17         30  
  17         34175  
21              
22             our $VERSION = '0.12'; ## VERSION
23             around 'send' => sub {
24             my ($orig, $c, $api_response, $req_storage) = @_;
25              
26             my $config = $c->wsp_config->{config};
27              
28             my $max_response_size = $config->{max_response_size};
29             if ($max_response_size && length(encode_json_utf8($api_response)) > $max_response_size) {
30             $api_response->{json} = $c->wsp_error('error', 'ResponseTooLarge', 'Response too large.');
31             }
32              
33             my $before_send_api_response = $config->{before_send_api_response};
34             $_->($c, $req_storage, $api_response->{json})
35             for grep { $_ } (ref $before_send_api_response eq 'ARRAY' ? @{$before_send_api_response} : $before_send_api_response);
36              
37             my $ret = $orig->($c, $api_response);
38              
39             my $after_sent_api_response = $config->{after_sent_api_response};
40             $_->($c, $req_storage) for grep { $_ } (ref $after_sent_api_response eq 'ARRAY' ? @{$after_sent_api_response} : $after_sent_api_response);
41              
42             return $ret;
43             };
44              
45             sub ok {
46 23     23 1 243986 return 1;
47             }
48              
49             sub open_connection {
50 23     23 1 4006 my ($c) = @_;
51              
52 23         119 my $log = $c->app->log;
53 23         195 $log->debug("accepting a websocket connection from " . $c->tx->remote_address);
54              
55             # Enable permessage-deflate
56 23         717 $c->tx->with_compression;
57              
58 23         773 my $config = $c->wsp_config->{config};
59              
60 23 50       94 Mojo::IOLoop->singleton->stream($c->tx->connection)->timeout($config->{stream_timeout}) if $config->{stream_timeout};
61 23 50       88 Mojo::IOLoop->singleton->max_connections($config->{max_connections}) if $config->{max_connections};
62              
63 23 50       72 $config->{opened_connection}->($c) if $config->{opened_connection};
64              
65             $c->on(
66             text => sub {
67 34     34   119810 my ($c, $msg) = @_;
68              
69 34         90 my $original = "$msg";
70             # Incoming data will be JSON-formatted text, as a Unicode string.
71             # We normalize the entire string before decoding.
72              
73 34 50       69 my $decoded = eval { Encode::decode_utf8($msg, Encode::FB_CROAK) } or do {
  34         405  
74 0         0 $c->tx->emit(
75             encoding_error => _get_error_details(
76             code => 'INVALID_UTF8',
77             reason => 'Malformed UTF-8 data',
78             message => $msg
79             ));
80 0         0 return;
81             };
82              
83             # The Unicode::Normalize::NFC check is added as a safety net. However, the error is not triggered so far.
84 34 50       322 my $normalized_msg = eval { Unicode::Normalize::NFC($decoded) } or do {
  34         493  
85 0         0 $c->tx->emit(
86             encoding_error => _get_error_details(
87             code => 'INVALID_UNICODE',
88             reason => 'Malformed Unicode data',
89             message => $msg
90             ));
91 0         0 return;
92             };
93              
94 34 100       76 my $args = eval { decode_json_text($normalized_msg); } or do {
  34         173  
95 1         59 $c->tx->emit(
96             encoding_error => _get_error_details(
97             code => 'INVALID_JSON',
98             reason => 'Malformed JSON data',
99             message => $msg
100             ));
101 1         11 return;
102             };
103              
104 33         1058 on_message($c, $args);
105 23         296 });
106              
107             $c->on(
108             binary => sub {
109 1     1   3772 my ($d, $bytes) = @_;
110 1 50 33     10 $config->{binary_frame}(@_) if $bytes and exists($config->{binary_frame});
111 23         4690 });
112              
113 23 50       1431 $c->on(finish => $config->{finish_connection}) if $config->{finish_connection};
114              
115 23         137 return;
116             }
117              
118             sub on_message {
119 33     33 1 86 my ($c, $args) = @_;
120              
121 33         135 my $config = $c->wsp_config->{config};
122              
123 33         108 my $req_storage = {};
124 33         101 $req_storage->{args} = $args;
125              
126             # We still want to run any hooks even for invalid requests.
127 33 50       145 if (my $err = Mojo::WebSocketProxy::Parser::parse_req($c, $req_storage)) {
128 0         0 $c->send({json => $err}, $req_storage);
129 0   0     0 return $c->_run_hooks($config->{after_dispatch} || [])->retain;
130             }
131              
132 33 100       141 my $action = $c->dispatch($args) or do {
133 2         28 my $err = $c->wsp_error('error', UnrecognisedRequest => 'Unrecognised request');
134 2         63 $c->send({json => $err}, $req_storage);
135 2   50     20 return $c->_run_hooks($config->{after_dispatch} || [])->retain;
136             };
137              
138 31         113 @{$req_storage}{keys %$action} = (values %$action);
  31         144  
139 31         93 $req_storage->{method} = $req_storage->{name};
140              
141             # main processing pipeline
142             my $f = $c->before_forward($req_storage)->transform(
143             done => sub {
144             # Note that we completely ignore the return value of ->before_forward here.
145 23 100   23   1436 return $req_storage->{instead_of_forward}->($c, $req_storage) if $req_storage->{instead_of_forward};
146 21         77 return $c->forward($req_storage);
147             }
148             )->then(
149             sub {
150 23     23   3431 my $result = shift;
151 23         114 return $c->after_forward($result, $req_storage)->transform(done => sub { $result });
  23         1296  
152             },
153             sub {
154 7     7   101532 my $result = shift;
155 7         39 Future->done($result);
156 31         110 });
157              
158             return Future->wait_any(
159             Future::Mojo->new_timeout(TIMEOUT)->else(
160             sub {
161 1     1   1002053 return Future->done($c->wsp_error('error', Timeout => 'Timeout'));
162             }
163             ),
164             $f
165             )->then(
166             sub {
167 31     31   13689 my ($result) = @_;
168 31 100       443 $c->send({json => $result}, $req_storage) if $result;
169 31   50     198 return $c->_run_hooks($config->{after_dispatch} || []);
170             }
171             )->on_fail(
172             sub {
173 0     0   0 $c->app->log->error("An error occurred handling on_message. Error @_");
174 31         2091 })->retain;
175             }
176              
177             sub before_forward {
178 31     31 1 93 my ($c, $req_storage) = @_;
179              
180 31         92 my $config = $c->wsp_config->{config};
181              
182 31         65 my $before_forward_hooks = [];
183              
184             # Global hooks are always first
185 31         90 for ($config, $req_storage) {
186 62 100       211 push @$before_forward_hooks, ref($_->{before_forward}) eq 'ARRAY' ? @{$_->{before_forward}} : $_->{before_forward};
  7         20  
187             }
188              
189             # We always want to clear these after every request.
190 31         56 delete $req_storage->{before_forward};
191              
192 31         96 return $c->_run_hooks($before_forward_hooks, $req_storage);
193             }
194              
195             sub after_forward {
196 23     23 1 60 my ($c, $result, $req_storage) = @_;
197              
198 23         72 my $config = $c->wsp_config->{config};
199 23   50     188 return $c->_run_hooks($config->{after_forward} || [], $result, $req_storage);
200             }
201              
202             sub _run_hooks {
203 87     87   224 my @hook_params = @_;
204 87         145 my $c = shift @hook_params;
205 87         136 my $hooks = shift @hook_params;
206              
207             my $result_f = fmap {
208 19     19   1048 my $hook = shift;
209 19 100       62 my $result = $hook->($c, @hook_params) or return Future->done;
210 9 100 66     277 return $result if blessed($result) && $result->isa('Future');
211 5         44 return Future->fail($result);
212             }
213 87         544 foreach => [grep { defined } @$hooks],
  62         239  
214             concurrent => 1;
215 87         18923 return $result_f;
216             }
217              
218             sub dispatch {
219 33     33 1 79 my ($c, $args) = @_;
220              
221 33         120 my $log = $c->app->log;
222 33         422 $log->debug("websocket got json " . $c->dumper($args));
223              
224             my ($action) =
225 0         0 sort { $a->{order} <=> $b->{order} }
226 33         121 grep { defined }
227 33         4390 map { $c->wsp_config->{actions}->{$_} } keys %$args;
  33         110  
228              
229 33         137 return $action;
230             }
231              
232             sub forward {
233 23     23 1 54 my ($c, $req_storage) = @_;
234              
235 23         79 my $config = $c->wsp_config->{config};
236              
237 23         62 for my $hook (qw/ before_call before_get_rpc_response after_got_rpc_response /) {
238             $req_storage->{$hook} = [
239 71         156 grep { $_ } (ref $config->{$hook} eq 'ARRAY' ? @{$config->{$hook}} : $config->{$hook}),
  0         0  
240 69 50       320 grep { $_ } (ref $req_storage->{$hook} eq 'ARRAY' ? @{$req_storage->{$hook}} : $req_storage->{$hook}),
  69 50       119  
  0         0  
241             ];
242             }
243              
244 23   100     134 my $backend_name = $req_storage->{backend} // "default";
245 23 50       77 my $backend = $c->wsp_config->{backends}{$backend_name}
246             or die "Cannot dispatch request - no backend named '$backend_name'";
247              
248 23         134 $backend->call_rpc($c, $req_storage);
249              
250 23         76 return;
251             }
252              
253             sub _get_error_details {
254 1     1   10 my (%args) = @_;
255              
256             return {
257             error => 'Error Processing Request',
258             details => {
259             error_code => $args{code},
260             reason => $args{reason},
261             request_body => $args{message},
262             },
263 1         11 };
264             }
265              
266             1;
267              
268             __END__