File Coverage

lib/Net/BitTorrent/DHT.pm
Criterion Covered Total %
statement 32 32 100.0
branch n/a
condition n/a
subroutine 11 11 100.0
pod n/a
total 43 43 100.0


line stmt bran cond sub pod time code
1 32     32   2207786 use v5.40;
  32         130  
2 32     32   199 use feature 'class';
  32         53  
  32         5410  
3 32     32   213 no warnings 'experimental::class';
  32         105  
  32         2061  
4 32     32   6145 use Net::BitTorrent::Emitter;
  32         71  
  32         13277  
5             #
6             class Net::BitTorrent::DHT::Peer v2.0.6 {
7             field $ip : param : reader;
8             field $port : param : reader;
9             field $family : param : reader;
10             method to_string () {"$ip:$port"}
11             };
12             #
13             class Net::BitTorrent::DHT v2.0.6 : isa(Net::BitTorrent::Emitter) {
14 32     32   16956 use Algorithm::Kademlia;
  32         306169  
  32         2495  
15 32     32   16541 use Net::BitTorrent::DHT::Security;
  32         126  
  32         1894  
16 32     32   7558 use Net::BitTorrent::Protocol::BEP03::Bencode qw[bencode bdecode];
  32         80  
  32         2693  
17 32     32   9610 use IO::Socket::IP;
  32         571260  
  32         540  
18             use Socket
19 32     32   22833 qw[sockaddr_family pack_sockaddr_in unpack_sockaddr_in inet_aton inet_ntoa AF_INET AF_INET6 pack_sockaddr_in6 unpack_sockaddr_in6 inet_pton inet_ntop getaddrinfo SOCK_DGRAM];
  32         68  
  32         5184  
20 32     32   8741 use IO::Select;
  32         25821  
  32         3859  
21 32     32   8402 use Digest::SHA qw[sha1];
  32         53282  
  32         418336  
22             #
23             field $node_id_bin : param : reader //= pack 'C*', map { int( rand(256) ) } 1 .. 20;
24             field $port : param : reader = 6881;
25             field $address : param //= undef;
26             field $want_v4 : param : reader //= 1;
27             field $want_v6 : param : reader //= 1;
28             field $bep32 : param : reader //= 1;
29             field $bep42 : param : reader //= 1;
30             field $bep33 : param : reader //= 1;
31             field $bep44 : param : reader //= 1;
32             field $bep51 : param : reader //= 1;
33             field $read_only : param = 0;
34             field $security : reader = Net::BitTorrent::DHT::Security->new();
35             field $routing_table_v4 : reader = Algorithm::Kademlia::RoutingTable->new( local_id_bin => $node_id_bin, k => 8 );
36             field $routing_table_v6 : reader = Algorithm::Kademlia::RoutingTable->new( local_id_bin => $node_id_bin, k => 8 );
37             field $peer_storage : reader = Algorithm::Kademlia::Storage->new( ttl => 7200 );
38             field $data_storage : reader = Algorithm::Kademlia::Storage->new( ttl => 7200 );
39             field $socket : param : reader //= IO::Socket::IP->new( LocalAddr => $address, LocalPort => $port, Proto => 'udp', Blocking => 0 );
40             field $select //= IO::Select->new($socket);
41             field $token_secret = pack( 'N', rand( 2**32 ) ) . pack( 'N', rand( 2**32 ) );
42             field $token_old_secret = $token_secret;
43             field $last_rotation = time;
44             field $node_id_rotation_interval : param = 7200; # 2 hours
45             field $last_node_id_rotation = time;
46             field $boot_nodes : param : reader : writer //= [ [ 'router.bittorrent.com', 6881 ], [ 'router.utorrent.com', 6881 ],
47             [ 'dht.transmissionbt.com', 6881 ] ];
48             field @_resolved_boot_nodes;
49             field $v : param : reader //= ();
50             field $debug : param = 0;
51             field $_ed25519_backend = ();
52             field $running = 0;
53             field %_blacklist;
54             field %ip_votes; # external_ip => count
55             field $external_ip : reader = undef;
56             field %_pending_queries;
57             field $_tid_counter = 0;
58              
59             method set_node_id ($new_id) {
60             $node_id_bin = $new_id;
61             $routing_table_v4->set_local_id_bin($new_id);
62             $routing_table_v6->set_local_id_bin($new_id);
63             }
64             ADJUST {
65             $socket // die "Could not create UDP socket: $!";
66              
67             # Pre-resolve bootstrap nodes
68             for my $r (@$boot_nodes) {
69             my ( $err, @res ) = getaddrinfo( $r->[0], $r->[1], { socktype => SOCK_DGRAM } );
70             if ($err) {
71             warn "[WARN] Could not resolve bootstrap node $r->[0]:$r->[1]: $err" if $debug;
72             next;
73             }
74             push @_resolved_boot_nodes, $res[0]{addr};
75             }
76             $self->on(
77             external_ip_detected => sub ( $emitter, $ip ) {
78             $external_ip = $ip;
79             return unless $bep42;
80             my $new_id = $security->generate_node_id($ip);
81             $self->set_node_id($new_id);
82             }
83             );
84             if ($bep44) {
85             try {
86             require Crypt::PK::Ed25519;
87             $_ed25519_backend = method( $sig, $msg, $key ) {
88             my $ed = Crypt::PK::Ed25519->new();
89             try { $ed->import_key_raw( $key, 'public' )->verify_message( $sig, $msg ); }
90             catch ($e) { return 0; }
91             }
92             }
93             catch ($e) {
94             try {
95             require Crypt::Perl::Ed25519::PublicKey;
96             $_ed25519_backend = method( $sig, $msg, $key ) {
97             try {
98             # Crypt::Perl might throw if key or sig length is invalid
99             return 0 unless length($key) == 32;
100             return 0 unless length($sig) == 64;
101             my $pk = Crypt::Perl::Ed25519::PublicKey->new($key);
102             return $pk->verify( $msg, $sig );
103             }
104             catch ($e2) {
105             return 0;
106             }
107             }
108             }
109             catch ($e2) { }
110             }
111             }
112             }
113             method routing_table () {$routing_table_v4} # Backward compatibility
114              
115             method routing_table_stats () {
116             my $stats = { v4 => [], v6 => [] };
117             my $idx = 0;
118             push $stats->{v4}->@*, { index => $idx++, count => scalar @$_ } for $routing_table_v4->buckets;
119             $idx = 0;
120             push $stats->{v6}->@*, { index => $idx++, count => scalar @$_ } for $routing_table_v6->buckets;
121             return $stats;
122             }
123              
124             method export_state () {
125             my @nodes_v4;
126             for my $bucket ( $routing_table_v4->buckets ) {
127             push @nodes_v4, map { { id => $_->{id}, ip => $_->{data}{ip}, port => $_->{data}{port} } } @$bucket;
128             }
129             my @nodes_v6;
130             for my $bucket ( $routing_table_v6->buckets ) {
131             push @nodes_v6, map { { id => $_->{id}, ip => $_->{data}{ip}, port => $_->{data}{port} } } @$bucket;
132             }
133             my %peers = $peer_storage->entries;
134             my %data = $data_storage->entries;
135             return { id => $node_id_bin, nodes => \@nodes_v4, nodes6 => \@nodes_v6, peers => \%peers, data => \%data, };
136             }
137              
138             method import_state ($state) {
139             $node_id_bin = $state->{id} if defined $state->{id};
140             if ( $state->{nodes} ) {
141             my @to_import = map { { id => $_->{id}, data => { ip => $_->{ip}, port => $_->{port} } } } $state->{nodes}->@*;
142             $routing_table_v4->import_peers( \@to_import );
143             }
144             if ( $state->{nodes6} ) {
145             my @to_import = map { { id => $_->{id}, data => { ip => $_->{ip}, port => $_->{port} } } } $state->{nodes6}->@*;
146             $routing_table_v6->import_peers( \@to_import );
147             }
148             if ( $state->{peers} ) {
149             for my $hash ( keys $state->{peers}->%* ) {
150             $peer_storage->put( $hash, $state->{peers}{$hash}{value} );
151             }
152             }
153             if ( $state->{data} ) {
154             for my $hash ( keys $state->{data}->%* ) {
155             $data_storage->put( $hash, $state->{data}{$hash} );
156             }
157             }
158             }
159              
160             method _rotate_tokens () {
161             if ( time - $last_rotation > 300 ) {
162             $token_old_secret = $token_secret;
163             $token_secret = pack( 'N', rand( 2**32 ) ) . pack( 'N', rand( 2**32 ) );
164             $last_rotation = time;
165             }
166             }
167              
168             method _rotate_node_id () {
169             if ( $external_ip && $bep42 ) {
170             my $new_id = $security->generate_node_id($external_ip);
171             if ( $new_id ne $node_id_bin ) {
172             warn " [DHT] Rotating Node ID for $external_ip\n" if $debug;
173             $self->set_node_id($new_id);
174             }
175             }
176             $last_node_id_rotation = time;
177             }
178              
179             method _generate_token ( $ip, $secret //= undef ) {
180             $secret //= $token_secret;
181             return sha1( $ip . $secret );
182             }
183              
184             method _verify_token ( $ip, $token ) {
185             return 1 if $token eq $self->_generate_token( $ip, $token_secret );
186             return 1 if $token eq $self->_generate_token( $ip, $token_old_secret );
187             return 0;
188             }
189              
190             method bootstrap () {
191             for my $addr (@$boot_nodes) {
192             $self->_send( { t => 'pn', y => 'q', q => 'ping', a => { id => $node_id_bin } }, $addr );
193             $self->_send( { t => 'fn', y => 'q', q => 'find_node', a => { id => $node_id_bin, target => $node_id_bin } }, $addr );
194             }
195             }
196              
197             method ping ( $addr, $port = undef ) {
198             my $tid = $self->_next_tid();
199             $_pending_queries{$tid} = { q => 'ping', time => time };
200             $self->_send( { t => $tid, y => 'q', q => 'ping', a => { id => $node_id_bin } }, $addr, $port );
201             }
202              
203             method _next_tid () {
204             $_tid_counter = ( $_tid_counter + 1 ) % 0xFFFF;
205             pack 'n', $_tid_counter;
206             }
207              
208             method find_node_remote ( $target_id, $addr, $port = undef ) {
209             my $tid = $self->_next_tid();
210             $_pending_queries{$tid} = { q => 'find_node', target => $target_id, time => time };
211             $self->_send( { t => $tid, y => 'q', q => 'find_node', a => { id => $node_id_bin, target => $target_id } }, $addr, $port );
212             }
213              
214             method get_peers ( $info_hash, $addr, $port = undef ) {
215             my $tid = $self->_next_tid();
216             $_pending_queries{$tid} = { q => 'get_peers', target => $info_hash, time => time };
217             $self->_send( { t => $tid, y => 'q', q => 'get_peers', a => { id => $node_id_bin, info_hash => $info_hash } }, $addr, $port );
218             }
219              
220             method get_remote ( $target, $addr, $port = undef ) {
221             return unless $bep44;
222             my $tid = $self->_next_tid();
223             $_pending_queries{$tid} = { q => 'get', target => $target, time => time };
224             $self->_send( { t => $tid, y => 'q', q => 'get', a => { id => $node_id_bin, target => $target } }, $addr, $port );
225             }
226              
227             method put_remote ( $args, $addr, $port = undef ) {
228             return unless $bep44;
229             my $tid = $self->_next_tid();
230             $_pending_queries{$tid} = { q => 'put', time => time };
231              
232             # $args should contain 'v' and optionally 'k', 'sig', 'seq', 'salt', 'cas'
233             $self->_send( { t => $tid, y => 'q', q => 'put', a => { id => $node_id_bin, %$args } }, $addr, $port );
234             }
235              
236             method announce_peer ( $info_hash, $token, $announce_port, $addr, $port = undef, $is_seed //= 0 ) {
237             my $tid = $self->_next_tid();
238             $_pending_queries{$tid} = { q => 'announce_peer', target => $info_hash, time => time };
239             my $msg = {
240             t => $tid,
241             y => 'q',
242             q => 'announce_peer',
243             a => { id => $node_id_bin, info_hash => $info_hash, port => $announce_port, token => $token, ( $bep33 && $is_seed ? ( seed => 1 ) : () ) }
244             };
245             $self->_send( $msg, $addr, $port );
246             }
247              
248             method announce_infohash ( $ih, $port ) {
249             my @learned; # High level: find closest nodes and announce
250             push @learned, $routing_table_v4->find_closest($ih) if $want_v4;
251             push @learned, $routing_table_v6->find_closest($ih) if $want_v6 && $bep32;
252              
253             # get_peers first to get tokens
254             $self->get_peers( $ih, $_->{data}{ip}, $_->{data}{port} ) for @learned;
255             }
256              
257             method scrape_peers_remote ( $info_hash, $addr, $port = undef ) {
258             return unless $bep33;
259             my $tid = $self->_next_tid();
260             $_pending_queries{$tid} = { q => 'scrape_peers', target => $info_hash, time => time };
261             $self->_send( { t => $tid, y => 'q', q => 'scrape_peers', a => { id => $node_id_bin, info_hash => $info_hash } }, $addr, $port );
262             }
263              
264             method find_peers ($info_hash) {
265             my @learned;
266             push @learned, $routing_table_v4->find_closest($info_hash) if $want_v4;
267             push @learned, $routing_table_v6->find_closest($info_hash) if $want_v6 && $bep32;
268             $self->get_peers( $info_hash, $_->{data}{ip}, $_->{data}{port} ) for @learned;
269             }
270              
271             method scrape ($info_hash) {
272             return unless $bep33;
273             my @learned;
274             push @learned, $routing_table_v4->find_closest($info_hash) if $want_v4;
275             push @learned, $routing_table_v6->find_closest($info_hash) if $want_v6 && $bep32;
276             $self->scrape_peers_remote( $info_hash, $_->{data}{ip}, $_->{data}{port} ) for @learned;
277             }
278              
279             method sample ($target) {
280             return unless $bep51;
281             my @learned;
282             push @learned, $routing_table_v4->find_closest($target) if $want_v4;
283             push @learned, $routing_table_v6->find_closest($target) if $want_v6 && $bep32;
284             $self->sample_infohashes_remote( $target, $_->{data}{ip}, $_->{data}{port} ) for @learned;
285             }
286              
287             method sample_infohashes_remote ( $target, $addr, $port = undef ) {
288             return unless $bep51;
289             my $tid = $self->_next_tid();
290             $_pending_queries{$tid} = { q => 'sample_infohashes', target => $target, time => time };
291             $self->_send( { t => $tid, y => 'q', q => 'sample_infohashes', a => { id => $node_id_bin, target => $target } }, $addr, $port );
292             }
293              
294             method tick ( $timeout //= 0 ) {
295             $self->_rotate_tokens();
296             $self->_rotate_node_id() if time - $last_node_id_rotation >= $node_id_rotation_interval;
297             return $self->handle_incoming() if $select->can_read($timeout);
298             return ( [], [], undef );
299             }
300              
301             method handle_incoming ( $data //= undef, $sender //= undef ) {
302             $sender = $socket->recv( $data, 4096 ) unless defined $data;
303             return ( [], [], undef ) unless defined $data && length $data;
304             my $msg;
305             try { $msg = bdecode($data) }
306             catch ($e) { return ( [], [], undef ) }
307             return ( [], [], undef ) if ref($msg) ne 'HASH';
308             my ( $port, $ip ) = $self->_unpack_address($sender);
309             return ( [], [], undef ) unless $ip;
310              
311             if ($debug) {
312             my $type = ( $msg->{y} // '' ) eq 'q' ? "QUERY ($msg->{q})" : "RESPONSE";
313             say "[DEBUG] RECV $type from $ip:$port";
314             }
315             if ( ( $msg->{y} // '' ) eq 'q' ) {
316             my $node = $self->_handle_query( $msg, $sender, $ip, $port );
317             return ( $node ? [$node] : [], [], undef ); # Return flat format
318             }
319             if ( ( $msg->{y} // '' ) eq 'e' ) {
320             if ($debug) {
321             my $code = $msg->{e}->[0] // 'unknown';
322             my $text = $msg->{e}->[1] // 'no message';
323             say "[DEBUG] RECV ERROR $code: $text from $ip:$port";
324             }
325             return ( [], [], undef );
326             }
327             return $self->_handle_response( $msg, $sender, $ip, $port ) if ( $msg->{y} // '' ) eq 'r';
328             return ( [], [], undef );
329             }
330              
331             method _unpack_address ($sockaddr) {
332             my $family;
333             try { $family = sockaddr_family($sockaddr) }
334             catch ($e) { return () }
335             if ( $family == AF_INET ) {
336             my ( $port, $ip_bin ) = unpack_sockaddr_in($sockaddr);
337             return ( $port, inet_ntoa($ip_bin) );
338             }
339             elsif ( $family == AF_INET6 ) {
340             my ( $port, $ip_bin, $scope, $flow ) = unpack_sockaddr_in6($sockaddr);
341             return ( $port, inet_ntop( AF_INET6, $ip_bin ) );
342             }
343             return ();
344             }
345              
346             method _handle_query ( $msg, $sender, $ip, $port ) {
347             return if $_blacklist{$ip};
348             my $q = $msg->{q} // return;
349             my $a = $msg->{a} // return;
350             my $id = $a->{id} // return;
351              
352             # BEP 42: Reject nodes with invalid IDs
353             return if $bep42 && !$security->validate_node_id( $id, $ip );
354             my $table = ( $ip =~ /:/ ) ? $routing_table_v6 : $routing_table_v4;
355             unless ( $a->{ro} ) {
356             my $stale = $table->add_peer( $id, { ip => $ip, port => $port } );
357             $self->ping( $stale->{data}{ip}, $stale->{data}{port} ) if $stale;
358             }
359             my $res = { t => $msg->{t}, y => 'r', r => { id => $node_id_bin } };
360             $res->{v} = $v if defined $v;
361             if ( my $ip_bin = ( $ip =~ /:/ ) ? inet_pton( AF_INET6, $ip ) : inet_aton($ip) ) {
362             $res->{ip} = $ip_bin;
363             }
364             my $w = $a->{want} // [];
365             $w = [$w] unless ref $w;
366             my %want = map { $_ => 1 } @$w;
367             if ( !@$w ) { # Default: same family as query
368             if ( $ip =~ /:/ ) { $want{n6} = 1 }
369             else { $want{n4} = 1 }
370             }
371             if ( $q eq 'ping' ) { }
372             elsif ( $q eq 'find_node' ) {
373             my @closest;
374             push @closest, $routing_table_v4->find_closest( $a->{target} ) if $want_v4 && $want{n4};
375             push @closest, $routing_table_v6->find_closest( $a->{target} ) if $want_v6 && $bep32 && $want{n6};
376             my ( $v4, $v6 ) = $self->_pack_nodes( \@closest );
377             $res->{r}{nodes} = $v4 if $v4 && $want_v4 && $want{n4};
378             $res->{r}{nodes6} = $v6 if $v6 && $want_v6 && $bep32 && $want{n6};
379             }
380             elsif ( $q eq 'get_peers' ) {
381             my $info_hash = $a->{info_hash};
382             $res->{r}{token} = $self->_generate_token($ip);
383             my $peers_obj = $peer_storage->get($info_hash);
384             if ( $peers_obj && @{ $peers_obj->value } ) {
385             my @filtered = grep { ( $_->{ip} =~ /:/ ) ? $want_v6 : $want_v4 } @{ $peers_obj->value };
386             $res->{r}{values} = $self->_pack_peers_raw( \@filtered );
387             }
388             else {
389             my @closest;
390             push @closest, $routing_table_v4->find_closest($info_hash) if $want_v4 && $want{n4};
391             push @closest, $routing_table_v6->find_closest($info_hash) if $want_v6 && $bep32 && $want{n6};
392             my ( $v4, $v6 ) = $self->_pack_nodes( \@closest );
393             $res->{r}{nodes} = $v4 if $v4 && $want_v4 && $want{n4};
394             $res->{r}{nodes6} = $v6 if $v6 && $want_v6 && $bep32 && $want{n6};
395             }
396             }
397             elsif ( $q eq 'announce_peer' ) {
398             my $info_hash = $a->{info_hash};
399             if ( $self->_verify_token( $ip, $a->{token} ) ) {
400             my $peers_obj = $peer_storage->get($info_hash);
401             my @peers = $peers_obj ? @{ $peers_obj->value } : ();
402             my $new_peer = {
403             ip => $ip,
404             port => ( $a->{implied_port} ? $port : $a->{port} ),
405             ( $bep33 && defined $a->{seed} ? ( seed => $a->{seed} ) : () )
406             };
407             @peers = grep { $_->{ip} ne $ip } @peers;
408             push @peers, $new_peer;
409             $peer_storage->put( $info_hash, \@peers );
410             }
411             }
412             elsif ( $q eq 'scrape_peers' ) {
413             if ($bep33) {
414             my $info_hash = $a->{info_hash};
415             my $peers_obj = $peer_storage->get($info_hash);
416             my $peers = $peers_obj ? $peers_obj->value : [];
417             my $seeders = grep { $_->{seed} } @$peers;
418             my $leechers = @$peers - $seeders;
419             $res->{r}{sn} = $seeders;
420             $res->{r}{ln} = $leechers;
421             }
422             else {
423             # If BEP 33 is disabled, we might want to return an error or just ignore.
424             # Standard is to just return 'id'.
425             }
426             }
427             elsif ( $q eq 'get' ) {
428             if ($bep44) {
429             my $target = $a->{target};
430             my $data_obj = $data_storage->get($target);
431             if ($data_obj) {
432             $res->{r} = { %{ $res->{r} }, %{ $data_obj->value } };
433             }
434             else {
435             my @closest;
436             push @closest, $routing_table_v4->find_closest($target) if $want_v4;
437             push @closest, $routing_table_v6->find_closest($target) if $want_v6 && $bep32;
438             my ( $v4, $v6 ) = $self->_pack_nodes( \@closest );
439             $res->{r}{nodes} = $v4 if $v4 && $want_v4;
440             $res->{r}{nodes6} = $v6 if $v6 && $want_v6 && $bep32;
441             }
442             $res->{r}{token} = $self->_generate_token($ip);
443             }
444             }
445             elsif ( $q eq 'put' ) {
446             if ( $bep44 && $self->_verify_token( $ip, $a->{token} ) ) {
447             my $v = $a->{v};
448             my $target = sha1($v);
449             my $is_mutable = defined $a->{k};
450             if ($is_mutable) {
451             $target = sha1( $a->{k} . ( $a->{salt} // '' ) );
452              
453             # Validate signature
454             my $to_sign = '';
455             $to_sign .= '3:cas' . bencode( $a->{cas} ) if defined $a->{cas};
456             $to_sign .= '4:salt' . bencode( $a->{salt} ) if defined $a->{salt} && length $a->{salt};
457             $to_sign .= '3:seq' . bencode( $a->{seq} );
458             $to_sign .= '1:v' . bencode($v);
459             if ( defined $_ed25519_backend && $_ed25519_backend->( $self, $a->{sig}, $to_sign, $a->{k} ) ) {
460             my $existing_obj = $data_storage->get($target);
461             if ( !defined $existing_obj || $a->{seq} > $existing_obj->value->{seq} ) {
462             if ( !defined $a->{cas} || ( $existing_obj && $existing_obj->value->{seq} == $a->{cas} ) ) {
463             $data_storage->put(
464             $target,
465             { v => $v,
466             k => $a->{k},
467             sig => $a->{sig},
468             seq => $a->{seq},
469             ( defined $a->{salt} ? ( salt => $a->{salt} ) : () )
470             }
471             );
472             }
473             }
474             }
475             else {
476             # BEP 44: "If the signature is invalid, the request MUST be rejected."
477             # Additionally, we blacklist the peer for attempting a malicious update.
478             $_blacklist{$ip} = time;
479             return;
480             }
481             }
482             else { # Immutable
483             $data_storage->put( $target, { v => $v } );
484             }
485             }
486             }
487             elsif ( $q eq 'sample_infohashes' ) {
488             if ($bep51) {
489             my $target = $a->{target};
490             my %entries = $peer_storage->entries;
491             my @all_keys = keys %entries;
492             my $num = scalar @all_keys;
493              
494             # BEP 51: return up to 20 samples closest to target
495             my @sorted = sort { ( $a^.$target ) cmp( $b^.$target ) } @all_keys;
496             my @samples = splice( @sorted, 0, 20 );
497             $res->{r}{samples} = join( '', @samples );
498             $res->{r}{num} = $num;
499             $res->{r}{interval} = 21600; # 6 hours default
500             my @closest;
501             push @closest, $routing_table_v4->find_closest($target) if $want_v4;
502             push @closest, $routing_table_v6->find_closest($target) if $want_v6 && $bep32;
503             my ( $v4, $v6 ) = $self->_pack_nodes( \@closest );
504             $res->{r}{nodes} = $v4 if $v4 && $want_v4;
505             $res->{r}{nodes6} = $v6 if $v6 && $want_v6 && $bep32;
506             }
507             }
508             $self->_check_external_ip( $msg->{ip} ) if exists $msg->{ip};
509             $self->_send_raw( bencode($res), $sender );
510             return { id => $id, ip => $ip, port => $port };
511             }
512              
513             method _handle_response ( $msg, $sender, $ip, $port ) {
514             $self->_check_external_ip( $msg->{ip} ) if exists $msg->{ip};
515             return ( [], [], undef ) if $_blacklist{$ip};
516             my $r = $msg->{r};
517             return ( [], [], undef ) unless $r && $r->{id};
518             my $tid = $msg->{t} // '';
519             my $pending = delete $_pending_queries{$tid};
520              
521             # Periodic cleanup of old pending queries (older than 30s)
522             if ( rand() < 0.01 ) {
523             my $now = time;
524             for my $k ( keys %_pending_queries ) {
525             delete $_pending_queries{$k} if $now - $_pending_queries{$k}{time} > 30;
526             }
527             }
528             if ( $bep42 && !$security->validate_node_id( $r->{id}, $ip ) ) {
529             return ( [], [], undef );
530             }
531             my $table = ( $ip =~ /:/ ) ? $routing_table_v6 : $routing_table_v4;
532             my $stale = $table->add_peer( $r->{id}, { ip => $ip, port => $port } );
533             $self->ping( $stale->{data}{ip}, $stale->{data}{port} ) if $stale;
534             my $peers = [];
535             $peers = $self->_unpack_peers( $r->{values} ) if $r->{values};
536             my @learned;
537             push @learned, $self->_unpack_nodes( $r->{nodes}, AF_INET )->@* if $r->{nodes};
538             push @learned, $self->_unpack_nodes( $r->{nodes6}, AF_INET6 )->@* if $r->{nodes6};
539              
540             for my $node (@learned) {
541             next if $bep42 && !$security->validate_node_id( $node->{id}, $node->{ip} );
542             my $ntable = ( $node->{ip} =~ /:/ ) ? $routing_table_v6 : $routing_table_v4;
543             $ntable->add_peer( $node->{id}, { ip => $node->{ip}, port => $node->{port} } );
544             }
545              
546             # Always include the responding node itself
547             push @learned, { id => $r->{id}, ip => $ip, port => $port };
548             my $scrape;
549             $scrape = { id => $r->{id}, ip => $ip, port => $port, sn => $r->{sn}, ln => $r->{ln} } if $pending && $pending->{q} eq 'scrape_peers';
550             my $data;
551             if ( defined $r->{v} ) {
552             $data = {
553             id => $r->{id},
554             ip => $ip,
555             port => $port,
556             v => $r->{v},
557             k => $r->{k},
558             sig => $r->{sig},
559             seq => $r->{seq},
560             salt => $r->{salt},
561             token => $r->{token}
562             };
563             }
564             my $sample;
565             if ( $pending && $pending->{q} eq 'sample_infohashes' && defined $r->{samples} ) {
566             my @samples;
567             my $blob = $r->{samples};
568             push @samples, substr( $blob, 0, 20, '' ) while length($blob) >= 20;
569             $sample = { id => $r->{id}, ip => $ip, port => $port, samples => \@samples, num => $r->{num}, interval => $r->{interval} };
570             }
571             my $token_only;
572             $token_only = { id => $r->{id}, ip => $ip, port => $port, token => $r->{token} } if defined $r->{token} && !$data;
573             my $result = $scrape // $data // $sample // $token_only;
574             $result->{queried_target} = $pending->{target} if $result && $pending && $pending->{target};
575             return ( \@learned, $peers, $result );
576             }
577              
578             method _send ( $msg, $addr, $port //= undef ) {
579             $msg->{v} = $v if defined $v;
580             $msg->{a}{ro} = 1 if $read_only && $msg->{y} eq 'q';
581             if ( !defined $port && !ref $addr && length($addr) >= 16 ) {
582             $self->_send_raw( bencode($msg), $addr );
583             return;
584             }
585             ( $addr, $port ) = @$addr if ref $addr eq 'ARRAY';
586             my ( $err, @res ) = getaddrinfo( $addr, $port, { socktype => SOCK_DGRAM } );
587             if ($err) {
588             warn "[WARN] getaddrinfo failed for $addr" . ( defined $port ? ":$port" : "" ) . ": $err" if $debug;
589             return;
590             }
591             for my $res (@res) {
592             my $family = sockaddr_family( $res->{addr} );
593             $self->_send_raw( bencode($msg), $res->{addr} ) if ( ( $family == AF_INET && $want_v4 ) || ( $family == AF_INET6 && $want_v6 ) );
594             }
595             }
596              
597             method _send_raw ( $data, $dest ) {
598             if ($debug) {
599             my ( $port, $ip ) = $self->_unpack_address($dest);
600             say "[DEBUG] SEND to $ip:$port";
601             }
602             $socket->send( $data, 0, $dest );
603             }
604              
605             method _pack_nodes ($peers) {
606             my $v4 = '';
607             my $v6 = '';
608             for my $p (@$peers) {
609             my $ip = $p->{data}{ip};
610             my $port = $p->{data}{port} // 0;
611             if ( $ip =~ /:/ ) {
612             next unless $want_v6;
613             my $ip_bin = inet_pton( AF_INET6, $ip );
614             $v6 .= $p->{id} . $ip_bin . pack( 'n', $port ) if $ip_bin;
615             }
616             else {
617             next unless $want_v4;
618             my $ip_bin = inet_aton($ip);
619             $v4 .= $p->{id} . $ip_bin . pack( 'n', $port ) if $ip_bin;
620             }
621             }
622             return ( $v4, $v6 );
623             }
624              
625             method _unpack_nodes ( $blob, $family //= AF_INET ) {
626             my @nodes;
627             my $stride = ( $family == AF_INET ) ? 26 : 38;
628             my $ip_len = ( $family == AF_INET ) ? 4 : 16;
629             while ( length($blob) >= $stride ) {
630             my $chunk = substr( $blob, 0, $stride, '' );
631             my $id = substr( $chunk, 0, 20 );
632             my $ip_bin = substr( $chunk, 20, $ip_len );
633             my $port = unpack( 'n', substr( $chunk, 20 + $ip_len, 2 ) );
634             my $ip = ( $family == AF_INET ) ? inet_ntoa($ip_bin) : inet_ntop( AF_INET6, $ip_bin );
635             push @nodes, { id => $id, ip => $ip, port => $port };
636             }
637             return \@nodes;
638             }
639              
640             method _unpack_peers ($list) {
641             my @peers;
642             my @blobs = ( ref($list) eq 'ARRAY' ) ? @$list : ($list);
643             for my $blob (@blobs) {
644             if ( length($blob) == 18 ) {
645             my ( $ip_bin, $port ) = unpack( 'a16 n', $blob );
646             push @peers, Net::BitTorrent::DHT::Peer->new( ip => inet_ntop( AF_INET6, $ip_bin ), port => $port, family => 6 ) if $want_v6;
647             }
648             elsif ( length($blob) == 6 ) {
649             my ( $ip_bin, $port ) = unpack( 'a4 n', $blob );
650             push @peers, Net::BitTorrent::DHT::Peer->new( ip => inet_ntoa($ip_bin), port => $port, family => 4 ) if $want_v4;
651             }
652             else { # Fallback for non-standard implementations that pack multiple peers into one string
653             while ( length($blob) >= 6 ) {
654             if ( length($blob) >= 18 && ( length($blob) % 18 == 0 ) ) {
655             my $chunk = substr( $blob, 0, 18, '' );
656             my ( $ip_bin, $port ) = unpack( 'a16 n', $chunk );
657             push @peers, Net::BitTorrent::DHT::Peer->new( ip => inet_ntop( AF_INET6, $ip_bin ), port => $port, family => 6 ) if $want_v6;
658             }
659             else {
660             my $chunk = substr( $blob, 0, 6, '' );
661             my ( $ip_bin, $port ) = unpack( 'a4 n', $chunk );
662             push @peers, Net::BitTorrent::DHT::Peer->new( ip => inet_ntoa($ip_bin), port => $port, family => 4 ) if $want_v4;
663             }
664             }
665             }
666             }
667             return \@peers;
668             }
669              
670             method _pack_peers_raw ($peers) {
671             return [
672             map {
673             ( $_->{ip} =~ /:/ ) ? ( inet_pton( AF_INET6, $_->{ip} ) . pack( 'n', $_->{port} ) ) :
674             ( inet_aton( $_->{ip} ) . pack( 'n', $_->{port} ) )
675             } @$peers
676             ];
677             }
678              
679             method _check_external_ip ( $ip_bin, $self_addr = undef ) {
680             if ( length($ip_bin) == 6 ) {
681             $ip_bin = substr( $ip_bin, 0, 4 );
682             }
683             elsif ( length($ip_bin) == 18 ) {
684             $ip_bin = substr( $ip_bin, 0, 16 );
685             }
686             my $ip = length($ip_bin) == 4 ? inet_ntoa($ip_bin) : length($ip_bin) == 16 ? inet_ntop( AF_INET6, $ip_bin ) : undef;
687             return unless $ip;
688             $ip_votes{$ip}++;
689             if ( $ip_votes{$ip} >= 5 ) { # Threshold for consensus
690             if ( !defined $external_ip || $external_ip ne $ip ) {
691             $external_ip = $ip;
692             $self->_emit( 'external_ip_detected', $ip );
693             }
694             %ip_votes = (); # Reset votes after consensus
695             }
696             }
697              
698             method run () {
699             $running = 1;
700             $self->bootstrap();
701             $self->tick(1) while $running;
702             }
703             };
704             #
705             1;