File Coverage

lib/PAGI/App/Throttle.pm
Criterion Covered Total %
statement 57 63 90.4
branch 10 16 62.5
condition 8 15 53.3
subroutine 7 9 77.7
pod 3 5 60.0
total 85 108 78.7


line stmt bran cond sub pod time code
1             package PAGI::App::Throttle;
2              
3 1     1   437 use strict;
  1         1  
  1         29  
4 1     1   3 use warnings;
  1         1  
  1         35  
5 1     1   3 use Future::AsyncAwait;
  1         1  
  1         5  
6              
7             =head1 NAME
8              
9             PAGI::App::Throttle - Rate-limited request processing
10              
11             =head1 SYNOPSIS
12              
13             use PAGI::App::Throttle;
14              
15             my $app = PAGI::App::Throttle->new(
16             app => $inner_app,
17             rate => 10, # requests per second
18             burst => 20, # max burst
19             key_for => sub { $_[0]->{client}[0] }, # key by IP
20             )->to_app;
21              
22             =cut
23              
24             # Token bucket state per key
25             my %buckets;
26              
27             sub new {
28 4     4 0 230 my ($class, %args) = @_;
29              
30             return bless {
31             app => $args{app},
32             rate => $args{rate} // 10,
33             burst => $args{burst} // ($args{rate} // 10),
34             key_for => $args{key_for},
35             on_limit => $args{on_limit},
36 4   50     51 headers => $args{headers} // 1,
      0        
      33        
      100        
37             }, $class;
38             }
39              
40             sub to_app {
41 4     4 0 5 my ($self) = @_;
42              
43 4         7 my $app = $self->{app};
44 4         3 my $rate = $self->{rate};
45 4         5 my $burst = $self->{burst};
46 4         2 my $key_for = $self->{key_for};
47 4         5 my $on_limit = $self->{on_limit};
48 4         5 my $add_headers = $self->{headers};
49              
50 7     7   401 return async sub {
51 7         11 my ($scope, $receive, $send) = @_;
52 7 100       13 my $key = $key_for ? $key_for->($scope) : 'global';
53 7   50     48 $key //= 'global';
54              
55             # Initialize bucket
56 7   100     26 $buckets{$key} //= {
57             tokens => $burst,
58             last_time => time(),
59             };
60              
61 7         6 my $bucket = $buckets{$key};
62              
63             # Refill tokens based on elapsed time
64 7         8 my $now = time();
65 7         9 my $elapsed = $now - $bucket->{last_time};
66 7         9 $bucket->{tokens} += $elapsed * $rate;
67 7 50       13 $bucket->{tokens} = $burst if $bucket->{tokens} > $burst;
68 7         9 $bucket->{last_time} = $now;
69              
70             # Check if request allowed
71 7 100       9 if ($bucket->{tokens} >= 1) {
72 5         5 $bucket->{tokens} -= 1;
73              
74             # Wrap send to add rate limit headers
75 5         4 my $wrapped_send = $send;
76 5 50       9 if ($add_headers) {
77 10         359 $wrapped_send = async sub {
78 10         9 my ($event) = @_;
79 10 100       15 if ($event->{type} eq 'http.response.start') {
80 5   50     6 my @headers = @{$event->{headers} // []};
  5         8  
81 5         8 push @headers, ['x-ratelimit-limit', $burst];
82 5         8 push @headers, ['x-ratelimit-remaining', int($bucket->{tokens})];
83 5         14 push @headers, ['x-ratelimit-reset', int($now + ($burst - $bucket->{tokens}) / $rate)];
84 5         13 $event = { %$event, headers => \@headers };
85             }
86 10         15 await $send->($event);
87 5         31 };
88             }
89              
90 5         9 await $app->($scope, $receive, $wrapped_send);
91             } else {
92             # Rate limited
93 2         4 my $retry_after = int((1 - $bucket->{tokens}) / $rate) + 1;
94              
95 2 50       4 if ($on_limit) {
96 0         0 await $on_limit->($scope, $receive, $send, $retry_after);
97             } else {
98 2         4 my @headers = (
99             ['content-type', 'text/plain'],
100             ['retry-after', $retry_after],
101             );
102              
103 2 50       4 if ($add_headers) {
104 2         3 push @headers, ['x-ratelimit-limit', $burst];
105 2         2 push @headers, ['x-ratelimit-remaining', 0];
106 2         3 push @headers, ['x-ratelimit-reset', int($now + $retry_after)];
107             }
108              
109             await $send->({
110             type => 'http.response.start',
111             status => 429,
112             headers => \@headers,
113 2         6 });
114 2         50 await $send->({
115             type => 'http.response.body',
116             body => 'Too Many Requests',
117             more => 0,
118             });
119             }
120             }
121 4         20 };
122             }
123              
124             # Class method to reset a key's bucket
125             sub reset {
126 0     0 1 0 my ($class, $key) = @_;
127              
128 0         0 delete $buckets{$key};
129             }
130              
131             # Class method to reset all buckets
132             sub reset_all {
133 4     4 1 6182 my ($class) = @_;
134              
135 4         11 %buckets = ();
136             }
137              
138             # Class method to get bucket info
139             sub info {
140 0     0 1   my ($class, $key) = @_;
141              
142 0 0         return undef unless $buckets{$key};
143             return {
144             tokens => $buckets{$key}{tokens},
145             last_time => $buckets{$key}{last_time},
146 0           };
147             }
148              
149             1;
150              
151             __END__