File Coverage

blib/lib/Web/Async/WebSocket/Server/Connection.pm
Criterion Covered Total %
statement 36 288 12.5
branch 0 102 0.0
condition 0 53 0.0
subroutine 12 51 23.5
pod 3 32 9.3
total 51 526 9.7


line stmt bran cond sub pod time code
1             package Web::Async::WebSocket::Server::Connection;
2 2     2   253881 use Full::Class qw(:v1), extends => 'IO::Async::Notifier';
  2         119620  
  2         16  
3              
4             our $VERSION = '0.006'; ## VERSION
5             ## AUTHORITY
6              
7 2     2   11521 use Web::Async::WebSocket::Frame;
  2         10  
  2         159  
8              
9 2     2   17 use List::Util qw(pairmap);
  2         4  
  2         187  
10 2     2   1560 use Compress::Zlib;
  2         243299  
  2         776  
11 2     2   647 use POSIX ();
  2         21244  
  2         256  
12 2     2   1973 use URI;
  2         9458  
  2         173  
13 2     2   1441 use URI::ws;
  2         22653  
  2         163  
14 2     2   1309 use Time::Moment;
  2         7585  
  2         276  
15 2     2   1454 use Digest::SHA qw(sha1);
  2         13134  
  2         317  
16 2     2   1193 use MIME::Base64 qw(encode_base64);
  2         1581  
  2         181  
17 2     2   13 use Unicode::UTF8 qw(valid_utf8);
  2         4  
  2         114  
18              
19             # As defined in the RFC - it's used as part of the hashing for the security header in the response
20 2     2   7 use constant WEBSOCKET_GUID => '258EAFA5-E914-47DA-95CA-C5AB0DC85B11';
  2         3  
  2         38168  
21              
22             # Opcodes have a registry here: https://www.iana.org/assignments/websocket/websocket.xhtml#opcode
23             our %OPCODE_BY_CODE = (
24             0 => 'continuation',
25             1 => 'text',
26             2 => 'binary',
27             8 => 'close',
28             9 => 'ping',
29             10 => 'pong',
30             );
31             our %OPCODE_BY_NAME = reverse %OPCODE_BY_CODE;
32              
33             our %COMPRESSIBLE_OPCODE = (
34             $OPCODE_BY_NAME{text} => 1,
35             $OPCODE_BY_NAME{binary} => 1,
36             );
37              
38             # Whether we're `ws` or `wss`
39 0     0 0   field $scheme : reader : param = 'ws';
40             # The Web::Async::WebSocket::Server instance
41 0           field $server : reader : param = undef;
42              
43             # Given the state of websockets in general, this is unlikely to change from `HTTP/1.1` anytime soon
44 0     0 0   field $http_version : reader : param = 'HTTP/1.1';
  0     0 0    
  0            
45             # 101 Upgrade is defined by the RFC, but if you have special requirements you can override via the constructor
46 0     0 0   field $status : reader : param = '101';
  0            
47             # The message is probably ignored by everything
48 0     0 0   field $msg : reader : param = 'Switching Protocols';
  0            
49             # There aren't a vast number of extensions, at the time of writing https://www.iana.org/assignments/websocket/websocket.xhtml#extension-name
50             # lists just two of 'em
51 0     0 0   field $supported_extension : reader : param {
  0            
52             +{
53             'permessage-deflate' => 1,
54             'server_no_context_takeover' => 1,
55             'client_no_context_takeover' => 1,
56             'server_max_window_bits' => 1,
57             'client_max_window_bits' => 1,
58             }
59             }
60              
61 0           field $method : reader = undef;
62 0     0 0   field $url : reader = undef;
  0            
63 0     0 0   field $uri : reader = undef;
  0            
64 0     0 0   field $headers : reader { +{ } }
  0     0 0    
  0            
65              
66             # What to report in the `Server:` header
67 0     0 0   field $server_name : reader : param = 'perl';
  0            
68              
69             # Restriction on number of raw (pre-decompression!) bytes,
70             # advised to set this to a nonzero value to avoid clients
71             # burning up all your memory...
72 0           field $maximum_payload_size : reader : param = undef;
73              
74             # Our current deflation (compression) state
75 0     0 0   field $deflation;
  0            
76             # Our current inflation (decompression) state
77             field $inflation;
78              
79 0     0 0   field $ryu : param : reader;
  0            
