File Coverage

blib/lib/Mojo/Transaction/WebSocket.pm
Criterion Covered Total %
statement 99 103 96.1
branch 55 58 94.8
condition 20 28 71.4
subroutine 25 29 86.2
pod 22 22 100.0
total 221 240 92.0


line stmt bran cond sub pod time code
1             package Mojo::Transaction::WebSocket;
2 55     55   67984 use Mojo::Base 'Mojo::Transaction';
  55         141  
  55         402  
3              
4 55     55   450 use Compress::Raw::Zlib qw(Z_SYNC_FLUSH);
  55         153  
  55         3427  
5 55     55   440 use List::Util qw(first);
  55         170  
  55         3608  
6 55     55   390 use Mojo::JSON qw(encode_json j);
  55         189  
  55         3220  
7 55     55   388 use Mojo::Util qw(decode encode trim);
  55         176  
  55         3416  
8 55     55   26920 use Mojo::WebSocket qw(WS_BINARY WS_CLOSE WS_CONTINUATION WS_PING WS_PONG WS_TEXT);
  55         172  
  55         115116  
9              
10             has [qw(compressed established handshake masked)];
11             has max_websocket_size => sub { $ENV{MOJO_MAX_WEBSOCKET_SIZE} || 262144 };
12              
13             sub build_message {
14 161     161 1 8668 my ($self, $frame) = @_;
15              
16             # Text
17 161 100       691 $frame = {text => encode('UTF-8', $frame)} if ref $frame ne 'HASH';
18              
19             # JSON
20 161 100       538 $frame->{text} = encode_json($frame->{json}) if exists $frame->{json};
21              
22             # Raw text or binary
23 161 100       416 if (exists $frame->{text}) { $frame = [1, 0, 0, 0, WS_TEXT, $frame->{text}] }
  132         539  
24 29         101 else { $frame = [1, 0, 0, 0, WS_BINARY, $frame->{binary}] }
25              
26             # "permessage-deflate" extension
27 161 100       626 return $frame unless $self->compressed;
28             my $deflate = $self->{deflate}
29 8   66     98 ||= Compress::Raw::Zlib::Deflate->new(AppendOutput => 1, MemLevel => 8, WindowBits => -15);
30 8         34249 $deflate->deflate($frame->[5], my $out);
31 8         75 $deflate->flush($out, Z_SYNC_FLUSH);
32 8         528 @$frame[1, 5] = (1, substr($out, 0, length($out) - 4));
33              
34 8         34 return $frame;
35             }
36              
37 175     175 1 615 sub client_read { shift->server_read(@_) }
38 442     442 1 1146 sub client_write { shift->server_write(@_) }
39              
40             sub closed {
41 133     133 1 445 my $self = shift->completed;
42 133 100       419 my @args = $self->{close} ? (@{$self->{close}}) : (1006);
  119         325  
43 133 100       576 return $self->emit(finish => @args > 1 ? @args : (@args, undef));
44             }
45              
46 9     9 1 35 sub connection { shift->handshake->connection }
47              
48             sub finish {
49 154     154 1 431 my $self = shift;
50              
51 154         481 my $close = $self->{close} = [@_];
52 154 100       468 my $payload = $close->[0] ? pack('n', $close->[0]) : '';
53 154 100       465 $payload .= encode 'UTF-8', $close->[1] if defined $close->[1];
54 154   100     801 $close->[0] //= 1005;
55 154         579 $self->send([1, 0, 0, 0, WS_CLOSE, $payload])->{closing} = 1;
56              
57 154         759 return $self;
58             }
59              
60 530     530 1 2457 sub is_websocket {1}
61              
62 1     1 1 28 sub kept_alive { shift->handshake->kept_alive }
63 0     0 1 0 sub local_address { shift->handshake->local_address }
64 0     0 1 0 sub local_port { shift->handshake->local_port }
65              
66             sub parse_message {
67 259     259 1 2746 my ($self, $frame) = @_;
68              
69 259         860 $self->emit(frame => $frame);
70              
71             # Ping/Pong
72 259         513 my $op = $frame->[4];
73 259 100       575 return $self->send([1, 0, 0, 0, WS_PONG, $frame->[5]]) if $op == WS_PING;
74 258 100       578 return undef if $op == WS_PONG;
75              
76             # Close
77 257 100       588 if ($op == WS_CLOSE) {
78 93 100       472 return $self->finish unless length $frame->[5] >= 2;
79 13         102 return $self->finish(unpack('n', substr($frame->[5], 0, 2, '')), decode('UTF-8', $frame->[5]));
80             }
81              
82             # Append chunk and check message size
83 164 100 66     660 @{$self}{qw(op pmc)} = ($op, $self->compressed && $frame->[1]) unless exists $self->{op};
  157         611  
84 164         715 $self->{message} .= $frame->[5];
85 164         402 my $max = $self->max_websocket_size;
86 164 100       488 return $self->finish(1009) if length $self->{message} > $max;
87              
88             # No FIN bit (Continuation)
89 163 100       456 return undef unless $frame->[0];
90              
91             # "permessage-deflate" extension (handshake and RSV1)
92 156         380 my $msg = delete $self->{message};
93 156 50 66     422 if ($self->compressed && $self->{pmc}) {
94             my $inflate = $self->{inflate}
95 6   33     92 ||= Compress::Raw::Zlib::Inflate->new(Bufsize => $max, LimitOutput => 1, WindowBits => -15);
96 6         5315 $inflate->inflate(($msg .= "\x00\x00\xff\xff"), my $out);
97 6 100       41 return $self->finish(1009) if length $msg;
98 5         128 $msg = $out;
99             }
100              
101 155 100       520 $self->emit(json => j($msg)) if $self->has_subscribers('json');
102 155         383 $op = delete $self->{op};
103 155 100       580 $self->emit($op == WS_TEXT ? 'text' : 'binary' => $msg);
104 155 50       423 $self->emit(message => $op == WS_TEXT ? decode 'UTF-8', $msg : $msg) if $self->has_subscribers('message');
    100          
105             }
106              
107 12     12 1 34 sub protocol { shift->res->headers->sec_websocket_protocol }
108              
109 0     0 1 0 sub remote_address { shift->handshake->remote_address }
110 0     0 1 0 sub remote_port { shift->handshake->remote_port }
111 441     441 1 1152 sub req { shift->handshake->req }
112 564     564 1 1551 sub res { shift->handshake->res }
113              
114 106 50   106 1 288 sub resume { $_[0]->handshake->resume and return $_[0] }
115              
116             sub send {
117 317     317 1 811 my ($self, $msg, $cb) = @_;
118 317 100       885 $self->once(drain => $cb) if $cb;
119 317 100       1152 $msg = $self->build_message($msg) unless ref $msg eq 'ARRAY';
120 317         1166 $self->{write} .= Mojo::WebSocket::build_frame($self->masked, @$msg);
121 317         1208 return $self->emit('resume');
122             }
123              
124             sub server_read {
125 288     288 1 640 my ($self, $chunk) = @_;
126              
127 288         880 $self->{read} .= $chunk;
128 288         833 my $max = $self->max_websocket_size;
129 288         1132 while (my $frame = Mojo::WebSocket::parse_frame(\$self->{read}, $max)) {
130 255 100 50     751 $self->finish(1009) and last unless ref $frame;
131 253         653 $self->parse_message($frame);
132             }
133              
134 288         817 $self->emit('resume');
135             }
136              
137             sub server_write {
138 879     879 1 1423 my $self = shift;
139 879 100 100     4806 $self->emit('drain') unless length($self->{write} //= '');
140 879 100 100     3385 $self->completed if !length $self->{write} && $self->{closing};
141 879         2568 return delete $self->{write};
142             }
143              
144             sub with_compression {
145 20     20 1 45 my $self = shift;
146              
147             # "permessage-deflate" extension
148 20 100 33     67 $self->compressed(1) and $self->res->headers->sec_websocket_extensions('permessage-deflate')
      100        
149             if ($self->req->headers->sec_websocket_extensions // '') =~ /permessage-deflate/;
150             }
151              
152             sub with_protocols {
153 6     6 1 16 my $self = shift;
154              
155 6   100     19 my %protos = map { trim($_) => 1 } split /,/, $self->req->headers->sec_websocket_protocol // '';
  8         26  
156 6 100   19   52 return undef unless defined(my $proto = first { $protos{$_} } @_);
  19         53  
157              
158 3         15 $self->res->headers->sec_websocket_protocol($proto);
159 3         13 return $proto;
160             }
161              
162             1;
163              
164             =encoding utf8
165              
166             =head1 NAME
167              
168             Mojo::Transaction::WebSocket - WebSocket transaction
169              
170             =head1 SYNOPSIS
171              
172             use Mojo::Transaction::WebSocket;
173              
174             # Send and receive WebSocket messages
175             my $ws = Mojo::Transaction::WebSocket->new;
176             $ws->send('Hello World!');
177             $ws->on(message => sub ($ws, $msg) { say "Message: $msg" });
178             $ws->on(finish => sub ($ws, $code, $reason) { say "WebSocket closed with status $code." });
179              
180             =head1 DESCRIPTION
181              
182             L is a container for WebSocket transactions, based on L
183             6455|https://tools.ietf.org/html/rfc6455> and L.
184              
185             =head1 EVENTS
186              
187             L inherits all events from L and can emit the following new ones.
188              
189             =head2 binary
190              
191             $ws->on(binary => sub ($ws, $bytes) {...});
192              
193             Emitted when a complete WebSocket binary message has been received.
194              
195             $ws->on(binary => sub ($ws, $bytes) { say "Binary: $bytes" });
196              
197             =head2 drain
198              
199             $ws->on(drain => sub ($ws) {...});
200              
201             Emitted once all data has been sent.
202              
203             $ws->on(drain => sub ($ws) { $ws->send(time) });
204              
205             =head2 finish
206              
207             $ws->on(finish => sub ($ws, $code, $reason) {...});
208              
209             Emitted when the WebSocket connection has been closed.
210              
211             =head2 frame
212              
213             $ws->on(frame => sub ($ws, $frame) {...});
214              
215             Emitted when a WebSocket frame has been received.
216              
217             $ws->on(frame => sub ($ws, $frame) {
218             say "FIN: $frame->[0]";
219             say "RSV1: $frame->[1]";
220             say "RSV2: $frame->[2]";
221             say "RSV3: $frame->[3]";
222             say "Opcode: $frame->[4]";
223             say "Payload: $frame->[5]";
224             });
225              
226             =head2 json
227              
228             $ws->on(json => sub ($ws, $json) {...});
229              
230             Emitted when a complete WebSocket message has been received, all text and binary messages will be automatically JSON
231             decoded. Note that this event only gets emitted when it has at least one subscriber.
232              
233             $ws->on(json => sub ($ws, $hash) { say "Message: $hash->{msg}" });
234              
235             =head2 message
236              
237             $ws->on(message => sub ($ws, $msg) {...});
238              
239             Emitted when a complete WebSocket message has been received, text messages will be automatically decoded. Note that
240             this event only gets emitted when it has at least one subscriber.
241              
242             $ws->on(message => sub ($ws, $msg) { say "Message: $msg" });
243              
244             =head2 resume
245              
246             $tx->on(resume => sub ($tx) {...});
247              
248             Emitted when transaction is resumed.
249              
250             =head2 text
251              
252             $ws->on(text => sub ($ws, $bytes) {...});
253              
254             Emitted when a complete WebSocket text message has been received.
255              
256             $ws->on(text => sub ($ws, $bytes) { say "Text: $bytes" });
257              
258             =head1 ATTRIBUTES
259              
260             L inherits all attributes from L and implements the following new
261             ones.
262              
263             =head2 compressed
264              
265             my $bool = $ws->compressed;
266             $ws = $ws->compressed($bool);
267              
268             Compress messages with C extension.
269              
270             =head2 established
271              
272             my $bool = $ws->established;
273             $ws = $ws->established($bool);
274              
275             WebSocket connection established.
276              
277             =head2 handshake
278              
279             my $handshake = $ws->handshake;
280             $ws = $ws->handshake(Mojo::Transaction::HTTP->new);
281              
282             The original handshake transaction, usually a L object.
283              
284             =head2 masked
285              
286             my $bool = $ws->masked;
287             $ws = $ws->masked($bool);
288              
289             Mask outgoing frames with XOR cipher and a random 32-bit key.
290              
291             =head2 max_websocket_size
292              
293             my $size = $ws->max_websocket_size;
294             $ws = $ws->max_websocket_size(1024);
295              
296             Maximum WebSocket message size in bytes, defaults to the value of the C environment variable
297             or C<262144> (256KiB).
298              
299             =head1 METHODS
300              
301             L inherits all methods from L and implements the following new ones.
302              
303             =head2 build_message
304              
305             my $frame = $ws->build_message({binary => $bytes});
306             my $frame = $ws->build_message({text => $bytes});
307             my $frame = $ws->build_message({json => {test => [1, 2, 3]}});
308             my $frame = $ws->build_message($chars);
309              
310             Build WebSocket message.
311              
312             =head2 client_read
313              
314             $ws->client_read($data);
315              
316             Read data client-side, used to implement user agents such as L.
317              
318             =head2 client_write
319              
320             my $bytes = $ws->client_write;
321              
322             Write data client-side, used to implement user agents such as L.
323              
324             =head2 closed
325              
326             $tx = $tx->closed;
327              
328             Same as L, but also indicates that all transaction data has been sent.
329              
330             =head2 connection
331              
332             my $id = $ws->connection;
333              
334             Connection identifier.
335              
336             =head2 finish
337              
338             $ws = $ws->finish;
339             $ws = $ws->finish(1000);
340             $ws = $ws->finish(1003 => 'Cannot accept data!');
341              
342             Close WebSocket connection gracefully.
343              
344             =head2 is_websocket
345              
346             my $bool = $ws->is_websocket;
347              
348             True, this is a L object.
349              
350             =head2 kept_alive
351              
352             my $bool = $ws->kept_alive;
353              
354             Connection has been kept alive.
355              
356             =head2 local_address
357              
358             my $address = $ws->local_address;
359              
360             Local interface address.
361              
362             =head2 local_port
363              
364             my $port = $ws->local_port;
365              
366             Local interface port.
367              
368             =head2 parse_message
369              
370             $ws->parse_message([$fin, $rsv1, $rsv2, $rsv3, $op, $payload]);
371              
372             Parse WebSocket message.
373              
374             =head2 protocol
375              
376             my $proto = $ws->protocol;
377              
378             Return negotiated subprotocol or C.
379              
380             =head2 remote_address
381              
382             my $address = $ws->remote_address;
383              
384             Remote interface address.
385              
386             =head2 remote_port
387              
388             my $port = $ws->remote_port;
389              
390             Remote interface port.
391              
392             =head2 req
393              
394             my $req = $ws->req;
395              
396             Handshake request, usually a L object.
397              
398             =head2 res
399              
400             my $res = $ws->res;
401              
402             Handshake response, usually a L object.
403              
404             =head2 resume
405              
406             $ws = $ws->resume;
407              
408             Resume L transaction.
409              
410             =head2 send
411              
412             $ws = $ws->send({binary => $bytes});
413             $ws = $ws->send({text => $bytes});
414             $ws = $ws->send({json => {test => [1, 2, 3]}});
415             $ws = $ws->send([$fin, $rsv1, $rsv2, $rsv3, $op, $payload]);
416             $ws = $ws->send($chars);
417             $ws = $ws->send($chars => sub {...});
418              
419             Send message or frame non-blocking via WebSocket, the optional drain callback will be executed once all data has been
420             written.
421              
422             # Send "Ping" frame
423             use Mojo::WebSocket qw(WS_PING);
424             $ws->send([1, 0, 0, 0, WS_PING, 'Hello World!']);
425              
426             =head2 server_read
427              
428             $ws->server_read($data);
429              
430             Read data server-side, used to implement web servers such as L.
431              
432             =head2 server_write
433              
434             my $bytes = $ws->server_write;
435              
436             Write data server-side, used to implement web servers such as L.
437              
438             =head2 with_compression
439              
440             $ws->with_compression;
441              
442             Negotiate C extension for this WebSocket connection.
443              
444             =head2 with_protocols
445              
446             my $proto = $ws->with_protocols('v2.proto', 'v1.proto');
447              
448             Negotiate subprotocol for this WebSocket connection.
449              
450             =head1 SEE ALSO
451              
452             L, L, L.
453              
454             =cut