File Coverage

blib/lib/Mojo/Transaction/WebSocket.pm
Criterion Covered Total %
statement 100 104 96.1
branch 55 58 94.8
condition 21 28 75.0
subroutine 25 29 86.2
pod 22 22 100.0
total 223 241 92.5


line stmt bran cond sub pod time code
1             package Mojo::Transaction::WebSocket;
2 59     59   107078 use Mojo::Base 'Mojo::Transaction';
  59         126  
  59         404  
3              
4 59     59   492 use Compress::Raw::Zlib qw(Z_SYNC_FLUSH);
  59         155  
  59         4641  
5 59     59   462 use List::Util qw(first);
  59         322  
  59         4373  
6 59     59   638 use Mojo::JSON qw(encode_json j);
  59         155  
  59         3474  
7 59     59   366 use Mojo::Util qw(decode encode trim);
  59         208  
  59         4234  
8 59     59   38795 use Mojo::WebSocket qw(WS_BINARY WS_CLOSE WS_CONTINUATION WS_PING WS_PONG WS_TEXT);
  59         237  
  59         159503  
9              
10             has [qw(compressed established handshake masked)];
11             has max_websocket_size => sub { $ENV{MOJO_MAX_WEBSOCKET_SIZE} || 262144 };
12              
13             sub build_message {
14 178     178 1 11136 my ($self, $frame) = @_;
15              
16             # Text
17 178 100       835 $frame = {text => encode('UTF-8', $frame)} if ref $frame ne 'HASH';
18              
19             # JSON
20 178 100       620 $frame->{text} = encode_json($frame->{json}) if exists $frame->{json};
21              
22             # Raw text or binary
23 178 100       491 if (exists $frame->{text}) { $frame = [1, 0, 0, 0, WS_TEXT, $frame->{text}] }
  150         553  
24 28         117 else { $frame = [1, 0, 0, 0, WS_BINARY, $frame->{binary}] }
25              
26             # "permessage-deflate" extension
27 178 100       736 return $frame unless $self->compressed;
28             my $deflate = $self->{deflate}
29 13   66     108 ||= Compress::Raw::Zlib::Deflate->new(AppendOutput => 1, MemLevel => 8, WindowBits => -15);
30 13         35761 $deflate->deflate($frame->[5], my $out);
31 13         94 $deflate->flush($out, Z_SYNC_FLUSH);
32 13         690 $deflate->deflateReset;
33 13         157 @$frame[1, 5] = (1, substr($out, 0, length($out) - 4));
34              
35 13         51 return $frame;
36             }
37              
38 196     196 1 672 sub client_read { shift->server_read(@_) }
39 464     464 1 1432 sub client_write { shift->server_write(@_) }
40              
41             sub closed {
42 134     134 1 503 my $self = shift->completed;
43 134 100       469 my @args = $self->{close} ? (@{$self->{close}}) : (1006);
  119         382  
44 134 100       645 return $self->emit(finish => @args > 1 ? @args : (@args, undef));
45             }
46              
47 9     9 1 38 sub connection { shift->handshake->connection }
48              
49             sub finish {
50 156     156 1 407 my $self = shift;
51              
52 156         557 my $close = $self->{close} = [@_];
53 156 100       526 my $payload = $close->[0] ? pack('n', $close->[0]) : '';
54 156 100       535 $payload .= encode 'UTF-8', $close->[1] if defined $close->[1];
55 156   100     824 $close->[0] //= 1005;
56 156         3722 $self->send([1, 0, 0, 0, WS_CLOSE, $payload])->{closing} = 1;
57              
58 156         948 return $self;
59             }
60              
61 579     579 1 2333 sub is_websocket {1}
62              
63 1     1 1 32 sub kept_alive { shift->handshake->kept_alive }
64 0     0 1 0 sub local_address { shift->handshake->local_address }
65 0     0 1 0 sub local_port { shift->handshake->local_port }
66              
67             sub parse_message {
68 275     275 1 2685 my ($self, $frame) = @_;
69              
70 275         1073 $self->emit(frame => $frame);
71              
72             # Ping/Pong
73 275         575 my $op = $frame->[4];
74 275 100       632 return $self->send([1, 0, 0, 0, WS_PONG, $frame->[5]]) if $op == WS_PING;
75 274 100       661 return undef if $op == WS_PONG;
76              
77             # Close
78 273 100       594 if ($op == WS_CLOSE) {
79 94 100       466 return $self->finish unless length $frame->[5] >= 2;
80 14         113 return $self->finish(unpack('n', substr($frame->[5], 0, 2, '')), decode('UTF-8', $frame->[5]));
81             }
82              
83             # Append chunk and check message size
84 179 100 66     768 @{$self}{qw(op pmc)} = ($op, $self->compressed && $frame->[1]) unless exists $self->{op};
  172         638  
85 179         796 $self->{message} .= $frame->[5];
86 179         477 my $max = $self->max_websocket_size;
87 179 100       618 return $self->finish(1009) if length $self->{message} > $max;
88              
89             # No FIN bit (Continuation)
90 178 100       491 return undef unless $frame->[0];
91              
92             # "permessage-deflate" extension (handshake and RSV1)
93 171         399 my $msg = delete $self->{message};
94 171 50 66     502 if ($self->compressed && $self->{pmc}) {
95             my $inflate = $self->{inflate}
96 9   66     84 ||= Compress::Raw::Zlib::Inflate->new(Bufsize => $max, LimitOutput => 1, WindowBits => -15);
97 9         5536 $inflate->inflate(($msg .= "\x00\x00\xff\xff"), my $out);
98 9 100       56 return $self->finish(1009) if length $msg;
99 8         181 $msg = $out;
100             }
101              
102 170 100       629 $self->emit(json => j($msg)) if $self->has_subscribers('json');
103 170         452 $op = delete $self->{op};
104 170 100       766 $self->emit($op == WS_TEXT ? 'text' : 'binary' => $msg);
105 170 50       486 $self->emit(message => $op == WS_TEXT ? decode 'UTF-8', $msg : $msg) if $self->has_subscribers('message');
    100          
106             }
107              
108 12     12 1 50 sub protocol { shift->res->headers->sec_websocket_protocol }
109              
110 0     0 1 0 sub remote_address { shift->handshake->remote_address }
111 0     0 1 0 sub remote_port { shift->handshake->remote_port }
112 506     506 1 1404 sub req { shift->handshake->req }
113 642     642 1 1994 sub res { shift->handshake->res }
114              
115 119 50   119 1 393 sub resume { $_[0]->handshake->resume and return $_[0] }
116              
117             sub send {
118 331     331 1 1002 my ($self, $msg, $cb) = @_;
119 331 100       1011 $self->once(drain => $cb) if $cb;
120 331 100       1361 $msg = $self->build_message($msg) unless ref $msg eq 'ARRAY';
121 331         1392 $self->{write} .= Mojo::WebSocket::build_frame($self->masked, @$msg);
122 331         1673 return $self->emit('resume');
123             }
124              
125             sub server_read {
126 313     313 1 773 my ($self, $chunk) = @_;
127              
128 313         1131 $self->{read} .= $chunk;
129 313         962 my $max = $self->max_websocket_size;
130 313         1546 while (my $frame = Mojo::WebSocket::parse_frame(\$self->{read}, $max)) {
131 269 100 50     731 $self->finish(1009) and last unless ref $frame;
132 266         747 $self->parse_message($frame);
133             }
134              
135 313         982 $self->emit('resume');
136             }
137              
138             sub server_write {
139 930     930 1 1489 my $self = shift;
140 930 100 100     5479 $self->emit('drain') unless length($self->{write} //= '');
141 930 100 100     4123 $self->completed if !length $self->{write} && $self->{closing};
142 930         2641 return delete $self->{write};
143             }
144              
145             sub with_compression {
146 20     20 1 35 my $self = shift;
147              
148             # "permessage-deflate" extension
149 20 100 33     59 $self->compressed(1) and $self->res->headers->sec_websocket_extensions('permessage-deflate')
      100        
150             if ($self->req->headers->sec_websocket_extensions // '') =~ /permessage-deflate/;
151             }
152              
153             sub with_protocols {
154 6     6 1 16 my $self = shift;
155              
156 6   100     27 my %protos = map { trim($_) => 1 } split /,/, $self->req->headers->sec_websocket_protocol // '';
  8         32  
157 6 100   19   77 return undef unless defined(my $proto = first { $protos{$_} } @_);
  19         63  
158              
159 3         19 $self->res->headers->sec_websocket_protocol($proto);
160 3         12 return $proto;
161             }
162              
163             1;
164              
165             =encoding utf8
166              
167             =head1 NAME
168              
169             Mojo::Transaction::WebSocket - WebSocket transaction
170              
171             =head1 SYNOPSIS
172              
173             use Mojo::Transaction::WebSocket;
174              
175             # Send and receive WebSocket messages
176             my $ws = Mojo::Transaction::WebSocket->new;
177             $ws->send('Hello World!');
178             $ws->on(message => sub ($ws, $msg) { say "Message: $msg" });
179             $ws->on(finish => sub ($ws, $code, $reason) { say "WebSocket closed with status $code." });
180              
181             =head1 DESCRIPTION
182              
183             L<Mojo::Transaction::WebSocket> is a container for WebSocket transactions, based on L<RFC
184             6455|https://tools.ietf.org/html/rfc6455> and L<RFC 7692|https://tools.ietf.org/html/rfc7692>.
185              
186             =head1 EVENTS
187              
188             L<Mojo::Transaction::WebSocket> inherits all events from L<Mojo::Transaction> and can emit the following new ones.
189              
190             =head2 binary
191              
192             $ws->on(binary => sub ($ws, $bytes) {...});
193              
194             Emitted when a complete WebSocket binary message has been received.
195              
196             $ws->on(binary => sub ($ws, $bytes) { say "Binary: $bytes" });
197              
198             =head2 drain
199              
200             $ws->on(drain => sub ($ws) {...});
201              
202             Emitted once all data has been sent.
203              
204             $ws->on(drain => sub ($ws) { $ws->send(time) });
205              
206             =head2 finish
207              
208             $ws->on(finish => sub ($ws, $code, $reason) {...});
209              
210             Emitted when the WebSocket connection has been closed.
211              
212             =head2 frame
213              
214             $ws->on(frame => sub ($ws, $frame) {...});
215              
216             Emitted when a WebSocket frame has been received.
217              
218             $ws->on(frame => sub ($ws, $frame) {
219             say "FIN: $frame->[0]";
220             say "RSV1: $frame->[1]";
221             say "RSV2: $frame->[2]";
222             say "RSV3: $frame->[3]";
223             say "Opcode: $frame->[4]";
224             say "Payload: $frame->[5]";
225             });
226              
227             =head2 json
228              
229             $ws->on(json => sub ($ws, $json) {...});
230              
231             Emitted when a complete WebSocket message has been received, all text and binary messages will be automatically JSON
232             decoded. Note that this event only gets emitted when it has at least one subscriber.
233              
234             $ws->on(json => sub ($ws, $hash) { say "Message: $hash->{msg}" });
235              
236             =head2 message
237              
238             $ws->on(message => sub ($ws, $msg) {...});
239              
240             Emitted when a complete WebSocket message has been received, text messages will be automatically decoded. Note that
241             this event only gets emitted when it has at least one subscriber.
242              
243             $ws->on(message => sub ($ws, $msg) { say "Message: $msg" });
244              
245             =head2 resume
246              
247             $tx->on(resume => sub ($tx) {...});
248              
249             Emitted when transaction is resumed.
250              
251             =head2 text
252              
253             $ws->on(text => sub ($ws, $bytes) {...});
254              
255             Emitted when a complete WebSocket text message has been received.
256              
257             $ws->on(text => sub ($ws, $bytes) { say "Text: $bytes" });
258              
259             =head1 ATTRIBUTES
260              
261             L<Mojo::Transaction::WebSocket> inherits all attributes from L<Mojo::Transaction> and implements the following new
262             ones.
263              
264             =head2 compressed
265              
266             my $bool = $ws->compressed;
267             $ws = $ws->compressed($bool);
268              
269             Compress messages with C<permessage-deflate> extension.
270              
271             =head2 established
272              
273             my $bool = $ws->established;
274             $ws = $ws->established($bool);
275              
276             WebSocket connection established.
277              
278             =head2 handshake
279              
280             my $handshake = $ws->handshake;
281             $ws = $ws->handshake(Mojo::Transaction::HTTP->new);
282              
283             The original handshake transaction, usually a L<Mojo::Transaction::HTTP> object.
284              
285             =head2 masked
286              
287             my $bool = $ws->masked;
288             $ws = $ws->masked($bool);
289              
290             Mask outgoing frames with XOR cipher and a random 32-bit key.
291              
292             =head2 max_websocket_size
293              
294             my $size = $ws->max_websocket_size;
295             $ws = $ws->max_websocket_size(1024);
296              
297             Maximum WebSocket message size in bytes, defaults to the value of the C<MOJO_MAX_WEBSOCKET_SIZE> environment variable
298             or C<262144> (256KiB).
299              
300             =head1 METHODS
301              
302             L<Mojo::Transaction::WebSocket> inherits all methods from L<Mojo::Transaction> and implements the following new ones.
303              
304             =head2 build_message
305              
306             my $frame = $ws->build_message({binary => $bytes});
307             my $frame = $ws->build_message({text => $bytes});
308             my $frame = $ws->build_message({json => {test => [1, 2, 3]}});
309             my $frame = $ws->build_message($chars);
310              
311             Build WebSocket message.
312              
313             =head2 client_read
314              
315             $ws->client_read($data);
316              
317             Read data client-side, used to implement user agents such as L<Mojo::UserAgent>.
318              
319             =head2 client_write
320              
321             my $bytes = $ws->client_write;
322              
323             Write data client-side, used to implement user agents such as L<Mojo::UserAgent>.
324              
325             =head2 closed
326              
327             $tx = $tx->closed;
328              
329             Same as L<Mojo::Transaction/"completed">, but also indicates that all transaction data has been sent.
330              
331             =head2 connection
332              
333             my $id = $ws->connection;
334              
335             Connection identifier.
336              
337             =head2 finish
338              
339             $ws = $ws->finish;
340             $ws = $ws->finish(1000);
341             $ws = $ws->finish(1003 => 'Cannot accept data!');
342              
343             Close WebSocket connection gracefully.
344              
345             =head2 is_websocket
346              
347             my $bool = $ws->is_websocket;
348              
349             True, this is a L<Mojo::Transaction::WebSocket> object.
350              
351             =head2 kept_alive
352              
353             my $bool = $ws->kept_alive;
354              
355             Connection has been kept alive.
356              
357             =head2 local_address
358              
359             my $address = $ws->local_address;
360              
361             Local interface address.
362              
363             =head2 local_port
364              
365             my $port = $ws->local_port;
366              
367             Local interface port.
368              
369             =head2 parse_message
370              
371             $ws->parse_message([$fin, $rsv1, $rsv2, $rsv3, $op, $payload]);
372              
373             Parse WebSocket message.
374              
375             =head2 protocol
376              
377             my $proto = $ws->protocol;
378              
379             Return negotiated subprotocol or C<undef>.
380              
381             =head2 remote_address
382              
383             my $address = $ws->remote_address;
384              
385             Remote interface address.
386              
387             =head2 remote_port
388              
389             my $port = $ws->remote_port;
390              
391             Remote interface port.
392              
393             =head2 req
394              
395             my $req = $ws->req;
396              
397             Handshake request, usually a L<Mojo::Message::Request> object.
398              
399             =head2 res
400              
401             my $res = $ws->res;
402              
403             Handshake response, usually a L<Mojo::Message::Response> object.
404              
405             =head2 resume
406              
407             $ws = $ws->resume;
408              
409             Resume L</"handshake"> transaction.
410              
411             =head2 send
412              
413             $ws = $ws->send({binary => $bytes});
414             $ws = $ws->send({text => $bytes});
415             $ws = $ws->send({json => {test => [1, 2, 3]}});
416             $ws = $ws->send([$fin, $rsv1, $rsv2, $rsv3, $op, $payload]);
417             $ws = $ws->send($chars);
418             $ws = $ws->send($chars => sub {...});
419              
420             Send message or frame non-blocking via WebSocket, the optional drain callback will be executed once all data has been
421             written.
422              
423             # Send "Ping" frame
424             use Mojo::WebSocket qw(WS_PING);
425             $ws->send([1, 0, 0, 0, WS_PING, 'Hello World!']);
426              
427             =head2 server_read
428              
429             $ws->server_read($data);
430              
431             Read data server-side, used to implement web servers such as L<Mojo::Server::Daemon>.
432              
433             =head2 server_write
434              
435             my $bytes = $ws->server_write;
436              
437             Write data server-side, used to implement web servers such as L<Mojo::Server::Daemon>.
438              
439             =head2 with_compression
440              
441             $ws->with_compression;
442              
443             Negotiate C<permessage-deflate> extension for this WebSocket connection.
444              
445             =head2 with_protocols
446              
447             my $proto = $ws->with_protocols('v2.proto', 'v1.proto');
448              
449             Negotiate subprotocol for this WebSocket connection.
450              
451             =head1 SEE ALSO
452              
453             L<Mojolicious>, L<Mojolicious::Guides>, L<https://mojolicious.org>.
454              
455             =cut