| line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
|
1
|
|
|
|
|
|
|
package PAGI::Server::TransportState; |
|
2
|
|
|
|
|
|
|
|
|
3
|
110
|
|
|
110
|
|
333038
|
use strict; |
|
|
110
|
|
|
|
|
262
|
|
|
|
110
|
|
|
|
|
3756
|
|
|
4
|
110
|
|
|
110
|
|
419
|
use warnings; |
|
|
110
|
|
|
|
|
181
|
|
|
|
110
|
|
|
|
|
6514
|
|
|
5
|
|
|
|
|
|
|
|
|
6
|
|
|
|
|
|
|
our $VERSION = '0.002001'; |
|
7
|
|
|
|
|
|
|
|
|
8
|
110
|
|
|
110
|
|
458
|
use Scalar::Util qw(weaken); |
|
|
110
|
|
|
|
|
155
|
|
|
|
110
|
|
|
|
|
80395
|
|
|
9
|
|
|
|
|
|
|
|
|
10
|
|
|
|
|
|
|
=head1 NAME |
|
11
|
|
|
|
|
|
|
|
|
12
|
|
|
|
|
|
|
PAGI::Server::TransportState - Outbound flow-control introspection for a connection |
|
13
|
|
|
|
|
|
|
|
|
14
|
|
|
|
|
|
|
=head1 SYNOPSIS |
|
15
|
|
|
|
|
|
|
|
|
16
|
|
|
|
|
|
|
# Built by the server from an outbound-buffer source (not by the app): |
|
17
|
|
|
|
|
|
|
my $transport = PAGI::Server::TransportState->new( |
|
18
|
|
|
|
|
|
|
measure => sub { $conn->_get_write_buffer_size }, |
|
19
|
|
|
|
|
|
|
high => sub { $conn->{write_high_watermark} }, |
|
20
|
|
|
|
|
|
|
low => sub { $conn->{write_low_watermark} }, |
|
21
|
|
|
|
|
|
|
arm_drain => sub { my $fire = shift; $conn->_wait_for_drain->on_ready($fire) }, |
|
22
|
|
|
|
|
|
|
); |
|
23
|
|
|
|
|
|
|
|
|
24
|
|
|
|
|
|
|
# Read by the application via the scope: |
|
25
|
|
|
|
|
|
|
my $transport = $scope->{'pagi.transport'}; |
|
26
|
|
|
|
|
|
|
|
|
27
|
|
|
|
|
|
|
# Bytes queued for the client but not yet written to the network |
|
28
|
|
|
|
|
|
|
my $pending = $transport->buffered_amount; |
|
29
|
|
|
|
|
|
|
|
|
30
|
|
|
|
|
|
|
# The backpressure band (sends block at high, resume at low) |
|
31
|
|
|
|
|
|
|
my $ceiling = $transport->high_water_mark; |
|
32
|
|
|
|
|
|
|
my $floor = $transport->low_water_mark; |
|
33
|
|
|
|
|
|
|
|
|
34
|
|
|
|
|
|
|
=head1 DESCRIPTION |
|
35
|
|
|
|
|
|
|
|
|
36
|
|
|
|
|
|
|
PAGI::Server::TransportState is the object placed in the C<pagi.transport> scope |
|
37
|
|
|
|
|
|
|
key. It gives an application a synchronous, read-only view of B<outbound flow |
|
|
38
|
|
|
|
|
|
|
control> -- how much data the server has queued for the client but not yet |
|
39
|
|
|
|
|
|
|
written to the network -- so it can conflate, coalesce, shed load, or disconnect |
|
40
|
|
|
|
|
|
|
a slow client instead of only blocking until the buffer drains. It is the |
|
41
|
|
|
|
|
|
|
server-side analogue of the browser WebSocket API's C<bufferedAmount>. |
|
42
|
|
|
|
|
|
|
|
|
43
|
|
|
|
|
|
|
The handle is source-agnostic: it measures the outbound buffer through coderefs |
|
44
|
|
|
|
|
|
|
supplied by the server, never by reaching into a connection itself. That lets |
|
45
|
|
|
|
|
|
|
the same hysteresis logic serve different transports -- under HTTP/1.1 the |
|
46
|
|
|
|
|
|
|
source reads the shared TCP write buffer, while under HTTP/2 it reads a |
|
47
|
|
|
|
|
|
|
per-stream send queue. All reads are live: each call invokes the source and |
|
48
|
|
|
|
|
|
|
reports its current state. See the "Transport Flow Control" section in |
|
49
|
|
|
|
|
|
|
L for the full specification. |
|
50
|
|
|
|
|
|
|
|
|
51
|
|
|
|
|
|
|
=head1 METHODS |
|
52
|
|
|
|
|
|
|
|
|
53
|
|
|
|
|
|
|
=head2 new |
|
54
|
|
|
|
|
|
|
|
|
55
|
|
|
|
|
|
|
my $transport = PAGI::Server::TransportState->new( |
|
56
|
|
|
|
|
|
|
measure => sub { ... }, # current buffered bytes |
|
57
|
|
|
|
|
|
|
high => $bytes, # high-water mark (value or coderef) |
|
58
|
|
|
|
|
|
|
low => $bytes, # low-water mark (value or coderef) |
|
59
|
|
|
|
|
|
|
arm_drain => sub { my $fire = shift; ... }, |
|
60
|
|
|
|
|
|
|
); |
|
61
|
|
|
|
|
|
|
|
|
62
|
|
|
|
|
|
|
Creates a transport-state handle. B<Constructed by the server, not by the |
|
|
63
|
|
|
|
|
|
|
application> -- apps receive the finished handle via the C<pagi.transport> scope |
|
64
|
|
|
|
|
|
|
key. The arguments describe the outbound buffer source: |
|
65
|
|
|
|
|
|
|
|
|
66
|
|
|
|
|
|
|
=over 4 |
|
67
|
|
|
|
|
|
|
|
|
68
|
|
|
|
|
|
|
=item * C<measure> -- coderef returning the current buffered byte count. |
|
69
|
|
|
|
|
|
|
C<undef>/missing is treated as C<0>. |
|
70
|
|
|
|
|
|
|
|
|
71
|
|
|
|
|
|
|
=item * C<high> / C<low> -- the backpressure band. Each may be a plain value or |
|
72
|
|
|
|
|
|
|
a coderef returning the current mark; C<undef> means unavailable. |
|
73
|
|
|
|
|
|
|
|
|
74
|
|
|
|
|
|
|
=item * C<arm_drain> -- coderef invoked when the buffer crosses the high mark. It |
|
75
|
|
|
|
|
|
|
receives a single C<$fire> callback and must invoke it exactly once when the |
|
76
|
|
|
|
|
|
|
buffer next falls below the low mark, so C<on_drain> fires and the cycle re-arms. |
|
77
|
|
|
|
|
|
|
|
|
78
|
|
|
|
|
|
|
=back |
|
79
|
|
|
|
|
|
|
|
|
80
|
|
|
|
|
|
|
=cut |
|
81
|
|
|
|
|
|
|
|
|
82
|
|
|
|
|
|
|
# Construct a transport-state handle around an outbound-buffer source.
#
# Named arguments (none are validated here; all may be omitted):
#   measure   - coderef returning the current buffered byte count
#   high      - high-water mark, as a plain value or a coderef (undef ok)
#   low       - low-water mark, as a plain value or a coderef (undef ok)
#   arm_drain - coderef called with a single $fire callback once the high
#               mark has been crossed; it is expected to call $fire when the
#               buffer has fallen below the low mark
#
# Returns the new handle, blessed into $class.
sub new {
    my ($class, %args) = @_;

    my $self = bless {
        _measure   => $args{measure},      # coderef -> current buffered bytes
        _high      => $args{high},         # value or coderef -> high mark (undef ok)
        _low       => $args{low},          # value or coderef -> low mark (undef ok)
        _arm_drain => $args{arm_drain},    # coderef: (fire) -> call fire once when below low

        # Backpressure callbacks + hysteresis state. _above_high is true once
        # the buffer has crossed the high mark and not yet drained below the
        # low mark, so on_high_water is edge-triggered (fires once per cycle).
        _high_water_callbacks => [],
        _drain_callbacks      => [],
        _above_high           => 0,
    }, $class;

    return $self;
}
|
101
|
|
|
|
|
|
|
|
|
102
|
|
|
|
|
|
|
=head2 buffered_amount |
|
103
|
|
|
|
|
|
|
|
|
104
|
|
|
|
|
|
|
my $pending = $transport->buffered_amount; |
|
105
|
|
|
|
|
|
|
|
|
106
|
|
|
|
|
|
|
Returns the number of bytes queued for the client but not yet written to the |
|
107
|
|
|
|
|
|
|
network, as an integer; C<0> when the send buffer is fully drained (or once the |
|
108
|
|
|
|
|
|
|
underlying connection has gone away). A synchronous, non-blocking, |
|
109
|
|
|
|
|
|
|
non-destructive read. |
|
110
|
|
|
|
|
|
|
|
|
111
|
|
|
|
|
|
|
=cut |
|
112
|
|
|
|
|
|
|
|
|
113
|
|
|
|
|
|
|
# Report the number of bytes currently queued for the client.
#
# Calls the `measure` coderef supplied to the constructor on every invocation.
# Returns 0 when no measure source was configured or when the source returns
# undef. The `//` operator evaluates the source in scalar context, so a single
# value is returned regardless of the caller's context.
sub buffered_amount {
    my $self = shift;

    my $measure = $self->{_measure};
    return 0 unless $measure;

    return $measure->() // 0;
}
|
119
|
|
|
|
|
|
|
|
|
120
|
|
|
|
|
|
|
=head2 high_water_mark |
|
121
|
|
|
|
|
|
|
|
|
122
|
|
|
|
|
|
|
my $ceiling = $transport->high_water_mark; |
|
123
|
|
|
|
|
|
|
|
|
124
|
|
|
|
|
|
|
Returns the buffered-byte threshold at or above which the server applies |
|
125
|
|
|
|
|
|
|
backpressure (a C<$send> that would exceed it blocks until the buffer drains), |
|
126
|
|
|
|
|
|
|
or C<undef> if unavailable. Applications use it to threshold relative to the |
|
127
|
|
|
|
|
|
|
ceiling rather than hard-coding a byte count. |
|
128
|
|
|
|
|
|
|
|
|
129
|
|
|
|
|
|
|
=cut |
|
130
|
|
|
|
|
|
|
|
|
131
|
|
|
|
|
|
|
# Return the current high-water mark, or undef if none is configured.
#
# The mark given to the constructor may be a plain value or a coderef. A
# coderef is invoked on every call so the reported mark is always live.
#
# The coderef is called in scalar context on purpose: a source that does a
# bare `return;` would otherwise produce an empty list when this method is
# used in list context (e.g. inside a hash constructor or argument list),
# silently shifting the caller's remaining elements.
sub high_water_mark {
    my $self = shift;

    my $high = $self->{_high};
    return $high unless ref($high) eq 'CODE';

    return scalar $high->();
}
|
136
|
|
|
|
|
|
|
|
|
137
|
|
|
|
|
|
|
=head2 low_water_mark |
|
138
|
|
|
|
|
|
|
|
|
139
|
|
|
|
|
|
|
my $floor = $transport->low_water_mark; |
|
140
|
|
|
|
|
|
|
|
|
141
|
|
|
|
|
|
|
Returns the buffered-byte threshold the buffer must fall back to before the |
|
142
|
|
|
|
|
|
|
server releases backpressure (the drain point), or C<undef> if unavailable. |
|
143
|
|
|
|
|
|
|
|
|
144
|
|
|
|
|
|
|
=cut |
|
145
|
|
|
|
|
|
|
|
|
146
|
|
|
|
|
|
|
# Return the current low-water mark, or undef if none is configured.
#
# The mark given to the constructor may be a plain value or a coderef. A
# coderef is invoked on every call so the reported mark is always live.
#
# The coderef is called in scalar context on purpose: a source that does a
# bare `return;` would otherwise produce an empty list when this method is
# used in list context, silently shifting the caller's remaining elements.
sub low_water_mark {
    my $self = shift;

    my $low = $self->{_low};
    return $low unless ref($low) eq 'CODE';

    return scalar $low->();
}
|
151
|
|
|
|
|
|
|
|
|
152
|
|
|
|
|
|
|
=head2 on_high_water |
|
153
|
|
|
|
|
|
|
|
|
154
|
|
|
|
|
|
|
$transport->on_high_water(sub { $source->pause }); |
|
155
|
|
|
|
|
|
|
|
|
156
|
|
|
|
|
|
|
Registers a callback invoked when the outbound buffer reaches or exceeds |
|
157
|
|
|
|
|
|
|
L</high_water_mark> (backpressure engaged). Edge-triggered: it fires once when |
|
158
|
|
|
|
|
|
|
the buffer crosses up, and not again until the buffer has drained below the low |
|
159
|
|
|
|
|
|
|
mark and crossed up again. If the buffer is already at or above the mark when |
|
160
|
|
|
|
|
|
|
the callback is registered, it is invoked immediately. Multiple callbacks may be |
|
161
|
|
|
|
|
|
|
registered; they are invoked in registration order with no arguments. Returns |
|
162
|
|
|
|
|
|
|
the handle for chaining. |
|
163
|
|
|
|
|
|
|
|
|
164
|
|
|
|
|
|
|
=cut |
|
165
|
|
|
|
|
|
|
|
|
166
|
|
|
|
|
|
|
# Register a callback for the "buffer reached the high-water mark" edge.
#
# The callback is appended to the handle's high-water list, then:
#   * if the handle is already in the above-high state, only this new
#     callback is fired, immediately (earlier registrants already fired for
#     this cycle);
#   * otherwise the watermarks are re-checked, because the buffer may already
#     be over the mark without a check having run since it got there.
#
# Returns the handle so calls can be chained.
sub on_high_water {
    my ($self, $cb) = @_;

    push @{ $self->{_high_water_callbacks} }, $cb;

    if ($self->{_above_high}) {
        # Already in the high state: this late registrant fires now.
        $self->_fire([$cb]);
    }
    else {
        # May already be above the mark but not yet detected (no send since).
        $self->_check_watermarks;
    }

    return $self;
}
|
181
|
|
|
|
|
|
|
|
|
182
|
|
|
|
|
|
|
=head2 on_drain |
|
183
|
|
|
|
|
|
|
|
|
184
|
|
|
|
|
|
|
$transport->on_drain(sub { $source->resume }); |
|
185
|
|
|
|
|
|
|
|
|
186
|
|
|
|
|
|
|
Registers a callback invoked when the outbound buffer falls back below |
|
187
|
|
|
|
|
|
|
L</low_water_mark> after having reached the high mark (backpressure released). |
|
188
|
|
|
|
|
|
|
It is not invoked merely because the buffer is below the low mark when |
|
189
|
|
|
|
|
|
|
registered -- only on an actual high-then-low transition. Multiple callbacks may |
|
190
|
|
|
|
|
|
|
be registered; they are invoked in registration order with no arguments. Returns |
|
191
|
|
|
|
|
|
|
the handle for chaining. |
|
192
|
|
|
|
|
|
|
|
|
193
|
|
|
|
|
|
|
=cut |
|
194
|
|
|
|
|
|
|
|
|
195
|
|
|
|
|
|
|
# Register a callback for the "buffer drained" edge.
#
# The callback is only appended to the handle's drain list; nothing is fired
# at registration time. Returns the handle so calls can be chained.
sub on_drain {
    my ($self, $cb) = @_;

    push @{ $self->{_drain_callbacks} }, $cb;

    return $self;
}
|
200
|
|
|
|
|
|
|
|
|
201
|
|
|
|
|
|
|
=head2 _check_watermarks |
|
202
|
|
|
|
|
|
|
|
|
203
|
|
|
|
|
|
|
$transport->_check_watermarks; |
|
204
|
|
|
|
|
|
|
|
|
205
|
|
|
|
|
|
|
B<Internal> - Called by the server after an application send. Detects a |
|
206
|
|
|
|
|
|
|
high-water crossing and fires C<on_high_water>, then arms drain detection (via |
|
207
|
|
|
|
|
|
|
the source's C<arm_drain> coderef) so C<on_drain> fires once the buffer falls |
|
208
|
|
|
|
|
|
|
below the low mark. Edge-triggered and idempotent while above. |
|
209
|
|
|
|
|
|
|
|
|
210
|
|
|
|
|
|
|
=cut |
|
211
|
|
|
|
|
|
|
|
|
212
|
|
|
|
|
|
|
# Detect a high-water crossing and start one backpressure cycle.
#
# Does nothing while a cycle is already active, when no high mark is
# available, or while the buffered amount is below the high mark. On a
# crossing it marks the handle as above-high, fires the high-water
# callbacks, and hands a $fire callback to the configured arm_drain coderef.
# When $fire is invoked it clears the above-high state and fires the drain
# callbacks, which re-arms the cycle.
#
# If no arm_drain coderef was configured the handle stays in the above-high
# state after the high-water callbacks have fired.
#
# Always returns nothing.
sub _check_watermarks {
    my ($self) = @_;

    return if $self->{_above_high};    # already armed; waiting for drain

    my $high = $self->high_water_mark;
    return unless defined $high;
    return unless $self->buffered_amount >= $high;

    $self->{_above_high} = 1;
    $self->_fire($self->{_high_water_callbacks});

    # Arm drain detection through the source: when the buffer falls below the
    # low mark, fire on_drain and re-arm the cycle.
    my $arm = $self->{_arm_drain} or return;

    # The closure holds only a weak reference so a pending drain notification
    # does not keep the handle alive.
    weaken(my $weak = $self);

    # Each armed cycle gets a one-shot trigger: if the source invokes $fire
    # more than once, the extra calls must not fire on_drain again or clear
    # the above-high state of a later cycle.
    my $fired = 0;

    $arm->(sub {
        return if $fired++;
        return unless $weak;
        $weak->{_above_high} = 0;
        $weak->_fire($weak->{_drain_callbacks});
    });

    return;
}
|
236
|
|
|
|
|
|
|
|
|
237
|
|
|
|
|
|
|
# Invoke a list of callbacks in order, isolating exceptions.
#
# $cbs is an array reference of coderefs (undef is treated as an empty list).
# Each callback is called with no arguments. A callback that dies is reported
# with warn() and does not prevent the remaining callbacks from running.
# Returns nothing.
sub _fire {
    my ($self, $cbs) = @_;

    # Iterate over a snapshot rather than the live array: a callback may
    # register another callback, which pushes onto this same array. Growing
    # an array while foreach walks it is unsafe and would also run the new
    # registrant in the current pass.
    my @pending = @{ $cbs || [] };

    for my $cb (@pending) {
        eval { $cb->(); 1 } or warn "transport callback error: $@";
    }

    return;
}
|
244
|
|
|
|
|
|
|
|
|
245
|
|
|
|
|
|
|
1; |
|
246
|
|
|
|
|
|
|
|
|
247
|
|
|
|
|
|
|
__END__ |