File Coverage

blib/lib/PAGI/Server/TransportState.pm
Criterion Covered Total %
statement 51 52 98.0
branch 16 20 80.0
condition 2 2 100.0
subroutine 12 12 100.0
pod 6 6 100.0
total 87 92 94.5


line stmt bran cond sub pod time code
1             package PAGI::Server::TransportState;
2              
3 110     110   333038 use strict;
  110         262  
  110         3756  
4 110     110   419 use warnings;
  110         181  
  110         6514  
5              
6             our $VERSION = '0.002001';
7              
8 110     110   458 use Scalar::Util qw(weaken);
  110         155  
  110         80395  
9              
10             =head1 NAME
11              
12             PAGI::Server::TransportState - Outbound flow-control introspection for a connection
13              
14             =head1 SYNOPSIS
15              
16             # Built by the server from an outbound-buffer source (not by the app):
17             my $transport = PAGI::Server::TransportState->new(
18             measure => sub { $conn->_get_write_buffer_size },
19             high => sub { $conn->{write_high_watermark} },
20             low => sub { $conn->{write_low_watermark} },
21             arm_drain => sub { my $fire = shift; $conn->_wait_for_drain->on_ready($fire) },
22             );
23              
24             # Read by the application via the scope:
25             my $transport = $scope->{'pagi.transport'};
26              
27             # Bytes queued for the client but not yet written to the network
28             my $pending = $transport->buffered_amount;
29              
30             # The backpressure band (sends block at high, resume at low)
31             my $ceiling = $transport->high_water_mark;
32             my $floor = $transport->low_water_mark;
33              
34             =head1 DESCRIPTION
35              
36             PAGI::Server::TransportState is the object placed in the C<pagi.transport> scope
37             key. It gives an application a synchronous, read-only view of B<outbound flow
38             control> -- how much data the server has queued for the client but not yet
39             written to the network -- so it can conflate, coalesce, shed load, or disconnect
40             a slow client instead of only blocking until the buffer drains. It is the
41             server-side analogue of the browser WebSocket API's C<bufferedAmount>.
42              
43             The handle is source-agnostic: it measures the outbound buffer through coderefs
44             supplied by the server, never by reaching into a connection itself. That lets
45             the same hysteresis logic serve different transports -- under HTTP/1.1 the
46             source reads the shared TCP write buffer, while under HTTP/2 it reads a
47             per-stream send queue. All reads are live: each call invokes the source and
48             reports its current state. See the "Transport Flow Control" section in
49             L for the full specification.
50              
51             =head1 METHODS
52              
53             =head2 new
54              
55             my $transport = PAGI::Server::TransportState->new(
56             measure => sub { ... }, # current buffered bytes
57             high => $bytes, # high-water mark (value or coderef)
58             low => $bytes, # low-water mark (value or coderef)
59             arm_drain => sub { my $fire = shift; ... },
60             );
61              
62             Creates a transport-state handle. B<Constructed by the server, not by the
63             application> -- apps receive the finished handle via the C<pagi.transport> scope
64             key. The arguments describe the outbound buffer source:
65              
66             =over 4
67              
68             =item * C<measure> -- coderef returning the current buffered byte count.
69             C<undef>/missing is treated as C<0>.
70              
71             =item * C<high> / C<low> -- the backpressure band. Each may be a plain value or
72             a coderef returning the current mark; C<undef> means unavailable.
73              
74             =item * C<arm_drain> -- coderef invoked when the buffer crosses the high mark. It
75             receives a single C<$fire> callback and must invoke it exactly once when the
76             buffer next falls below the low mark, so C<on_drain> fires and the cycle re-arms.
77              
78             =back
79              
80             =cut
81              
82             sub new {
83 268     268 1 18420 my ($class, %args) = @_;
84              
85             my $self = bless {
86             _measure => $args{measure}, # coderef -> current buffered bytes
87             _high => $args{high}, # value or coderef -> high mark (undef ok)
88             _low => $args{low}, # value or coderef -> low mark (undef ok)
89             _arm_drain => $args{arm_drain}, # coderef: (fire) -> call fire once when below low
90              
91             # Backpressure callbacks + hysteresis state. _above_high is true once
92             # the buffer has crossed the high mark and not yet drained below the low
93             # mark, so on_high_water is edge-triggered (fires once per cycle).
94 268         3807 _high_water_callbacks => [],
95             _drain_callbacks => [],
96             _above_high => 0,
97             }, $class;
98              
99 268         4725 return $self;
100             }
101              
102             =head2 buffered_amount
103              
104             my $pending = $transport->buffered_amount;
105              
106             Returns the number of bytes queued for the client but not yet written to the
107             network, as an integer; C<0> when the send buffer is fully drained (or once the
108             underlying connection has gone away). A synchronous, non-blocking,
109             non-destructive read.
110              
111             =cut
112              
113             sub buffered_amount {
114 292     292 1 460 my $self = shift;
115 292         489 my $measure = $self->{_measure};
116 292 100       640 return 0 unless $measure;
117 291   100     601 return $measure->() // 0;
118             }
119              
120             =head2 high_water_mark
121              
122             my $ceiling = $transport->high_water_mark;
123              
124             Returns the buffered-byte threshold at or above which the server applies
125             backpressure (a C<$send> that would exceed it blocks until the buffer drains),
126             or C<undef> if unavailable. Applications use it to threshold relative to the
127             ceiling rather than hard-coding a byte count.
128              
129             =cut
130              
131             sub high_water_mark {
132 291     291 1 653 my $self = shift;
133 291         601 my $high = $self->{_high};
134 291 100       1114 return ref $high eq 'CODE' ? $high->() : $high;
135             }
136              
137             =head2 low_water_mark
138              
139             my $floor = $transport->low_water_mark;
140              
141             Returns the buffered-byte threshold the buffer must fall back to before the
142             server releases backpressure (the drain point), or C<undef> if unavailable.
143              
144             =cut
145              
146             sub low_water_mark {
147 6     6 1 18 my $self = shift;
148 6         9 my $low = $self->{_low};
149 6 100       21 return ref $low eq 'CODE' ? $low->() : $low;
150             }
151              
152             =head2 on_high_water
153              
154             $transport->on_high_water(sub { $source->pause });
155              
156             Registers a callback invoked when the outbound buffer reaches or exceeds
157             L</high_water_mark> (backpressure engaged). Edge-triggered: it fires once when
158             the buffer crosses up, and not again until the buffer has drained below the low
159             mark and crossed up again. If the buffer is already at or above the mark when
160             the callback is registered, it is invoked immediately. Multiple callbacks may be
161             registered; they are invoked in registration order with no arguments. Returns
162             the handle for chaining.
163              
164             =cut
165              
166             sub on_high_water {
167 9     9 1 197 my ($self, $cb) = @_;
168 9         13 push @{$self->{_high_water_callbacks}}, $cb;
  9         23  
169              
170 9 50       21 if ($self->{_above_high}) {
171             # Already in the high state: this late registrant fires now.
172 0         0 $self->_fire([$cb]);
173             }
174             else {
175             # May already be above the mark but not yet detected (no send since).
176 9         21 $self->_check_watermarks;
177             }
178              
179 9         26 return $self;
180             }
181              
182             =head2 on_drain
183              
184             $transport->on_drain(sub { $source->resume });
185              
186             Registers a callback invoked when the outbound buffer falls back below
187             L</low_water_mark> after having reached the high mark (backpressure released).
188             It is not invoked merely because the buffer is below the low mark when
189             registered -- only on an actual high-then-low transition. Multiple callbacks may
190             be registered; they are invoked in registration order with no arguments. Returns
191             the handle for chaining.
192              
193             =cut
194              
195             sub on_drain {
196 5     5 1 38 my ($self, $cb) = @_;
197 5         15 push @{$self->{_drain_callbacks}}, $cb;
  5         10  
198 5         17 return $self;
199             }
200              
201             =head2 _check_watermarks
202              
203             $transport->_check_watermarks;
204              
205             B<Internal> - Called by the server after an application send. Detects a
206             high-water crossing and fires C<on_high_water>, then arms drain detection (via
207             the source's C<arm_drain> coderef) so C<on_drain> fires once the buffer falls
208             below the low mark. Edge-triggered and idempotent while above.
209              
210             =cut
211              
212             sub _check_watermarks {
213 305     305   2160 my ($self) = @_;
214              
215 305 100       858 return if $self->{_above_high}; # already armed; waiting for drain
216              
217 285         1052 my $high = $self->high_water_mark;
218 285 50       702 return unless defined $high;
219 285 100       985 return unless $self->buffered_amount >= $high;
220              
221 11         58 $self->{_above_high} = 1;
222 11         38 $self->_fire($self->{_high_water_callbacks});
223              
224             # Arm drain detection through the source: when the buffer falls below the
225             # low mark, fire on_drain and re-arm the cycle.
226 11 50       27 my $arm = $self->{_arm_drain} or return;
227 11         21 weaken(my $weak = $self);
228             $arm->(sub {
229 7 50   7   558 return unless $weak;
230 7         15 $weak->{_above_high} = 0;
231 7         23 $weak->_fire($weak->{_drain_callbacks});
232 11         59 });
233              
234 11         85 return;
235             }
236              
237             # Invoke a list of callbacks in order, isolating exceptions.
238             sub _fire {
239 18     18   29 my ($self, $cbs) = @_;
240 18         35 for my $cb (@$cbs) {
241 14 100       40 eval { $cb->(); 1 } or warn "transport callback error: $@";
  14         33  
  13         63  
242             }
243             }
244              
245             1;
246              
247             __END__