80              
81             # A coderef for processing requests before starting to accept traffic,
82             # should return a failed Future if the connection should be rejected.
83             field $handshake : param : reader = undef;
84              
85 0     0 0   field $on_handshake_failure : param : reader = undef;
  0            
86 0     0 0   field $on_handshake_complete : reader = undef;
  0            
87              
88             # A Ryu::Source representing the messages received from the client
89 0     0 0   field $incoming_frame : reader : param { $self->ryu->source }
  0     0 0    
  0            
90             # A Ryu::Source representing the messages to be sent to the client
91 0     0 0   field $outgoing_frame : reader : param { $self->ryu->source }
  0            
92              
93 0     0 0   field $compression_options : reader { +{ } }
  0            
94              
95             # The IO::Async::Stream representing the network connection
96             # to the client
97 0           field $stream;
98              
99             field $closed : reader : param = undef;
100              
101 0     0 0   method configure (%args) {
  0     0 1    
  0            
  0            
  0            
  0            
102 0 0         $http_version = delete $args{http_version} if exists $args{http_version};
103 0 0         $status = delete $args{status} if exists $args{status};
104 0 0         $msg = delete $args{msg} if exists $args{msg};
105 0 0         $ryu = delete $args{ryu} if exists $args{ryu};
106 0 0         $stream = delete $args{stream} if exists $args{stream};
107 0 0         weaken($server = delete $args{server}) if exists $args{server};
108 0 0         $server_name = delete $args{server_name} if exists $args{server_name};
109 0 0         $maximum_payload_size = delete $args{maximum_payload_size} if exists $args{maximum_payload_size};
110 0 0         $on_handshake_failure = delete $args{on_handshake_failure} if exists $args{on_handshake_failure};
111 0 0         $handshake = delete $args{handshake} if exists $args{handshake};
112 0           return $self->next::method(%args);
113             }
114              
115 0     0     method _add_to_loop ($loop) {
  0            
  0            
  0            
116 0     0     $on_handshake_failure //= async method ($stream, $error, @) {
  0            
  0            
  0            
  0            
  0            
117 0           await $stream->write("$http_version 400 $error\x0D\x0A\x0D\x0A");
118 0   0       };
119 0   0       $closed //= $self->loop->new_future;
120 0   0       $on_handshake_complete //= $self->loop->new_future;
121 0           $stream->configure(
122 0     0     on_closed => $self->$curry::weak(async method (@) {
  0            
  0            
123 0 0         $closed->done unless $closed->is_ready;
124 0           $server->on_client_disconnect($self);
125 0           }),
126             );
127             }
128              
129             =head2 send_text_frame
130              
131             Send a text frame.
132              
133             Expects a Unicode Perl text string as the first parameter - this will be
134             encoded to UTF-8 and sent to the client.
135              
136             =cut
137              
138 0     0 1   async method send_text_frame ($text, %args) {
  0            
  0            
  0            
  0            
  0            
139 0           return await $self->write_frame(
140             payload => $text,
141             type => 'text',
142             %args
143             );
144             }
145              
146             =head2 send_binary_frame
147              
148             Send a binary data frame.
149              
150             Expects the raw binary data bytes as the first parameter.
151              
152             =cut
153              
154 0     0 0   async method send_data_frame ($data, %args) {
  0            
  0            
  0            
  0            
  0            
155 0           return await $self->write_frame(
156             payload => $data,
157             type => 'binary',
158             %args
159             );
160             }
161              
162             =head2 write_frame
163              
164             Sends one or more frames to the client.
165              
166             =cut
167              
168 0     0 1   async method write_frame (%args) {
  0            
  0            
  0            
  0            
169 0 0         die 'already closed' if $closed->is_ready;
170 0           for my $frame ($self->prepare_frames(%args)) {
171 0           await $stream->write($frame);
172             }
173 0           return;
174             }
175              
176 0     0 0   async method prepare_frames (%args) {
  0            
  0            
  0            
  0            
177 0           my @frames;
178 0           $log->tracef('Write frame with %s', \%args);
179 0   0       my $opcode = $OPCODE_BY_NAME{$args{type}} // die 'invalid frame type';
180 0   0       my $compressed = ($args{compress} // 1) && $compression_options->{compress} && $COMPRESSIBLE_OPCODE{$opcode};
181 0           my $payload = $args{payload};
182 0 0         $payload = encode_utf8($payload) if $opcode == $OPCODE_BY_NAME{text};
183              
184 0           $opcode |= 0x80;
185 0 0         if($compressed) {
186 0           $opcode |= 0x40;
187 0           my $original = length $payload;
188 0           $payload = $self->deflate($payload);
189             # Strip terminator if we have one
190 0           $payload =~ s{\x00\x00\xFF\xFF$}{};
191 0   0       $log->tracef(
192             'Size after deflation is %d/%d, ratio of %4.1f%%',
193             length($payload),
194             $original,
195             100.0 * (length($payload) / ($original || 1)),
196             );
197             }
198 0           my $len = length $payload;
199 0           my $msg = pack('C1', $opcode);
200 0 0         if($len < 126) {
    0          
201 0           $msg .= pack('C1', $len);
202             } elsif($len <= 0xFFFF) {
203 0           $msg .= pack('C1n1', 126, $len);
204             } else {
205 0           $msg .= pack('C1Q>1', 127, $len);
206             }
207 0           $msg .= $payload;
208 0           push @frames, $msg;
209 0           return @frames;
210             }
211              
212 0     0 0   method deflate ($data) {
  0            
  0            
  0            
213 0 0         undef $deflation unless $compression_options->{server_context};
214             $deflation //= deflateInit(
215 0 0 0       -WindowBits => -($compression_options->{server_bits} || 15)
      0        
216             ) or die "Cannot create a deflation stream\n" ;
217              
218 0           my ($output, $status) = $deflation->deflate($data);
219 0 0         die "deflation failed - $status\n" unless $status == Z_OK;
220 0           (my $block, $status) = $deflation->flush(Z_SYNC_FLUSH);
221 0 0         die "deflation failed at flush stage\n" unless $status == Z_OK;
222              
223 0           return $output . $block;
224             }
225              
226 0     0 0   method inflate ($data) {
  0            
  0            
  0            
227 0 0         undef $inflation unless $compression_options->{client_context};
228             $inflation //= inflateInit(
229 0 0 0       -WindowBits => -($compression_options->{client_bits} || 15)
      0        
230             ) or die "Cannot create a deflation stream\n" ;
231              
232 0           my ($block, $status) = $inflation->inflate($data);
233 0 0 0       die "inflation failed - $status\n" unless $status == Z_STREAM_END or $status == Z_OK;
234 0           return $block;
235             }
236              
237 0     0 0   async method read_headers () {
  0            
  0            
  0            
238 0           while(1) {
239 0           my $line = decode_utf8('' . await $stream->read_until("\x0D\x0A"));
240 0           $line =~ s/\x0D\x0A$//;
241 0 0         last unless length $line;
242              
243 0           my ($k, $v) = $line =~ /^([^:]+):\s+(.*)$/;
244 0           $k = lc($k =~ tr{-}{_}r);
245 0           $headers->{$k} = $v;
246             }
247 0           return $headers;
248             }
249              
250 0     0 0   method generate_response_key ($key) {
  0            
  0            
  0            
251 0 0 0       die "No websocket key provided\n" unless defined $key and length $key;
252 0           return encode_base64(sha1($key . WEBSOCKET_GUID), '');
253             }
254              
255 0     0 0   async method handle_connection () {
  0            
  0            
  0            
256             try {
257             $self->add_child($stream);
258             my $first = await $stream->read_until("\x0D\x0A");
259             ($method, $url, my $version) = $first =~ m{^(\S+)\s+(\S+)\s+(HTTP/\d+\.\d+)\x0D\x0A$}a;
260             $log->tracef('HTTP request is [%s] for [%s] version %s', $method, $url, $version);
261             my $hdr = await $self->read_headers();
262              
263             $log->tracef('url = %s, headers = %s', $url, format_json_text($hdr));
264              
265             # We rely on the caller to tell us the scheme, defaulting to plain `ws`,
266             # and everything else in the URI comes directly from the request.
267             $uri = URI->new($scheme . '://localhost');
268             $uri->host($hdr->{host}) if exists $hdr->{host};
269             $uri->path($url);
270              
271             unless($hdr->{upgrade} =~ /^websocket$/i) {
272             die sprintf "No upgrade: websocket header, ignoring connection\n";
273             }
274             unless($hdr->{sec_websocket_version} >= 13) {
275             die sprintf "Invalid websocket version %s\n", $hdr->{sec_websocket_version};
276             }
277              
278             my %output = (
279             'Upgrade' => 'websocket',
280             'Connection' => 'upgrade',
281             'Server' => $server_name,
282             'Date' => Time::Moment->now_utc->strftime("%a, %d %b %Y %H:%M:%S GMT"),
283             );
284             $output{'Sec-WebSocket-Accept'} = $self->generate_response_key($hdr->{sec_websocket_key});
285              
286             if(exists $hdr->{sec_websocket_extensions}) {
287             my $extensions;
288             VALID: {
289             SELECTION:
290             for my $selection (split /\s*,\s*/, $hdr->{sec_websocket_extensions} // '') {
291             my @options = map {; /^(\S+)(?:\s*=\s*(.*)\s*)?$/ ? ($1, $2) : () } split /\s*;\s*/, $selection;
292 0     0     my @order = pairmap { $a } @options;
293             my %options = @options;
294             my @invalid = grep { !$supported_extension->{$_} } sort keys %options;
295             if(@invalid) {
296             $log->infof('Rejecting invalid option combination %s', \@invalid);
297             next SELECTION;
298             }
299              
300             $log->infof('Acceptable options: %s', \%options);
301             $options{client_max_window_bits} //= 15 if exists $options{client_max_window_bits};
302             $compression_options->{client_bits} = $options{client_max_window_bits};
303             $compression_options->{server_bits} = $options{server_max_window_bits} || 15;
304             $extensions = join '; ', map { defined($options{$_}) ? "$_=$options{$_}" : $_ } @order;
305             $compression_options->{server_context} = (exists $options{server_no_context_takeover}) ? 0 : 1;
306             $compression_options->{client_context} = (exists $options{client_no_context_takeover}) ? 0 : 1;
307             $compression_options->{compress} = 1 if exists $options{'permessage-deflate'};
308             last VALID;
309             }
310             $log->infof('No acceptable extension options, giving up: %s', $hdr->{sec_websocket_extensions});
311             await $stream->write(
312             join(
313             "\x0D\x0A",
314             "$http_version 400 No acceptable extensions",
315             (pairmap {
316 0     0     encode_utf8("$a: $b")
317             } %output),
318             # Blank line at the end of the headers
319             '', ''
320             )
321             );
322             die "no acceptable extensions\n";
323             }
324             $output{'Sec-Websocket-Extensions'} = $extensions;
325             }
326              
327             try {
328             await $handshake->(
329             client => $self,
330             response_headers => \%output,
331             ) if $handshake;
332             } catch ($e) {
333             await $stream->write(
334             join(
335             "\x0D\x0A",
336             "$http_version 400 Handshake rejected",
337             (pairmap {
338 0     0     encode_utf8("$a: $b")
339             } %output),
340             # Blank line at the end of the headers
341             '', ''
342             )
343             );
344             die "handshake rejected\n";
345             }
346              
347             # Send the entire header block in a single write
348             await $stream->write(
349             join(
350             "\x0D\x0A",
351             "$http_version $status $msg",
352             (pairmap {
353 0     0     encode_utf8("$a: $b")
354             } %output),
355             # Blank line at the end of the headers
356             '', ''
357             )
358             );
359             } catch ($e) {
360             $log->errorf('Failed - %s', $e);
361             await $self->$on_handshake_failure($stream, $e);
362             return;
363             }
364              
365             # Once the handshake is complete, we don't need the handler any more,
366             # and keeping it around could lead to unwanted refcount cycles
367 0           undef $on_handshake_failure;
  0            
368 0           $on_handshake_complete->done;
369              
370             # Body processing
371             try {
372             $log->tracef('Start reading frames');
373             while(1) {
374             await $incoming_frame->unblocked;
375             my $frame = await $self->read_frame();
376             $log->tracef('Had frame: %s', $frame);
377             $incoming_frame->emit($frame);
378             }
379 0           } catch ($e) {
380             $log->errorf('Problem, %s', $e) unless $e =~ /^EOF/;
381             await $self->close(
382             code => 1011, # internal error
383             reason => 'Internal error'
384             );
385             }
386             }
387              
388 0     0 0   async method read_frame () {
  0            
  0            
  0            
389 0           $log->tracef('Reading frames from %s', "$stream");
390 0           my $fin;
391 0           my $data = '';
392 0           my $compressed;
393             my $type;
394 0           do {
395 0           my ($chunk, $eof);
396 0           ($chunk, $eof) = await $stream->read_exactly(2);
397 0 0         die "EOF\n" if $eof;
398 0           my ($opcode, $len) = unpack 'C1C1', $chunk;
399 0           my $masked = $len & 0x80;
400 0 0         die "unmasked frame\n" unless $masked;
401 0           $len &= ~0x80;
402 0 0         $fin = ($opcode & 0x80) ? 1 : 0;
403 0 0         my @rsv = map { ($opcode & $_) ? 1 : 0 } 0x40, 0x20, 0x10;
  0            
404 0   0       $compressed //= $compression_options->{compress} && $rsv[0];
      0        
405             return await $self->close(
406             code => 1002,
407             reason => 'Reserved bit 0 set with compression disabled',
408 0 0 0       ) if $rsv[0] and not $compression_options->{compress};
409             return await $self->close(
410             code => 1002,
411             reason => 'Unexpected reserved bit set',
412 0 0         ) if any { $_ } @rsv[1..2];
  0            
413 0   0       $type //= $opcode & 0x0F;
414             return await $self->close(
415             code => 1002,
416             reason => 'Unknown opcode',
417 0 0         ) unless $OPCODE_BY_CODE{$type};
418 0 0         if($len == 126) {
    0          
419 0           ($chunk, $eof) = await $stream->read_exactly(2);
420 0 0         die "EOF\n" if $eof;
421 0           ($len) = unpack 'n1', $chunk;
422 0 0         die 'invalid length' if $len < 126;
423             } elsif($len == 127) {
424 0           ($chunk, $eof) = await $stream->read_exactly(8);
425 0 0         die "EOF\n" if $eof;
426 0           ($len) = unpack 'Q>1', $chunk;
427 0 0 0       die 'invalid length' if $len < 0xFFFF or $len & 0x80000000;
428             }
429 0           my $mask = '';
430 0 0         if($masked) {
431 0           ($mask, $eof) = await $stream->read_exactly(4);
432 0 0         die "EOF\n" if $eof;
433             }
434             $log->tracef(
435 0           'Frame opcode %d, length %d, fin = %s, rsv = %s %s %s, mask key %v0x',
436             $opcode,
437             $len,
438             $fin,
439             @rsv,
440             $mask
441             );
442 0 0 0       die "excessive length\n" if defined($maximum_payload_size) and $len + length($data) > $maximum_payload_size;
443 0           (my $payload, $eof) = await $stream->read_exactly($len);
444 0 0         die "EOF\n" if $eof;
445 0 0         if($masked) {
446 0           $log->tracef('Masked payload = %v0x', $payload);
447 0           my ($frac, $int) = POSIX::modf(length($payload) / 4);
448 0           $payload ^.= ($mask x $int) . substr($mask, 0, 4 * $frac);
449             }
450 0           $log->tracef('Payload = %v0x', $payload);
451 0           $data .= $payload;
452             } until $fin;
453 0 0         $data = $self->inflate($data . "\x00\x00\xFF\xFF") if $compressed;
454 0           $log->tracef('Frame opcode is %s', $OPCODE_BY_CODE{$type});
455 0 0         if($type == $OPCODE_BY_NAME{text}) {
456 0 0         return await $self->close(
457             code => 1002,
458             reason => 'Invalid UTF-8 data in text frame',
459             ) unless valid_utf8($data);
460 0           $data = decode_utf8($data);
461             }
462 0           $log->tracef('Finished, data is now %s', $data);
463 0           my $frame = Web::Async::WebSocket::Frame->new(
464             payload => $data,
465             opcode => $type
466             );
467 0 0         if($OPCODE_BY_CODE{$type} equ 'close') {
468 0           my ($code, $reason) = unpack 'na*', $frame->payload;
469 0 0         return await $self->close(
470             code => 1002,
471             reason => 'Invalid UTF-8 reason in close frame',
472             ) unless valid_utf8($reason);
473 0           await $self->close(
474             code => ($code || 0),
475             reason => decode_utf8($reason // ''),
476             );
477             }
478 0           return $frame;
479             }
480              
481 0     0 0   async method close (%args) {
  0            
  0            
  0            
  0            
482             # Can only close once
483 0 0         return if $closed->is_ready;
484              
485 0 0         if($server) {
486 0           $server->on_client_close($self, %args);
487             }
488              
489             # No point trying to write anything if the remote has closed the connection
490 0 0         if($stream->is_read_eof) {
491 0           $closed->done(%args);
492 0           $stream->close;
493 0           return;
494             }
495              
496             my $f = $self->write_frame(
497             type => 'close',
498             payload => pack(
499 0   0       'na*' => ($args{code} // 0), encode_utf8($args{reason} // '')
      0        
500             ),
501             );
502 0           $closed->done(%args);
503 0           await $f;
504 0           $stream->close;
505             }
506              
507             1;