File Coverage

blib/lib/PAGI/Middleware/WebSocket/RateLimit.pm
Criterion Covered Total %
statement 71 85 83.5
branch 15 24 62.5
condition 11 18 61.1
subroutine 9 11 81.8
pod 1 1 100.0
total 107 139 76.9


line stmt bran cond sub pod time code
1             package PAGI::Middleware::WebSocket::RateLimit;
2              
3 2     2   371919 use strict;
  2         2  
  2         70  
4 2     2   7 use warnings;
  2         41  
  2         113  
5 2     2   8 use parent 'PAGI::Middleware';
  2         3  
  2         10  
6 2     2   175 use Future::AsyncAwait;
  2         3  
  2         9  
7 2     2   105 use Time::HiRes qw(time);
  2         3  
  2         12  
8              
9             =head1 NAME
10              
11             PAGI::Middleware::WebSocket::RateLimit - Rate limiting for WebSocket connections
12              
13             =head1 SYNOPSIS
14              
15             use PAGI::Middleware::Builder;
16              
17             my $app = builder {
18             enable 'WebSocket::RateLimit',
19             messages_per_second => 10,
20             bytes_per_second => 65536,
21             burst_multiplier => 2;
22             $my_app;
23             };
24              
25             =head1 DESCRIPTION
26              
27             PAGI::Middleware::WebSocket::RateLimit enforces rate limits on incoming
28             WebSocket messages. Connections exceeding limits can be throttled or
29             closed.
30              
31             =head1 CONFIGURATION
32              
33             =over 4
34              
35             =item * messages_per_second (default: 100)
36              
37             Maximum incoming messages per second.
38              
39             =item * bytes_per_second (default: 1048576)
40              
41             Maximum incoming bytes per second (1MB default).
42              
43             =item * burst_multiplier (default: 2)
44              
45             Allow bursts up to N times the limit before enforcing.
46              
47             =item * on_limit_exceeded (optional)
48              
49             Callback when limit exceeded. Receives ($scope, $type, $current, $limit).
50             Return true to close connection, false to just drop the message.
51              
52             =item * close_code (default: 1008)
53              
54             WebSocket close code when closing due to rate limit (Policy Violation).
55              
56             =item * close_reason (default: 'Rate limit exceeded')
57              
58             Close reason message.
59              
60             =item * on_error (optional)
61              
62             Callback invoked when a send operation fails. Receives C<($error, $event)>
63             where C<$event> is the event hash that failed to send. Default behavior
64             is to warn to STDERR.
65              
66             on_error => sub {
67             my ($error, $event) = @_;
68             $logger->warn("RateLimit close send failed: $error");
69             }
70              
71             =back
72              
73             =cut
74              
75             sub _init {
76 6     6   13 my ($self, $config) = @_;
77              
78 6   100     29 $self->{messages_per_second} = $config->{messages_per_second} // 100;
79 6   100     20 $self->{bytes_per_second} = $config->{bytes_per_second} // 1048576;
80 6   100     18 $self->{burst_multiplier} = $config->{burst_multiplier} // 2;
81 6         11 $self->{on_limit_exceeded} = $config->{on_limit_exceeded};
82 6   50     17 $self->{close_code} = $config->{close_code} // 1008;
83 6   50     18 $self->{close_reason} = $config->{close_reason} // 'Rate limit exceeded';
84             $self->{on_error} = $config->{on_error} // sub {
85 0     0   0 my ($error, $event) = @_;
86 0         0 warn "WebSocket::RateLimit send failed: $error\n";
87 6   66     33 };
88             }
89              
90             sub wrap {
91 4     4 1 38 my ($self, $app) = @_;
92              
93 4     4   103 return async sub {
94 4         7 my ($scope, $receive, $send) = @_;
95             # Only apply to WebSocket connections
96 4 100       11 if ($scope->{type} ne 'websocket') {
97 1         3 await $app->($scope, $receive, $send);
98 1         89 return;
99             }
100              
101 3         4 my $msg_limit = $self->{messages_per_second};
102 3         4 my $byte_limit = $self->{bytes_per_second};
103 3         6 my $burst = $self->{burst_multiplier};
104              
105             # Token bucket state
106 3         4 my $msg_tokens = $msg_limit * $burst;
107 3         3 my $byte_tokens = $byte_limit * $burst;
108 3         7 my $last_update = time();
109              
110 3         3 my $closed = 0;
111              
112             # Refill tokens based on elapsed time
113             my $refill_tokens = sub {
114 5         8 my $now = time();
115 5         36 my $elapsed = $now - $last_update;
116 5         9 $last_update = $now;
117              
118 5         9 $msg_tokens += $elapsed * $msg_limit;
119 5 100       11 $msg_tokens = $msg_limit * $burst if $msg_tokens > $msg_limit * $burst;
120              
121 5         33 $byte_tokens += $elapsed * $byte_limit;
122 5 50       12 $byte_tokens = $byte_limit * $burst if $byte_tokens > $byte_limit * $burst;
123 3         9 };
124              
125             # Wrap receive to apply rate limiting
126 5         194 my $wrapped_receive = async sub {
127 5 50       10 return { type => 'websocket.disconnect' } if $closed;
128              
129 5         7 RECV: while (1) {
130 6         8 my $event = await $receive->();
131              
132 6 50       144 return $event if $closed;
133 6 100       16 return $event if $event->{type} ne 'websocket.receive';
134              
135 5         8 $refill_tokens->();
136              
137             # Calculate message size
138 5   33     12 my $data = $event->{text} // $event->{bytes} // '';
      0        
139 5         7 my $size = length($data);
140              
141             # Check message rate
142 5 100       11 if ($msg_tokens < 1) {
143 1         4 my $should_close = $self->_handle_limit_exceeded(
144             $scope, $send, 'messages', 0, $msg_limit
145             );
146 1 50       2 if ($should_close) {
147 0         0 $closed = 1;
148 0         0 return { type => 'websocket.disconnect' };
149             }
150             # Drop message but continue
151 1         3 next RECV;
152             }
153              
154             # Check byte rate
155 4 50       5 if ($byte_tokens < $size) {
156 0         0 my $should_close = $self->_handle_limit_exceeded(
157             $scope, $send, 'bytes', $byte_tokens, $byte_limit
158             );
159 0 0       0 if ($should_close) {
160 0         0 $closed = 1;
161 0         0 return { type => 'websocket.disconnect' };
162             }
163             # Drop message but continue
164 0         0 next RECV;
165             }
166              
167             # Consume tokens
168 4         6 $msg_tokens -= 1;
169 4         4 $byte_tokens -= $size;
170              
171 4         19 return $event;
172             }
173 3         9 };
174              
175             # Add rate limit info to scope
176 3         12 my $new_scope = {
177             %$scope,
178             'pagi.websocket.rate_limit' => {
179             messages_per_second => $msg_limit,
180             bytes_per_second => $byte_limit,
181             burst_multiplier => $burst,
182             },
183             };
184              
185 3         8 await $app->($new_scope, $wrapped_receive, $send);
186 4         16 };
187             }
188              
189             sub _handle_limit_exceeded {
190 1     1   3 my ($self, $scope, $send, $type, $current, $limit) = @_;
191              
192 1         2 my $should_close = 1; # Default to closing
193              
194 1 50       3 if ($self->{on_limit_exceeded}) {
195 1         3 $should_close = $self->{on_limit_exceeded}->($scope, $type, $current, $limit);
196             }
197              
198 1 50       5 if ($should_close) {
199             # Send close frame
200             my $close_event = {
201             type => 'websocket.close',
202             code => $self->{close_code},
203             reason => $self->{close_reason},
204 0         0 };
205 0         0 my $on_error = $self->{on_error};
206             $send->($close_event)->on_fail(sub {
207 0     0   0 my ($error) = @_;
208 0         0 $on_error->($error, $close_event);
209 0         0 })->retain;
210             }
211              
212 1         2 return $should_close;
213             }
214              
215             1;
216              
217             __END__