File Coverage

blib/lib/Mojo/Transaction/WebSocket.pm
Criterion Covered Total %
statement 99 103 96.1
branch 55 58 94.8
condition 20 28 71.4
subroutine 25 29 86.2
pod 22 22 100.0
total 221 240 92.0


line stmt bran cond sub pod time code
1             package Mojo::Transaction::WebSocket;
2 59     59   155739 use Mojo::Base 'Mojo::Transaction';
  59         133  
  59         516  
3              
4 59     59   798 use Compress::Raw::Zlib qw(Z_SYNC_FLUSH);
  59         342  
  59         4510  
5 59     59   427 use List::Util qw(first);
  59         135  
  59         4719  
6 59     59   407 use Mojo::JSON qw(encode_json j);
  59         182  
  59         3834  
7 59     59   346 use Mojo::Util qw(decode encode trim);
  59         135  
  59         4342  
8 59     59   39044 use Mojo::WebSocket qw(WS_BINARY WS_CLOSE WS_CONTINUATION WS_PING WS_PONG WS_TEXT);
  59         218  
  59         166065  
9              
10             has [qw(compressed established handshake masked)];
11             has max_websocket_size => sub { $ENV{MOJO_MAX_WEBSOCKET_SIZE} || 262144 };
12              
13             sub build_message {
14 173     173 1 23547 my ($self, $frame) = @_;
15              
16             # Text
17 173 100       740 $frame = {text => encode('UTF-8', $frame)} if ref $frame ne 'HASH';
18              
19             # JSON
20 173 100       636 $frame->{text} = encode_json($frame->{json}) if exists $frame->{json};
21              
22             # Raw text or binary
23 173 100       457 if (exists $frame->{text}) { $frame = [1, 0, 0, 0, WS_TEXT, $frame->{text}] }
  144         604  
24 29         120 else { $frame = [1, 0, 0, 0, WS_BINARY, $frame->{binary}] }
25              
26             # "permessage-deflate" extension
27 173 100       755 return $frame unless $self->compressed;
28             my $deflate = $self->{deflate}
29 8   66     104 ||= Compress::Raw::Zlib::Deflate->new(AppendOutput => 1, MemLevel => 8, WindowBits => -15);
30 8         43138 $deflate->deflate($frame->[5], my $out);
31 8         72 $deflate->flush($out, Z_SYNC_FLUSH);
32 8         729 @$frame[1, 5] = (1, substr($out, 0, length($out) - 4));
33              
34 8         39 return $frame;
35             }
36              
37 196     196 1 692 sub client_read { shift->server_read(@_) }
38 465     465 1 1267 sub client_write { shift->server_write(@_) }
39              
40             sub closed {
41 134     134 1 469 my $self = shift->completed;
42 134 100       548 my @args = $self->{close} ? (@{$self->{close}}) : (1006);
  119         543  
43 134 100       749 return $self->emit(finish => @args > 1 ? @args : (@args, undef));
44             }
45              
46 9     9 1 31 sub connection { shift->handshake->connection }
47              
48             sub finish {
49 156     156 1 487 my $self = shift;
50              
51 156         539 my $close = $self->{close} = [@_];
52 156 100       563 my $payload = $close->[0] ? pack('n', $close->[0]) : '';
53 156 100       563 $payload .= encode 'UTF-8', $close->[1] if defined $close->[1];
54 156   100     829 $close->[0] //= 1005;
55 156         715 $self->send([1, 0, 0, 0, WS_CLOSE, $payload])->{closing} = 1;
56              
57 156         950 return $self;
58             }
59              
60 579     579 1 2599 sub is_websocket {1}
61              
62 1     1 1 9 sub kept_alive { shift->handshake->kept_alive }
63 0     0 1 0 sub local_address { shift->handshake->local_address }
64 0     0 1 0 sub local_port { shift->handshake->local_port }
65              
66             sub parse_message {
67 272     272 1 4990 my ($self, $frame) = @_;
68              
69 272         1215 $self->emit(frame => $frame);
70              
71             # Ping/Pong
72 272         1043 my $op = $frame->[4];
73 272 100       627 return $self->send([1, 0, 0, 0, WS_PONG, $frame->[5]]) if $op == WS_PING;
74 271 100       711 return undef if $op == WS_PONG;
75              
76             # Close
77 270 100       744 if ($op == WS_CLOSE) {
78 94 100       423 return $self->finish unless length $frame->[5] >= 2;
79 14         104 return $self->finish(unpack('n', substr($frame->[5], 0, 2, '')), decode('UTF-8', $frame->[5]));
80             }
81              
82             # Append chunk and check message size
83 176 100 66     802 @{$self}{qw(op pmc)} = ($op, $self->compressed && $frame->[1]) unless exists $self->{op};
  169         776  
84 176         794 $self->{message} .= $frame->[5];
85 176         721 my $max = $self->max_websocket_size;
86 176 100       637 return $self->finish(1009) if length $self->{message} > $max;
87              
88             # No FIN bit (Continuation)
89 175 100       545 return undef unless $frame->[0];
90              
91             # "permessage-deflate" extension (handshake and RSV1)
92 168         420 my $msg = delete $self->{message};
93 168 50 66     629 if ($self->compressed && $self->{pmc}) {
94             my $inflate = $self->{inflate}
95 6   33     83 ||= Compress::Raw::Zlib::Inflate->new(Bufsize => $max, LimitOutput => 1, WindowBits => -15);
96 6         5571 $inflate->inflate(($msg .= "\x00\x00\xff\xff"), my $out);
97 6 100       54 return $self->finish(1009) if length $msg;
98 5         301 $msg = $out;
99             }
100              
101 167 100       664 $self->emit(json => j($msg)) if $self->has_subscribers('json');
102 167         438 $op = delete $self->{op};
103 167 100       759 $self->emit($op == WS_TEXT ? 'text' : 'binary' => $msg);
104 167 50       551 $self->emit(message => $op == WS_TEXT ? decode 'UTF-8', $msg : $msg) if $self->has_subscribers('message');
    100          
105             }
106              
107 12     12 1 41 sub protocol { shift->res->headers->sec_websocket_protocol }
108              
109 0     0 1 0 sub remote_address { shift->handshake->remote_address }
110 0     0 1 0 sub remote_port { shift->handshake->remote_port }
111 506     506 1 1270 sub req { shift->handshake->req }
112 642     642 1 1817 sub res { shift->handshake->res }
113              
114 119 50   119 1 401 sub resume { $_[0]->handshake->resume and return $_[0] }
115              
116             sub send {
117 331     331 1 933 my ($self, $msg, $cb) = @_;
118 331 100       1087 $self->once(drain => $cb) if $cb;
119 331 100       1395 $msg = $self->build_message($msg) unless ref $msg eq 'ARRAY';
120 331         1610 $self->{write} .= Mojo::WebSocket::build_frame($self->masked, @$msg);
121 331         1573 return $self->emit('resume');
122             }
123              
124             sub server_read {
125 312     312 1 840 my ($self, $chunk) = @_;
126              
127 312         1164 $self->{read} .= $chunk;
128 312         1068 my $max = $self->max_websocket_size;
129 312         1369 while (my $frame = Mojo::WebSocket::parse_frame(\$self->{read}, $max)) {
130 269 100 50     710 $self->finish(1009) and last unless ref $frame;
131 266         878 $self->parse_message($frame);
132             }
133              
134 312         1157 $self->emit('resume');
135             }
136              
137             sub server_write {
138 930     930 1 1634 my $self = shift;
139 930 100 100     5516 $self->emit('drain') unless length($self->{write} //= '');
140 930 100 100     3974 $self->completed if !length $self->{write} && $self->{closing};
141 930         3074 return delete $self->{write};
142             }
143              
144             sub with_compression {
145 20     20 1 49 my $self = shift;
146              
147             # "permessage-deflate" extension
148 20 100 33     75 $self->compressed(1) and $self->res->headers->sec_websocket_extensions('permessage-deflate')
      100        
149             if ($self->req->headers->sec_websocket_extensions // '') =~ /permessage-deflate/;
150             }
151              
152             sub with_protocols {
153 6     6 1 15 my $self = shift;
154              
155 6   100     24 my %protos = map { trim($_) => 1 } split /,/, $self->req->headers->sec_websocket_protocol // '';
  8         26  
156 6 100   19   85 return undef unless defined(my $proto = first { $protos{$_} } @_);
  19         58  
157              
158 3         49 $self->res->headers->sec_websocket_protocol($proto);
159 3         13 return $proto;
160             }
161              
162             1;
163              
164             =encoding utf8
165              
166             =head1 NAME
167              
168             Mojo::Transaction::WebSocket - WebSocket transaction
169              
170             =head1 SYNOPSIS
171              
172             use Mojo::Transaction::WebSocket;
173              
174             # Send and receive WebSocket messages
175             my $ws = Mojo::Transaction::WebSocket->new;
176             $ws->send('Hello World!');
177             $ws->on(message => sub ($ws, $msg) { say "Message: $msg" });
178             $ws->on(finish => sub ($ws, $code, $reason) { say "WebSocket closed with status $code." });
179              
180             =head1 DESCRIPTION
181              
182             L<Mojo::Transaction::WebSocket> is a container for WebSocket transactions, based on L<RFC
183             6455|https://tools.ietf.org/html/rfc6455> and L<RFC 7692|https://tools.ietf.org/html/rfc7692>.
184              
185             =head1 EVENTS
186              
187             L<Mojo::Transaction::WebSocket> inherits all events from L<Mojo::Transaction> and can emit the following new ones.
188              
189             =head2 binary
190              
191             $ws->on(binary => sub ($ws, $bytes) {...});
192              
193             Emitted when a complete WebSocket binary message has been received.
194              
195             $ws->on(binary => sub ($ws, $bytes) { say "Binary: $bytes" });
196              
197             =head2 drain
198              
199             $ws->on(drain => sub ($ws) {...});
200              
201             Emitted once all data has been sent.
202              
203             $ws->on(drain => sub ($ws) { $ws->send(time) });
204              
205             =head2 finish
206              
207             $ws->on(finish => sub ($ws, $code, $reason) {...});
208              
209             Emitted when the WebSocket connection has been closed.
210              
211             =head2 frame
212              
213             $ws->on(frame => sub ($ws, $frame) {...});
214              
215             Emitted when a WebSocket frame has been received.
216              
217             $ws->on(frame => sub ($ws, $frame) {
218             say "FIN: $frame->[0]";
219             say "RSV1: $frame->[1]";
220             say "RSV2: $frame->[2]";
221             say "RSV3: $frame->[3]";
222             say "Opcode: $frame->[4]";
223             say "Payload: $frame->[5]";
224             });
225              
226             =head2 json
227              
228             $ws->on(json => sub ($ws, $json) {...});
229              
230             Emitted when a complete WebSocket message has been received, all text and binary messages will be automatically JSON
231             decoded. Note that this event only gets emitted when it has at least one subscriber.
232              
233             $ws->on(json => sub ($ws, $hash) { say "Message: $hash->{msg}" });
234              
235             =head2 message
236              
237             $ws->on(message => sub ($ws, $msg) {...});
238              
239             Emitted when a complete WebSocket message has been received, text messages will be automatically decoded. Note that
240             this event only gets emitted when it has at least one subscriber.
241              
242             $ws->on(message => sub ($ws, $msg) { say "Message: $msg" });
243              
244             =head2 resume
245              
246             $tx->on(resume => sub ($tx) {...});
247              
248             Emitted when transaction is resumed.
249              
250             =head2 text
251              
252             $ws->on(text => sub ($ws, $bytes) {...});
253              
254             Emitted when a complete WebSocket text message has been received.
255              
256             $ws->on(text => sub ($ws, $bytes) { say "Text: $bytes" });
257              
258             =head1 ATTRIBUTES
259              
260             L<Mojo::Transaction::WebSocket> inherits all attributes from L<Mojo::Transaction> and implements the following new
261             ones.
262              
263             =head2 compressed
264              
265             my $bool = $ws->compressed;
266             $ws = $ws->compressed($bool);
267              
268             Compress messages with C<permessage-deflate> extension.
269              
270             =head2 established
271              
272             my $bool = $ws->established;
273             $ws = $ws->established($bool);
274              
275             WebSocket connection established.
276              
277             =head2 handshake
278              
279             my $handshake = $ws->handshake;
280             $ws = $ws->handshake(Mojo::Transaction::HTTP->new);
281              
282             The original handshake transaction, usually a L<Mojo::Transaction::HTTP> object.
283              
284             =head2 masked
285              
286             my $bool = $ws->masked;
287             $ws = $ws->masked($bool);
288              
289             Mask outgoing frames with XOR cipher and a random 32-bit key.
290              
291             =head2 max_websocket_size
292              
293             my $size = $ws->max_websocket_size;
294             $ws = $ws->max_websocket_size(1024);
295              
296             Maximum WebSocket message size in bytes, defaults to the value of the C<MOJO_MAX_WEBSOCKET_SIZE> environment variable
297             or C<262144> (256KiB).
298              
299             =head1 METHODS
300              
301             L<Mojo::Transaction::WebSocket> inherits all methods from L<Mojo::Transaction> and implements the following new ones.
302              
303             =head2 build_message
304              
305             my $frame = $ws->build_message({binary => $bytes});
306             my $frame = $ws->build_message({text => $bytes});
307             my $frame = $ws->build_message({json => {test => [1, 2, 3]}});
308             my $frame = $ws->build_message($chars);
309              
310             Build WebSocket message.
311              
312             =head2 client_read
313              
314             $ws->client_read($data);
315              
316             Read data client-side, used to implement user agents such as L<Mojo::UserAgent>.
317              
318             =head2 client_write
319              
320             my $bytes = $ws->client_write;
321              
322             Write data client-side, used to implement user agents such as L<Mojo::UserAgent>.
323              
324             =head2 closed
325              
326             $tx = $tx->closed;
327              
328             Same as L<Mojo::Transaction/"completed">, but also indicates that all transaction data has been sent.
329              
330             =head2 connection
331              
332             my $id = $ws->connection;
333              
334             Connection identifier.
335              
336             =head2 finish
337              
338             $ws = $ws->finish;
339             $ws = $ws->finish(1000);
340             $ws = $ws->finish(1003 => 'Cannot accept data!');
341              
342             Close WebSocket connection gracefully.
343              
344             =head2 is_websocket
345              
346             my $bool = $ws->is_websocket;
347              
348             True, this is a L<Mojo::Transaction::WebSocket> object.
349              
350             =head2 kept_alive
351              
352             my $bool = $ws->kept_alive;
353              
354             Connection has been kept alive.
355              
356             =head2 local_address
357              
358             my $address = $ws->local_address;
359              
360             Local interface address.
361              
362             =head2 local_port
363              
364             my $port = $ws->local_port;
365              
366             Local interface port.
367              
368             =head2 parse_message
369              
370             $ws->parse_message([$fin, $rsv1, $rsv2, $rsv3, $op, $payload]);
371              
372             Parse WebSocket message.
373              
374             =head2 protocol
375              
376             my $proto = $ws->protocol;
377              
378             Return negotiated subprotocol or C<undef>.
379              
380             =head2 remote_address
381              
382             my $address = $ws->remote_address;
383              
384             Remote interface address.
385              
386             =head2 remote_port
387              
388             my $port = $ws->remote_port;
389              
390             Remote interface port.
391              
392             =head2 req
393              
394             my $req = $ws->req;
395              
396             Handshake request, usually a L<Mojo::Message::Request> object.
397              
398             =head2 res
399              
400             my $res = $ws->res;
401              
402             Handshake response, usually a L<Mojo::Message::Response> object.
403              
404             =head2 resume
405              
406             $ws = $ws->resume;
407              
408             Resume L</"handshake"> transaction.
409              
410             =head2 send
411              
412             $ws = $ws->send({binary => $bytes});
413             $ws = $ws->send({text => $bytes});
414             $ws = $ws->send({json => {test => [1, 2, 3]}});
415             $ws = $ws->send([$fin, $rsv1, $rsv2, $rsv3, $op, $payload]);
416             $ws = $ws->send($chars);
417             $ws = $ws->send($chars => sub {...});
418              
419             Send message or frame non-blocking via WebSocket, the optional drain callback will be executed once all data has been
420             written.
421              
422             # Send "Ping" frame
423             use Mojo::WebSocket qw(WS_PING);
424             $ws->send([1, 0, 0, 0, WS_PING, 'Hello World!']);
425              
426             =head2 server_read
427              
428             $ws->server_read($data);
429              
430             Read data server-side, used to implement web servers such as L<Mojo::Server::Daemon>.
431              
432             =head2 server_write
433              
434             my $bytes = $ws->server_write;
435              
436             Write data server-side, used to implement web servers such as L<Mojo::Server::Daemon>.
437              
438             =head2 with_compression
439              
440             $ws->with_compression;
441              
442             Negotiate C<permessage-deflate> extension for this WebSocket connection.
443              
444             =head2 with_protocols
445              
446             my $proto = $ws->with_protocols('v2.proto', 'v1.proto');
447              
448             Negotiate subprotocol for this WebSocket connection.
449              
450             =head1 SEE ALSO
451              
452             L<Mojolicious>, L<Mojolicious::Guides>, L<https://mojolicious.org>.
453              
454             =cut