File Coverage

lib/Net/uTP.pm
Criterion Covered Total %
statement 23 23 100.0
branch n/a
condition n/a
subroutine 8 8 100.0
pod n/a
total 31 31 100.0


line stmt bran cond sub pod time code
1 1     1   165524 use v5.42;
  1         3  
2 1     1   4 use feature 'class';
  1         2  
  1         138  
3 1     1   4 no warnings 'experimental::class';
  1         3  
  1         75  
4             #
5             class Net::uTP v1.0.1 {
6 1     1   4 use Carp qw[carp croak];
  1         1  
  1         65  
7 1     1   5 use Time::HiRes qw[gettimeofday time];
  1         1  
  1         7  
8              
9             # uTP Packet Types
10 1     1   91 use constant { ST_DATA => 0, ST_FIN => 1, ST_STATE => 2, ST_RESET => 3, ST_SYN => 4 };
  1         1  
  1         84  
11              
12             # Protocol version
13 1     1   3 use constant VERSION => 1;
  1         1  
  1         53  
14              
15             # LEDBAT Constants
16 1     1   5 use constant TARGET_DELAY => 100_000; # 100ms in microseconds
  1         1  
  1         2471  
17             field $conn_id_send : param;
18             field $conn_id_recv : param;
19             field $state : reader = 'CLOSED'; # CLOSED, SYN_SENT, SYN_RECV, CONNECTED, FIN_SENT, RESET
20             field $seq_nr;
21             field $ack_nr : reader : writer = 0;
22             field $window_size = 1500;
23             field $cur_window = 0;
24             field @base_delays;
25             field $last_delay = 0;
26             field %out_buffer : reader;
27             field %in_buffer : reader; # seq_nr => payload (for SACK/out-of-order)
28             field $rto = 1.0;
29             field $rtt = 0;
30             field $rtt_var = 0.8;
31             field %on;
32             method set_in_buffer_val ( $sn, $data ) { $in_buffer{$sn} = $data }
33             method set_out_buffer_val ( $sn, $data ) { $out_buffer{$sn} = $data }
34             ADJUST { $seq_nr = int( rand(65535) ) + 1 }
35             method on ( $event, $cb ) { push $on{$event}->@*, $cb }
36              
37             method _emit ( $event, @args ) {
38             for my $cb ( $on{$event}->@* ) {
39             try { $cb->(@args) } catch ($e) {
40             carp "uTP event $event failed: $e"
41             }
42             }
43             }
44              
45             method connect () {
46             $state = 'SYN_SENT';
47             my $pkt = $self->pack_header( ST_SYN, 0, $conn_id_recv );
48             $out_buffer{$seq_nr} = { data => $pkt, ts => time, retries => 0 };
49             $seq_nr = ( $seq_nr + 1 ) & 0xFFFF;
50             $pkt;
51             }
52              
53             method send_data ($data) {
54             return undef unless $state eq 'CONNECTED' && defined $data && length($data) > 0;
55             my $out = '';
56             my $now = time;
57             while ( length($data) > 0 ) {
58             my $chunk = substr( $data, 0, 1400, '' );
59             my $pkt = $self->pack_header(ST_DATA) . $chunk;
60             $out_buffer{$seq_nr} = { data => $pkt, ts => $now, retries => 0 };
61             $out .= $pkt;
62             $cur_window += length($pkt);
63             $seq_nr = ( $seq_nr + 1 ) & 0xFFFF;
64             }
65             $out;
66             }
67              
68             method tick ($delta) {
69             my $now = time;
70             my @to_resend;
71             for my $sn ( sort { $a <=> $b } keys %out_buffer ) {
72             my $entry = $out_buffer{$sn};
73             if ( $now - $entry->{ts} > $rto ) {
74             $entry->{retries}++;
75             if ( $entry->{retries} > 4 ) {
76             $state = 'CLOSED';
77             $self->_emit('closed');
78             return $self->pack_header(ST_RESET);
79             }
80             $rto *= 2;
81             $rto = 30 if $rto > 30;
82             $entry->{ts} = $now;
83             push @to_resend, $entry->{data};
84             last # Fast Retransmit: only resend the first timed out packet
85             }
86             }
87             join '', @to_resend;
88             }
89              
90             method _build_sack_extension () {
91             return '' unless keys %in_buffer;
92              
93             # SACK bitmask starts from ack_nr + 2
94             my $base = ( $ack_nr + 2 ) & 0xFFFF;
95             my $mask = 0;
96             my $max_bit = 0;
97             for my $sn ( keys %in_buffer ) {
98             my $diff = ( $sn - $base ) & 0xFFFF;
99             if ( $diff >= 0 && $diff < 32 ) {
100             $mask |= ( 1 << $diff );
101             $max_bit = $diff if $diff > $max_bit;
102             }
103             }
104              
105             # Round up to nearest byte, min 4 bytes per spec recommendation
106             pack 'C C V', 0, 4, $mask; # next_ext=0, len=4, mask=32bits
107             }
108              
109             method pack_header ( $type, $extension //= 0, $conn_id //= undef ) {
110             my ( $s, $us ) = gettimeofday();
111             my $timestamp = ( $s * 1_000_000 + $us ) & 0xFFFFFFFF;
112             $conn_id //= $conn_id_send;
113             my $vt = ( VERSION << 4 ) | $type;
114             my $sack = $self->_build_sack_extension();
115             $extension = 1 if $sack; # 1 = SACK
116             pack( 'C C n N N N n n', $vt, $extension, $conn_id, $timestamp, $last_delay, $window_size, $seq_nr, $ack_nr ) . $sack;
117             }
118              
119             method unpack_header ($data) {
120             return undef if length($data) < 20;
121             my %h;
122             ( $h{vt}, $h{ext}, $h{conn_id}, $h{ts}, $h{t_diff}, $h{wnd}, $h{seq}, $h{ack} ) = unpack 'C C n N N N n n', $data;
123             $h{type} = $h{vt} & 0x0F;
124             $h{version} = $h{vt} >> 4;
125             \%h;
126             }
127              
128             method _handle_sack ( $bitmask, $ack_nr ) {
129             my $base = ( $ack_nr + 2 ) & 0xFFFF;
130             my @bytes = unpack 'C*', $bitmask;
131             for my $i ( 0 .. $#bytes ) {
132             my $byte = $bytes[$i];
133             for ( my $bit = 0; $bit < 8; $bit++ ) {
134             $self->_ack_packet( ( $base + ( $i * 8 ) + $bit ) & 0xFFFF ) if $byte & ( 1 << $bit );
135             }
136             }
137             }
138              
139             method _ack_packet ($sn) {
140             if ( exists $out_buffer{$sn} ) {
141             my $entry = $out_buffer{$sn};
142             my $measured_rtt = time - $entry->{ts};
143             if ( $rtt == 0 ) {
144             $rtt = $measured_rtt;
145             $rtt_var = $measured_rtt / 2;
146             }
147             else {
148             my $alpha = 0.125;
149             my $beta = 0.25;
150             $rtt_var = ( 1 - $beta ) * $rtt_var + $beta * abs( $rtt - $measured_rtt );
151             $rtt = ( 1 - $alpha ) * $rtt + $alpha * $measured_rtt;
152             }
153             $rto = $rtt + $rtt_var * 4;
154             $rto = 0.5 if $rto < 0.5;
155             $cur_window -= length $entry->{data};
156             delete $out_buffer{$sn};
157             }
158             }
159              
160             method receive_packet ($data) {
161             my $header = $self->unpack_header($data) or return undef;
162             my $payload = substr( $data, 20 );
163              
164             # Handle Extensions
165             my $ext_type = $header->{ext};
166             while ( $ext_type != 0 && length($payload) >= 2 ) {
167             my ( $next_ext, $ext_len ) = unpack 'C C', $payload;
168             if ( length($payload) < 2 + $ext_len ) {
169             carp 'Malformed uTP extension';
170             last;
171             }
172             my $ext_data = substr( $payload, 2, $ext_len );
173             substr( $payload, 0, 2 + $ext_len, '' );
174             $self->_handle_sack( $ext_data, $header->{ack} ) if $ext_type == 1; # SACK
175             $ext_type = $next_ext;
176             }
177              
178             # Update metrics
179             my ( $s, $us ) = gettimeofday();
180             my $now = ( $s * 1_000_000 + $us ) & 0xFFFFFFFF;
181             my $delay = ( $now - $header->{ts} ) & 0xFFFFFFFF;
182             $self->_update_base_delay($delay);
183             my $min_delay = $self->_min_base_delay;
184             if ($min_delay) {
185             my $our_delay = ( $delay - $min_delay ) & 0xFFFFFFFF;
186             my $off_target = TARGET_DELAY - $our_delay;
187             my $adj = ( $off_target / TARGET_DELAY ) * 100;
188             $window_size += $adj;
189             $window_size = 1500 if $window_size < 1500;
190             }
191             $last_delay = $delay;
192              
193             # Handle ACKs (Cumulative)
194             for my $sn ( sort { $a <=> $b } keys %out_buffer ) {
195             $self->_ack_packet($sn) if ( ( ( $header->{ack} - $sn ) & 0xFFFF ) < 0x8000 );
196             }
197             if ( $header->{type} == ST_SYN ) {
198             if ( $state eq 'CLOSED' ) {
199             $state = 'CONNECTED';
200             $ack_nr = $header->{seq};
201             $self->_emit('connected');
202             return $self->pack_header(ST_STATE);
203             }
204             }
205             elsif ( $header->{type} == ST_STATE ) {
206             if ( $state eq 'SYN_SENT' ) {
207             $state = 'CONNECTED';
208             $self->_emit('connected');
209             }
210             $ack_nr = $header->{seq};
211             }
212             elsif ( $header->{type} == ST_DATA ) {
213             my $sn = $header->{seq};
214             if ( $sn == ( ( $ack_nr + 1 ) & 0xFFFF ) || $ack_nr == 0 ) {
215             $ack_nr = $sn;
216             $self->_emit( 'data', $payload );
217             while ( exists $in_buffer{ ( $ack_nr + 1 ) & 0xFFFF } ) {
218             $ack_nr = ( $ack_nr + 1 ) & 0xFFFF;
219             my $buffered_payload = delete $in_buffer{$ack_nr};
220             $self->_emit( 'data', $buffered_payload );
221             }
222             }
223             elsif ( ( ( $sn - $ack_nr ) & 0xFFFF ) < 0x8000 && $sn != $ack_nr ) {
224             $in_buffer{$sn} = $payload;
225             }
226             return $self->pack_header(ST_STATE);
227             }
228             elsif ( $header->{type} == ST_RESET ) {
229             $state = 'CLOSED';
230             $self->_emit('closed');
231             }
232             elsif ( $header->{type} == ST_FIN ) {
233             $state = 'CLOSED';
234             $self->_emit('closed');
235             return $self->pack_header(ST_STATE);
236             }
237             undef;
238             }
239              
240             method _update_base_delay ($delay) {
241             push @base_delays, $delay;
242             shift @base_delays if @base_delays > 60;
243             }
244              
245             method _min_base_delay () {
246             return undef unless @base_delays;
247             my $min = $base_delays[0];
248             for (@base_delays) { $min = $_ if $_ < $min }
249             $min;
250             }
251             };
252             1;