File Coverage

lib/PAGI/App/Throttle.pm
Criterion Covered Total %
statement 57 63 90.4
branch 10 16 62.5
condition 8 15 53.3
subroutine 7 9 77.7
pod 3 5 60.0
total 85 108 78.7


line stmt bran cond sub pod time code
1             package PAGI::App::Throttle;
2              
3 1     1   444 use strict;
  1         2  
  1         31  
4 1     1   3 use warnings;
  1         1  
  1         34  
5 1     1   3 use Future::AsyncAwait;
  1         1  
  1         5  
6              
7             =head1 NAME
8              
9             PAGI::App::Throttle - Rate-limited request processing
10              
11             =head1 SYNOPSIS
12              
13             use PAGI::App::Throttle;
14              
15             my $app = PAGI::App::Throttle->new(
16             app => $inner_app,
17             rate => 10, # requests per second
18             burst => 20, # max burst
19             key_for => sub { $_[0]->{client}[0] }, # key by IP
20             )->to_app;
21              
22             =cut
23              
24             # Token bucket state per key
25             my %buckets;
26              
27             sub new {
28 4     4 0 230 my ($class, %args) = @_;
29              
30             return bless {
31             app => $args{app},
32             rate => $args{rate} // 10,
33             burst => $args{burst} // ($args{rate} // 10),
34             key_for => $args{key_for},
35             on_limit => $args{on_limit},
36 4   50     45 headers => $args{headers} // 1,
      0        
      33        
      100        
37             }, $class;
38             }
39              
40             sub to_app {
41 4     4 0 5 my ($self) = @_;
42              
43 4         8 my $app = $self->{app};
44 4         4 my $rate = $self->{rate};
45 4         5 my $burst = $self->{burst};
46 4         5 my $key_for = $self->{key_for};
47 4         4 my $on_limit = $self->{on_limit};
48 4         5 my $add_headers = $self->{headers};
49              
50 7     7   425 return async sub {
51 7         10 my ($scope, $receive, $send) = @_;
52 7 100       11 my $key = $key_for ? $key_for->($scope) : 'global';
53 7   50     20 $key //= 'global';
54              
55             # Initialize bucket
56 7   100     20 $buckets{$key} //= {
57             tokens => $burst,
58             last_time => time(),
59             };
60              
61 7         7 my $bucket = $buckets{$key};
62              
63             # Refill tokens based on elapsed time
64 7         8 my $now = time();
65 7         8 my $elapsed = $now - $bucket->{last_time};
66 7         12 $bucket->{tokens} += $elapsed * $rate;
67 7 50       11 $bucket->{tokens} = $burst if $bucket->{tokens} > $burst;
68 7         7 $bucket->{last_time} = $now;
69              
70             # Check if request allowed
71 7 100       12 if ($bucket->{tokens} >= 1) {
72 5         33 $bucket->{tokens} -= 1;
73              
74             # Wrap send to add rate limit headers
75 5         6 my $wrapped_send = $send;
76 5 50       8 if ($add_headers) {
77 10         349 $wrapped_send = async sub {
78 10         11 my ($event) = @_;
79 10 100       16 if ($event->{type} eq 'http.response.start') {
80 5   50     5 my @headers = @{$event->{headers} // []};
  5         10  
81 5         6 push @headers, ['x-ratelimit-limit', $burst];
82 5         9 push @headers, ['x-ratelimit-remaining', int($bucket->{tokens})];
83 5         14 push @headers, ['x-ratelimit-reset', int($now + ($burst - $bucket->{tokens}) / $rate)];
84 5         15 $event = { %$event, headers => \@headers };
85             }
86 10         15 await $send->($event);
87 5         13 };
88             }
89              
90 5         10 await $app->($scope, $receive, $wrapped_send);
91             } else {
92             # Rate limited
93 2         4 my $retry_after = int((1 - $bucket->{tokens}) / $rate) + 1;
94              
95 2 50       3 if ($on_limit) {
96 0         0 await $on_limit->($scope, $receive, $send, $retry_after);
97             } else {
98 2         5 my @headers = (
99             ['content-type', 'text/plain'],
100             ['retry-after', $retry_after],
101             );
102              
103 2 50       4 if ($add_headers) {
104 2         3 push @headers, ['x-ratelimit-limit', $burst];
105 2         4 push @headers, ['x-ratelimit-remaining', 0];
106 2         3 push @headers, ['x-ratelimit-reset', int($now + $retry_after)];
107             }
108              
109             await $send->({
110             type => 'http.response.start',
111             status => 429,
112             headers => \@headers,
113 2         6 });
114 2         51 await $send->({
115             type => 'http.response.body',
116             body => 'Too Many Requests',
117             more => 0,
118             });
119             }
120             }
121 4         17 };
122             }
123              
124             # Class method to reset a key's bucket
125             sub reset {
126 0     0 1 0 my ($class, $key) = @_;
127              
128 0         0 delete $buckets{$key};
129             }
130              
131             # Class method to reset all buckets
132             sub reset_all {
133 4     4 1 6413 my ($class) = @_;
134              
135 4         12 %buckets = ();
136             }
137              
138             # Class method to get bucket info
139             sub info {
140 0     0 1   my ($class, $key) = @_;
141              
142 0 0         return undef unless $buckets{$key};
143             return {
144             tokens => $buckets{$key}{tokens},
145             last_time => $buckets{$key}{last_time},
146 0           };
147             }
148              
149             1;
150              
151             __END__