File Coverage

lib/PAGI/Middleware/WebSocket/Compression.pm
Criterion Covered Total %
statement 61 97 62.8
branch 9 42 21.4
condition 8 25 32.0
subroutine 8 8 100.0
pod 1 1 100.0
total 87 173 50.2


line stmt bran cond sub pod time code
1             package PAGI::Middleware::WebSocket::Compression;
2              
3 2     2   205678 use strict;
  2         4  
  2         71  
4 2     2   7 use warnings;
  2         2  
  2         115  
5 2     2   477 use parent 'PAGI::Middleware';
  2         322  
  2         9  
6 2     2   106 use Future::AsyncAwait;
  2         2  
  2         8  
7              
8             =head1 NAME
9              
10             PAGI::Middleware::WebSocket::Compression - WebSocket per-message compression
11              
12             =head1 SYNOPSIS
13              
14             use PAGI::Middleware::Builder;
15              
16             my $app = builder {
17             enable 'WebSocket::Compression',
18             level => 6,
19             min_size => 128;
20             $my_app;
21             };
22              
23             =head1 DESCRIPTION
24              
25             PAGI::Middleware::WebSocket::Compression implements per-message deflate
26             compression (RFC 7692) for WebSocket connections. It negotiates the
27             permessage-deflate extension and transparently compresses/decompresses
28             messages.
29              
30             =head1 CONFIGURATION
31              
32             =over 4
33              
34             =item * level (default: 6)
35              
36             Compression level (1-9). Higher = better compression, slower.
37              
38             =item * min_size (default: 128)
39              
40             Minimum message size to compress. Messages smaller than this are sent
41             uncompressed.
42              
43             =item * server_no_context_takeover (default: 0)
44              
45             If true, don't use context takeover for server-to-client messages.
46              
47             =item * client_no_context_takeover (default: 0)
48              
49             If true, request client to not use context takeover.
50              
51             =back
52              
53             =cut
54              
55             sub _init {
56 3     3   6 my ($self, $config) = @_;
57              
58 3   100     16 $self->{level} = $config->{level} // 6;
59 3   100     16 $self->{min_size} = $config->{min_size} // 128;
60 3   50     25 $self->{server_no_context_takeover} = $config->{server_no_context_takeover} // 0;
61 3   50     11 $self->{client_no_context_takeover} = $config->{client_no_context_takeover} // 0;
62             }
63              
64             sub wrap {
65 3     3 1 25 my ($self, $app) = @_;
66              
67 3     3   42 return async sub {
68 3         4 my ($scope, $receive, $send) = @_;
69             # Only apply to WebSocket connections
70 3 100       9 if ($scope->{type} ne 'websocket') {
71 1         2 await $app->($scope, $receive, $send);
72 1         90 return;
73             }
74              
75             # Check if client offered permessage-deflate
76 2         5 my $extensions = $self->_parse_extensions($scope);
77 2         3 my $has_deflate = exists $extensions->{'permessage-deflate'};
78              
79 2 100       5 unless ($has_deflate) {
80             # No compression support, pass through
81 1         3 await $app->($scope, $receive, $send);
82 1         63 return;
83             }
84              
85             # Try to load Compress::Raw::Zlib
86 1         1 my $have_zlib = eval { require Compress::Raw::Zlib; 1 };
  1         576  
  1         5294  
87 1 50       4 unless ($have_zlib) {
88 0         0 await $app->($scope, $receive, $send);
89 0         0 return;
90             }
91              
92             # Create compression/decompression streams
93 1         1 my ($deflator, $inflator);
94 1         2 my $compression_active = 0;
95              
96             my $init_streams = sub {
97             my ($d_status, $deflate) = Compress::Raw::Zlib::Deflate->new(
98             -Level => $self->{level},
99 1         8 -WindowBits => -15, # Raw deflate
100             -AppendOutput => 1,
101             );
102 1         646 my ($i_status, $inflate) = Compress::Raw::Zlib::Inflate->new(
103             -WindowBits => -15,
104             -AppendOutput => 1,
105             );
106              
107 1 50 33     285 return ($deflate, $inflate) if $d_status == Compress::Raw::Zlib::Z_OK()
108             && $i_status == Compress::Raw::Zlib::Z_OK();
109 1         62 return;
110 1         5 };
111              
112             # Wrap send to handle compression
113 1         8 my $wrapped_send = async sub {
114 1         2 my ($event) = @_;
115 1 50       4 if ($event->{type} eq 'websocket.accept') {
116             # Initialize compression streams
117 1         5 ($deflator, $inflator) = $init_streams->();
118 1         13 $compression_active = defined $deflator;
119              
120 1 50       5 if ($compression_active) {
121             # Add extension to accept response
122 0         0 my @extensions = ('permessage-deflate');
123             push @extensions, 'server_no_context_takeover'
124 0 0       0 if $self->{server_no_context_takeover};
125             push @extensions, 'client_no_context_takeover'
126 0 0       0 if $self->{client_no_context_takeover};
127              
128 0         0 $event = {
129             %$event,
130             extensions => join('; ', @extensions),
131             };
132             }
133 1         4 await $send->($event);
134 1         85 return;
135             }
136              
137 0 0 0     0 if ($event->{type} eq 'websocket.send' && $compression_active) {
138 0         0 my $text = $event->{text};
139 0         0 my $bytes = $event->{bytes};
140 0 0       0 my $data = defined $text ? $text : $bytes;
141              
142             # Only compress if above min_size
143 0 0 0     0 if (defined $data && length($data) >= $self->{min_size}) {
144 0         0 my $compressed = '';
145 0         0 my $status = $deflator->deflate($data, $compressed);
146 0         0 $status = $deflator->flush($compressed, Compress::Raw::Zlib::Z_SYNC_FLUSH());
147              
148 0 0       0 if ($status == Compress::Raw::Zlib::Z_OK()) {
149             # Remove trailing 0x00 0x00 0xff 0xff
150 0         0 $compressed =~ s/\x00\x00\xff\xff$//;
151              
152 0 0       0 $event = {
153             %$event,
154             (defined $text ? (text => undef) : ()),
155             bytes => $compressed,
156             compressed => 1,
157             };
158              
159             # Reset context if no takeover
160 0 0       0 if ($self->{server_no_context_takeover}) {
161 0         0 ($deflator, $inflator) = $init_streams->();
162             }
163             }
164             }
165             }
166              
167 0         0 await $send->($event);
168 1         5 };
169              
170             # Wrap receive to handle decompression
171 0         0 my $wrapped_receive = async sub {
172 0         0 my $event = await $receive->();
173              
174 0 0 0     0 if ($event->{type} eq 'websocket.receive' && $compression_active) {
175 0 0       0 if ($event->{compressed}) {
176 0   0     0 my $data = $event->{bytes} // $event->{text};
177 0 0       0 if (defined $data) {
178             # Add trailer bytes
179 0         0 $data .= "\x00\x00\xff\xff";
180              
181 0         0 my $decompressed = '';
182 0         0 my $status = $inflator->inflate($data, $decompressed);
183              
184 0 0       0 if ($status == Compress::Raw::Zlib::Z_OK()) {
185 0 0       0 if (defined $event->{text}) {
186 0         0 $event = { %$event, text => $decompressed, bytes => undef };
187             } else {
188 0         0 $event = { %$event, bytes => $decompressed };
189             }
190              
191             # Reset context if no takeover
192 0 0       0 if ($self->{client_no_context_takeover}) {
193 0         0 ($deflator, $inflator) = $init_streams->();
194             }
195             }
196             }
197             }
198             }
199              
200 0         0 return $event;
201 1         5 };
202              
203             # Add compression info to scope
204             my $new_scope = {
205             %$scope,
206             'pagi.websocket.compression' => {
207             level => $self->{level},
208             min_size => $self->{min_size},
209 1         8 available => $has_deflate,
210             },
211             };
212              
213 1         4 await $app->($new_scope, $wrapped_receive, $wrapped_send);
214 3         16 };
215             }
216              
217             sub _parse_extensions {
218 2     2   4 my ($self, $scope) = @_;
219              
220 2         3 my %extensions;
221              
222 2   50     3 for my $h (@{$scope->{headers} // []}) {
  2         5  
223 1 50       5 next unless lc($h->[0]) eq 'sec-websocket-extensions';
224              
225 1         4 for my $ext (split /\s*,\s*/, $h->[1]) {
226 1         3 my ($name, @params) = split /\s*;\s*/, $ext;
227 1         3 $extensions{$name} = \@params;
228             }
229             }
230              
231 2         4 return \%extensions;
232             }
233              
234             1;
235              
236             __END__