File Coverage

lib/Net/BitTorrent.pm
Criterion Covered Total %
statement 103 168 61.3
branch 1 30 3.3
condition 0 15 0.0
subroutine 33 41 80.4
pod n/a
total 137 254 53.9


line stmt bran cond sub pod time code
1 19     19   3443955 use v5.40;
  19         75  
2 19     19   163 use feature 'class', 'try';
  19         35  
  19         3553  
3 19     19   139 no warnings 'experimental::class', 'experimental::builtin', 'experimental::try';
  19         44  
  19         1035  
4 19     19   9014 use Net::BitTorrent::Emitter;
  19         56  
  19         1729  
5             #
6             class Net::BitTorrent v2.0.1 : isa(Net::BitTorrent::Emitter) {
7 19     19   11885 use Net::BitTorrent::Torrent;
  19         70  
  19         1004  
8 19     19   13425 use Net::BitTorrent::DHT;
  19         84  
  19         1338  
9 19     19   11517 use Net::uTP::Manager; # Standalone spin-off
  19         137747  
  19         1155  
10 19     19   178 use Digest::SHA qw[sha1];
  19         38  
  19         1227  
11 19     19   114 use version;
  19         44  
  19         146  
12 19     19   1738 use Time::HiRes qw[time];
  19         63  
  19         127  
13 19     19   1363 use Net::BitTorrent::Types qw[:encryption];
  19         116  
  19         20724  
14             #
15             field %torrents; # infohash => Torrent object
16             field %pending_peers; # transport => Peer object
17             field $dht;
18             field $tcp_listener;
19             field $tick_debt = 0;
20             field $utp : reader = Net::uTP::Manager->new();
21             field $lpd;
22             field $node_id : reader : writer;
23             field %dht_queries; # tid => { cb => sub, type => ... }
24             field %dht_index; # infohash_hex => timestamp (BEP 51 crawling)
25             field $port_mapper : reader;
26             field $port : param : reader = 49152 + int( rand(10000) );
27             field $user_agent : param : reader //= join '/', __CLASS__, our $VERSION;
28             field $debug : param : reader //= 0;
29             field $encryption : param : reader = ENCRYPTION_REQUIRED;
30              
31             # Feature Toggles (Default to enabled)
32             field $bep05 : param = 1; # DHT
33             field $bep06 : param = 1; # Fast Extension
34             field $bep09 : param = 1; # Metadata Exchange
35             field $bep10 : param = 1; # Extension Protocol
36             field $bep11 : param = 1; # PEX
37             field $bep52 : param = 1; # v2
38             field $bep55 : param = 1; # Holepunching
39             field $limit_up : reader;
40             field $limit_down : reader;
41             field $upnp_enabled : param = 0;
42              
43             # Verification Throttling
44             field @hashing_queue; # Array of { torrent => $t, index => $i, data => $d }
45             field $hashing_rate_limit : writer = 1024 * 1024 * 500; # 500MB/s limit for hashing
46             field $hashing_allowance = 0;
47              
48             method features () {
49             { bep05 => $bep05, bep06 => $bep06, bep09 => $bep09, bep10 => $bep10, bep11 => $bep11, bep52 => $bep52, bep55 => $bep55, };
50             }
51             ADJUST {
52             $node_id //= _generate_peer_id();
53              
54             # Normalize encryption param
55             if ( defined $encryption && $encryption !~ /^\d+$/ ) {
56             if ( $encryption eq 'none' ) { $encryption = ENCRYPTION_NONE }
57             elsif ( $encryption eq 'preferred' ) { $encryption = ENCRYPTION_PREFERRED }
58             elsif ( $encryption eq 'required' ) { $encryption = ENCRYPTION_REQUIRED }
59             }
60             my $weak_self = $self;
61             builtin::weaken($weak_self);
62              
63             # TCP Listener
64 19     19   170 use IO::Socket::IP;
  19         76  
  19         292  
65             $tcp_listener = IO::Socket::IP->new( LocalPort => $port, Listen => 5, ReuseAddr => 1, Blocking => 0, );
66             if ($tcp_listener) {
67             $self->_emit( log => " [DEBUG] TCP listener started on port $port\n", level => 'debug' ) if $debug;
68             }
69             else {
70             $self->_emit( log => " [ERROR] Could not start TCP listener on port $port: $!\n", level => 'error' );
71             }
72             $utp->on(
73             'new_connection',
74             sub ( $utp_conn, $ip, $port ) {
75             return unless $weak_self;
76              
77             #~ warn " [uTP] Incoming connection from $ip:$port\n";
78 19     19   29728 use Net::BitTorrent::Protocol::HandshakeOnly;
  19         60  
  19         1036  
79 19     19   10375 use Net::BitTorrent::Peer;
  19         80  
  19         4254  
80             my $proto = Net::BitTorrent::Protocol::HandshakeOnly->new(
81             infohash => undef, # Dummy, will be overwritten by any incoming
82             peer_id => $weak_self->node_id, # Dummy
83             on_handshake_cb => sub ( $ih, $id ) {
84             $weak_self->_upgrade_pending_peer( $utp_conn, $ih, $id, $ip, $port );
85             }
86             );
87             my $peer = Net::BitTorrent::Peer->new(
88             protocol => $proto,
89             torrent => undef, # Not known yet
90             transport => $utp_conn,
91             ip => $ip,
92             port => $port,
93             debug => $debug
94             );
95             $pending_peers{$utp_conn} = $peer;
96             }
97             );
98 19     19   233 use Algorithm::RateLimiter::TokenBucket;
  19         38  
  19         1050  
99             $limit_up = Algorithm::RateLimiter::TokenBucket->new( limit => 0 );
100             $limit_down = Algorithm::RateLimiter::TokenBucket->new( limit => 0 );
101              
102             # Initialize LPD (BEP 14)
103 19     19   11958 use Net::Multicast::PeerDiscovery;
  19         51797  
  19         33055  
104             $lpd = Net::Multicast::PeerDiscovery->new();
105             $lpd->on(
106             peer_found => sub ($p_info) {
107             return unless $weak_self;
108             if ( my $t = $torrents{ $p_info->{info_hash} } ) {
109             $t->add_peer( { ip => $p_info->{ip}, port => $p_info->{port} } );
110             }
111             }
112             );
113              
114             # Initialize PortMapper if enabled and available
115             if ($upnp_enabled) {
116 1     1   7 builtin::load_module 'Acme::UPnP';
  1     1   2  
  1         3  
  1         7  
  1         2  
  1         3  
117             my $mapper = Acme::UPnP->new();
118             if ( $mapper->is_available() ) {
119             $port_mapper = $mapper;
120             }
121             else {
122             #~ warn " [UPnP] UPnP requested but Net::UPnP::ControlPoint not available. Skipping.\n";
123             $upnp_enabled = 0; # Disable UPnP if module not available
124             $port_mapper = undef; # Explicitly set to undef
125             }
126             }
127             else {
128             $port_mapper = undef; # Ensure it's undef if UPnP is disabled
129             }
130              
131             # Register PortMapper events if port_mapper is initialized
132             if ($port_mapper) {
133              
134             #~ $port_mapper->on( 'device_found', sub ($name_hash) { warn " [UPnP] Device found: $name_hash->{name}\n"; } );
135             #~ $port_mapper->on( 'device_not_found', sub { warn " [UPnP] No device found.\n"; } );
136             #~ $port_mapper->on( 'map_success',
137             #~ sub ($args) { warn " [UPnP] Port mapped: $args->{ext_p}/$args->{proto} for $args->{int_p} ($args->{desc})\n"; } );
138             #~ $port_mapper->on( 'map_failed', sub ($args) { warn " [UPnP] Port map failed: $args->{err_c} - $args->{err_d}\n"; } );
139             #~ $port_mapper->on( 'unmap_success', sub ($args) { warn " [UPnP] Port unmapped: $args->{ext_p}/$args->{proto}\n"; } );
140             #~ $port_mapper->on( 'unmap_failed', sub ($args) { warn " [UPnP] Port unmap failed: $args->{err_c} - $args->{err_d}\n"; } );
141             $self->forward_ports();
142             }
143             }
144              
145 46     46   102 sub _generate_peer_id () {
  46         79  
146 46         107 my $v_id = '200'; # Hardcoded version for stability in ID generation
147 46         109 my $chars = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-._~';
148 46         161 return pack( 'a20', sprintf( '-NB%sS-%sSanko', $v_id, join( '', map { substr( $chars, rand(66), 1 ) } 1 .. 7 ) ) );
  322         1294  
149              
150             #~ $self->_emit( log => ' [DEBUG] Generated Peer ID: ' . unpack( 'H*', $id ) . " (" . $id . ")\n", level => 'debug' ) if $self->debug;
151             }
152              
153             method forward_ports () {
154             if ($port_mapper) {
155             $port_mapper->discover_device();
156             }
157             }
158              
159             method shutdown () {
160             if ($port_mapper) {
161             $port_mapper->unmap_port( 6881, 'TCP' );
162             $port_mapper->unmap_port( 6881, 'UDP' );
163             }
164             for my $t ( values %torrents ) {
165             $t->stop if $t;
166             }
167             }
168              
169             method DESTROY () {
170             $self->shutdown();
171             }
172              
173             method handle_udp_packet ( $data, $addr ) {
174             return unless length $data;
175             my $first = substr( $data, 0, 1 );
176             if ( $first eq 'd' ) {
177              
178             # Likely DHT
179             return $self->dht->handle_incoming( $data, $addr ) if $self->dht;
180             }
181             elsif ( ord($first) >> 4 == 1 ) {
182              
183             # Likely uTP (version 1)
184             my $res = $utp->handle_packet( $data, $addr );
185             if ($res) {
186              
187             # Send response back
188             $self->dht->socket->send( $res, 0, $addr );
189             }
190             }
191             }
192              
193             method _handle_new_tcp_transport ($transport) {
194             my $weak_self = $self;
195             builtin::weaken($weak_self);
196             my $weak_transport = $transport;
197             builtin::weaken($weak_transport);
198              
199             # We wait for the first chunk of data to decide if it's PWP or MSE
200 0         0 $transport->on(
201 0     0   0 data => sub ( $emitter, $data ) {
  0         0  
  0         0  
202 0 0 0     0 return unless $weak_self && $weak_transport;
203 0         0 my $entry = $weak_self->pending_peers_hash->{$weak_transport};
204 0 0       0 return unless $entry;
205 0 0       0 if ( $entry->{peer} ) {
206 0         0 $entry->{peer}->on_data($data);
207             }
208             else {
209 0         0 $weak_self->_autodetect_protocol( $weak_transport, $data );
210             }
211             }
212             );
213              
214             # Store in pending_peers to keep it alive
215             $pending_peers{$transport} = { transport => $transport, timestamp => time() };
216             }
217              
218             method _autodetect_protocol ( $transport, $data ) {
219             my $entry = $pending_peers{$transport};
220             return unless $entry;
221             return if $entry->{detected};
222              
223             # If we already have a peer object or a filter, this data is for them.
224             if ( $entry->{peer} || $transport->filter ) {
225             return;
226             }
227             $entry->{detected} = 1;
228             my $first_byte = ord( substr( $data, 0, 1 ) );
229             if ( $first_byte == 0x13 ) {
230             if ( $encryption == ENCRYPTION_REQUIRED ) {
231             $self->_emit( log => " [DEBUG] Rejecting plaintext connection because encryption is required\n", level => 'debug' ) if $debug;
232             $transport->socket->close();
233             delete $pending_peers{$transport};
234             return;
235             }
236             $self->_emit( log => " [DEBUG] Autodetected PWP handshake\n", level => 'debug' ) if $debug;
237 19     19   202 use Net::BitTorrent::Protocol::HandshakeOnly;
  19         43  
  19         7240  
238 0         0 my $proto = Net::BitTorrent::Protocol::HandshakeOnly->new(
239             infohash => undef,
240             peer_id => $self->node_id,
241 0     0   0 on_handshake_cb => sub ( $ih, $id ) {
  0         0  
  0         0  
242 0         0 $self->_upgrade_pending_peer( $transport, $ih, $id, $transport->socket->peerhost, $transport->socket->peerport );
243             }
244             );
245             $entry->{peer} = Net::BitTorrent::Peer->new(
246             protocol => $proto,
247             transport => $transport,
248             torrent => undef,
249             ip => $transport->socket->peerhost,
250             port => $transport->socket->peerport,
251             debug => $debug
252             );
253              
254             # Feed the data we already read to the new protocol handler
255             $entry->{peer}->on_data($data);
256             }
257             else {
258             $self->_emit( log => " [DEBUG] Autodetected potential MSE handshake\n", level => 'debug' ) if $debug;
259              
260             # MSE handling will be complex because we don't know the infohash yet
261             # We need an MSE object that can try ALL our hosted infohashes?
262             # No, MSE spec says Req2 is XORed with the infohash.
263             # We must wait until we have Req2, then XOR it with each IH we have until one matches.
264             $self->_handle_incoming_mse( $transport, $data );
265             }
266             }
267              
268             method _handle_incoming_mse ( $transport, $data ) {
269 19     19   964 use Net::BitTorrent::Protocol::MSE;
  19         870  
  19         17483  
270             my $weak_self = $self;
271             builtin::weaken($weak_self);
272 0         0 my $mse = Net::BitTorrent::Protocol::MSE->new(
273             infohash => undef, # Not known yet
274             is_initiator => 0,
275 0     0   0 on_infohash_probe => sub ( $mse_obj, $xor_part, $s ) {
  0         0  
  0         0  
  0         0  
276 0 0       0 return undef unless $weak_self;
277 0         0 my $torrents = $weak_self->torrents();
278 0         0 for my $t (@$torrents) {
279 0         0 my $ih1 = $t->infohash_v1;
280 0         0 my $ih2 = $t->infohash_v2;
281 0         0 for my $ih ( grep {defined} ( $ih1, $ih2 ) ) {
  0         0  
282 0         0 my $expected_xor = $mse_obj->_xor_strings( sha1( 'req2' . $ih ), sha1( 'req3' . $s ) );
283 0 0       0 if ( $xor_part eq $expected_xor ) {
284 0 0       0 $weak_self->_emit( log => " [DEBUG] MSE matched infohash: " . unpack( 'H*', $ih ) . "\n", level => 'debug' )
285             if $weak_self->debug;
286 0         0 return $ih;
287             }
288             }
289             }
290 0         0 return undef;
291             }
292             );
293             my $weak_transport = $transport;
294             builtin::weaken($weak_transport);
295 0         0 $mse->on(
296             'infohash_identified',
297 0     0   0 sub ( $emitter, $ih ) {
  0         0  
  0         0  
298 0 0 0     0 return unless $weak_self && $weak_transport;
299 0         0 $weak_self->_upgrade_pending_peer( $weak_transport, $ih, undef, $weak_transport->socket->peerhost,
300             $weak_transport->socket->peerport );
301             }
302             );
303             $transport->set_filter($mse);
304             $self->_emit( log => " [DEBUG] Incoming MSE handshake started\n", level => 'debug' ) if $debug;
305              
306             # Feed the data we already have
307             $mse->receive_data($data);
308             my $entry = $pending_peers{$transport};
309             $entry->{mse} = $mse;
310             }
311              
312             method _upgrade_pending_peer ( $transport, $ih, $peer_id, $ip, $port ) {
313             my $entry = $pending_peers{$transport};
314             if ( !$entry ) {
315              
316             # Already upgraded or timed out
317             return;
318             }
319             delete $pending_peers{$transport};
320             my $torrent = $torrents{$ih};
321             if ( !$torrent ) {
322             $self->_emit( log => " [DEBUG] Handshake for unknown torrent " . unpack( 'H*', $ih ) . " from $ip:$port\n", level => 'debug' )
323             if $debug;
324             $transport->socket->close() if $transport->socket;
325             return;
326             }
327 19     19   12258 use Net::BitTorrent::Protocol::PeerHandler;
  19         81  
  19         34241  
328             my $p_handler = Net::BitTorrent::Protocol::PeerHandler->new(
329             infohash => $ih,
330             peer_id => $self->node_id,
331             features => $torrent->features,
332             debug => $debug,
333             metadata_size => $torrent->metadata ? length( Net::BitTorrent::Protocol::BEP03::Bencode::bencode( $torrent->metadata->{info} ) ) : 0,
334             );
335             my $peer;
336             if ( $entry->{peer} ) {
337             $peer = $entry->{peer};
338             $peer->set_protocol($p_handler);
339             $peer->set_torrent($torrent);
340             }
341             else {
342             $peer = Net::BitTorrent::Peer->new(
343             protocol => $p_handler,
344             torrent => $torrent,
345             transport => $transport,
346             ip => $ip,
347             port => $port,
348             debug => $debug,
349             mse => $entry->{mse},
350             encryption => $encryption,
351             );
352             }
353             $p_handler->set_peer($peer);
354             $p_handler->set_parent_emitter($peer);
355             if ( defined $peer_id ) {
356             $p_handler->on_handshake( $ih, $peer_id );
357             }
358             $torrent->register_peer_object($peer);
359             }
360             method set_limit_down ($val) { $limit_down->set_limit($val) }
361             method hashing_queue_size () { scalar @hashing_queue }
362              
363             method queue_verification ( $torrent, $index, $data ) {
364             $self->_emit( log => "\n [LOUD] PIECE $index: Queuing for verification (" . length($data) . " bytes)\n", level => 'info' );
365             push @hashing_queue, { torrent => $torrent, index => $index, data => $data };
366             }
367              
368             method _process_hashing_queue ($delta) {
369             $hashing_allowance += $hashing_rate_limit * $delta;
370             if ( @hashing_queue && $hashing_allowance < length( $hashing_queue[0]{data} ) ) {
371             $self->_emit(
372             log => sprintf( "\r [LOUD] Hashing Throttled: %.2f%% of next piece ready",
373             ( $hashing_allowance / length( $hashing_queue[0]{data} ) ) * 100 ),
374             level => 'info'
375             );
376             }
377             while (@hashing_queue) {
378             my $task = $hashing_queue[0];
379             my $len = length( $task->{data} );
380             if ( $hashing_allowance >= $len ) {
381             shift @hashing_queue;
382             $hashing_allowance -= $len;
383             $self->_emit( log => "\n [LOUD] PIECE $task->{index}: Processing hash...\n", level => 'info' );
384             $task->{torrent}->_verify_queued_piece( $task->{index}, $task->{data} );
385             }
386             else {
387             # Not enough allowance to finish this piece yet
388             last;
389             }
390             }
391             }
392              
393             method dht_get ( $target, $cb ) {
394             return unless $self->dht;
395              
396             # First, iterative find_node to get close to target.
397             # Then call get_remote on closest nodes
398             # Simplified: trigger iterative lookup and register callback
399             $self->dht->find_node_remote( $target, $_->[0], $_->[1] ) for @{ $self->dht->boot_nodes };
400             $dht_queries{$target} = { cb => $cb, type => 'get' };
401             }
402              
403             method dht_put ( $value, $cb = undef ) {
404             return unless $self->dht;
405             my $target = Digest::SHA::sha1($value);
406              
407             # Simplified: find nodes and then put
408             $self->dht->find_node_remote( $target, $_->[0], $_->[1] ) for @{ $self->dht->boot_nodes };
409             $dht_queries{$target} = { cb => $cb, type => 'put', value => $value };
410             }
411              
412             method dht_scrape ( $infohash, $cb ) {
413             return unless $self->dht;
414             $self->dht->scrape($infohash);
415             $dht_queries{$infohash} = { cb => $cb, type => 'scrape' };
416             }
417              
418             method dht_crawl () {
419             return unless $self->dht;
420              
421             # Random sample to discover new infohashes
422             my $random_target = pack( 'H*', join( '', map { sprintf( '%02x', rand(256) ) } 1 .. 20 ) );
423             $self->dht->sample($random_target);
424             }
425             method dht_index () { return \%dht_index }
426              
427             method connect_to_peer ( $ip, $port, $ih ) {
428 19     19   207 use IO::Socket::IP;
  19         43  
  19         311  
429             my $socket = IO::Socket::IP->new( PeerHost => $ip, PeerPort => $port, Type => SOCK_STREAM, Blocking => 0, );
430             return unless $socket;
431             $self->_emit( log => " [DEBUG] Connecting to $ip:$port for " . unpack( 'H*', $ih ) . "\n", level => 'debug' ) if $debug;
432 19     19   74980 use Net::BitTorrent::Transport::TCP;
  19         63  
  19         2950  
433             my $transport = Net::BitTorrent::Transport::TCP->new( socket => $socket, connecting => 1 );
434              
435             # Add to pending_peers immediately
436             $pending_peers{$transport} = { transport => $transport, timestamp => time() };
437             my $weak_self = $self;
438             builtin::weaken($weak_self);
439             my $weak_transport = $transport;
440             builtin::weaken($weak_transport);
441             if ( $encryption == ENCRYPTION_REQUIRED || $encryption == ENCRYPTION_PREFERRED ) {
442 19     19   153 use Net::BitTorrent::Protocol::MSE;
  19         37  
  19         25384  
443             my $mse = Net::BitTorrent::Protocol::MSE->new( infohash => $ih, is_initiator => 1, );
444 0         0 $mse->on(
445             'infohash_identified',
446 0     0   0 sub ( $emitter, $ih ) {
  0         0  
  0         0  
447 0 0 0     0 return unless $weak_self && $weak_transport;
448 0         0 $weak_self->_upgrade_pending_peer(
449             $weak_transport, $ih, undef,
450             $weak_transport->socket->peerhost,
451             $weak_transport->socket->peerport
452             );
453             }
454             );
455             $transport->set_filter($mse);
456             $pending_peers{$transport}{mse} = $mse;
457 0         0 $transport->on(
458             'filter_failed',
459 0     0   0 sub ( $emitter, $leftover ) {
  0         0  
  0         0  
460 0 0 0     0 return unless $weak_self && $weak_transport;
461 0 0       0 $weak_self->_emit( log => " [DEBUG] connect_to_peer: MSE failed, falling back to plaintext\n", level => 'debug' )
462             if $weak_self->debug;
463 0         0 $weak_self->_upgrade_pending_peer(
464             $weak_transport, $ih, undef,
465             $weak_transport->socket->peerhost,
466             $weak_transport->socket->peerport
467             );
468             }
469             );
470             }
471             else {
472             # Plaintext outgoing: create peer immediately
473             $self->_upgrade_pending_peer( $transport, $ih, undef, $ip, $port );
474             }
475              
476             # Reuse incoming data handler logic
477 0         0 $transport->on(
478             'data',
479 0     0   0 sub ( $emitter, $data ) {
  0         0  
  0         0  
480 0 0 0     0 return unless $weak_self && $weak_transport;
481 0         0 my $entry = $weak_self->pending_peers_hash->{$weak_transport};
482 0 0       0 return unless $entry; # Might have been upgraded already
483 0 0       0 if ( $entry->{peer} ) {
484 0         0 $entry->{peer}->on_data($data);
485             }
486             else {
487 0         0 $weak_self->_autodetect_protocol( $weak_transport, $data );
488             }
489             }
490             );
491             return $transport;
492             }
493             method pending_peers_hash () { \%pending_peers }
494              
495             method add ( $thing, $base_path, %args ) {
496             if ( $thing =~ /^magnet:/i ) {
497             return $self->add_magnet( $thing, $base_path, %args );
498             }
499             elsif ( length($thing) == 20 || ( length($thing) == 40 && $thing =~ /^[0-9a-f]+$/i ) ) {
500             return $self->add_infohash( $thing, $base_path, %args );
501             }
502             elsif ( length($thing) == 32 || ( length($thing) == 64 && $thing =~ /^[0-9a-f]+$/i ) ) {
503             return $self->add_infohash( $thing, $base_path, %args );
504             }
505             elsif ( -f $thing ) {
506             return $self->add_torrent( $thing, $base_path, %args );
507             }
508             $self->_emit( log => "Don't know how to add '$thing'", level => 'fatal' );
509             return undef;
510             }
511              
512             method add_torrent ( $path, $base_path, %args ) {
513             my $t = Net::BitTorrent::Torrent->new( path => $path, base_path => $base_path, client => $self, debug => $debug, peer_id => $node_id, %args );
514             $torrents{ $t->infohash_v1 } = $t if $t->infohash_v1;
515             $torrents{ $t->infohash_v2 } = $t if $t->infohash_v2;
516             $self->_emit( 'torrent_added', $t );
517             return $t;
518             }
519              
520             method add_infohash ( $ih, $base_path, %args ) {
521             my $t
522             = Net::BitTorrent::Torrent->new( infohash => $ih, base_path => $base_path, client => $self, debug => $debug, peer_id => $node_id, %args );
523             $torrents{ $t->infohash_v1 } = $t if $t->infohash_v1;
524             $torrents{ $t->infohash_v2 } = $t if $t->infohash_v2;
525             $self->_emit( 'torrent_added', $t );
526             return $t;
527             }
528              
529             method add_magnet ( $uri, $base_path, %args ) {
530 19     19   12948 use Net::BitTorrent::Protocol::BEP53;
  19         67  
  19         17527  
531             my $m = Net::BitTorrent::Protocol::BEP53->parse($uri);
532             my $t = Net::BitTorrent::Torrent->new(
533             infohash_v1 => $m->infohash_v1,
534             infohash_v2 => $m->infohash_v2,
535             initial_trackers => $m->trackers,
536             initial_peers => $m->nodes, # x.pe
537             base_path => $base_path,
538             client => $self,
539             debug => $debug,
540             peer_id => $node_id,
541             %args
542             );
543             $torrents{ $t->infohash_v1 } = $t if $t->infohash_v1;
544             $torrents{ $t->infohash_v2 } = $t if $t->infohash_v2;
545             $self->_emit( 'torrent_added', $t );
546             return $t;
547             }
548              
549             method torrents () {
550             return [ values %torrents ];
551             }
552              
553             method dht () {
554             return undef unless $bep05;
555             if ( !$dht ) {
556             $dht = Net::BitTorrent::DHT->new(
557             node_id_bin => $node_id,
558             port => $port,
559             want_v6 => 1,
560             bep32 => 1,
561             bep42 => 0,
562             debug => $debug,
563             boot_nodes => [ [ 'router.bittorrent.com', 6881 ], [ 'router.utorrent.com', 6881 ], [ 'dht.transmissionbt.com', 6881 ] ]
564             );
565             my $weak_self = $self;
566             builtin::weaken($weak_self);
567 1         2 $dht->on(
568             'external_ip_detected',
569 1     1   3 sub ( $emitter, $ip ) {
  1         3  
  1         2  
570 1 50       6 return unless $weak_self;
571              
572             #~ warn " [DHT] External IP detected: $ip. Rotating node_id.\n";
573             # my $sec = Net::BitTorrent::DHT::Security->new();
574             # my $new_id = $sec->generate_node_id($ip);
575             # $weak_self->set_node_id($new_id);
576             # $dht->set_node_id($new_id);
577             }
578             );
579             $dht->bootstrap();
580             }
581             return $dht;
582             }
583              
584             method tick ( $timeout //= 0.1 ) {
585             $tick_debt += $timeout;
586             $tick_debt = 5.0 if $tick_debt > 5.0; # Max debt to avoid huge bursts
587             my $real_start = time();
588             while ( $tick_debt >= 0.01 ) {
589             my $slice = 0.1;
590             $slice = $tick_debt if $tick_debt < $slice;
591             $self->_run_one_tick($slice);
592             $tick_debt -= $slice;
593              
594             # Don't block the caller's main loop for more than 200ms
595             last if ( time() - $real_start ) > 0.2;
596             }
597             }
598              
599             method _run_one_tick ($timeout) {
600             $self->_emit( log => " [DEBUG] Net::BitTorrent::_run_one_tick starting (timeout=$timeout)\n", level => 'debug' ) if $debug > 1;
601             my $start = time();
602             $limit_up->tick($timeout);
603             $limit_down->tick($timeout);
604              
605             # Accept incoming TCP connections
606             if ($tcp_listener) {
607             my $sel = IO::Select->new($tcp_listener);
608             if ( $sel->can_read(0) ) {
609             while ( my $socket = $tcp_listener->accept() ) {
610             $socket->blocking(0);
611             $self->_emit(
612             log => " [DEBUG] Accepted TCP connection from " . $socket->peerhost . ":" . $socket->peerport . "\n",
613             level => 'debug'
614             ) if $debug;
615 19     19   200 use Net::BitTorrent::Transport::TCP;
  19         41  
  19         21312  
616             my $transport = Net::BitTorrent::Transport::TCP->new( socket => $socket, connecting => 0 );
617              
618             # Autodetect MSE vs PWP will happen in the first data received
619             $self->_handle_new_tcp_transport($transport);
620             }
621             }
622             }
623             else {
624             $self->_emit( log => " [DEBUG] No TCP listener active\n", level => 'debug' ) if $debug > 1;
625             }
626              
627             # Process hashing queue (throttled)
628             $self->_process_hashing_queue($timeout);
629              
630             # Update LPD
631             $lpd->tick($timeout) if $lpd;
632              
633             # Update torrents (including trackers and storage)
634             for my $ih ( keys %torrents ) {
635             $torrents{$ih}->tick($timeout);
636             }
637              
638             # Update pending peers (ones being autodetected or in handshake)
639             for my $t_key ( keys %pending_peers ) {
640             my $entry = $pending_peers{$t_key};
641             my $transport = $entry->{transport};
642             if ( $entry->{peer} ) {
643             $entry->{peer}->tick();
644             }
645             else {
646             # If no peer object yet, we still need to tick the transport
647             # to read the autodetection data.
648             $transport->tick();
649             }
650              
651             # Timeout old pending connections (30s)
652             if ( time() - $entry->{timestamp} > 30 ) {
653             if ($debug) {
654             my $host = 'unknown';
655             try {
656             if ( $transport->socket ) {
657             $host = $transport->socket->peerhost // 'unknown';
658             }
659             }
660             catch ($e) { }
661             $self->_emit( log => " [DEBUG] Timing out pending connection from $host\n", level => 'debug' );
662             }
663             $transport->socket->close() if $transport->socket;
664             delete $pending_peers{$t_key};
665             }
666             }
667              
668             # Collect DHT events from direct packet processing
669             my ( @packet_nodes, @packet_peers, @packet_data );
670              
671             # Read from UDP socket (DHT/uTP)
672             if ( $dht && $dht->socket ) {
673             my $sel = IO::Select->new( $dht->socket );
674             while ( $sel->can_read(0) ) {
675             my $remote_addr = $dht->socket->recv( my $data, 65535 );
676             if ($remote_addr) {
677             my @res = $self->handle_udp_packet( $data, $remote_addr );
678             if (@res) {
679             push @packet_nodes, @{ $res[0] } if ref $res[0] eq 'ARRAY';
680             push @packet_peers, @{ $res[1] } if ref $res[1] eq 'ARRAY';
681             push @packet_data, $res[2] if $res[2];
682             }
683             }
684             }
685             }
686              
687             # Update uTP
688             my $utp_packets = $utp->tick($timeout);
689             for my $pkt (@$utp_packets) {
690              
691             # Send retransmissions etc.
692             # We use the DHT socket for convenience if it exists
693             if ( $dht && $dht->socket ) {
694              
695             # Need to convert ip/port to sockaddr
696 19     19   182 use Socket qw[pack_sockaddr_in inet_aton];
  19         43  
  19         33550  
697             my $addr = pack_sockaddr_in( $pkt->{port}, inet_aton( $pkt->{ip} ) );
698             $dht->socket->send( $pkt->{data}, 0, $addr );
699             }
700             }
701              
702             # Update DHT
703             if ($dht) {
704             my ( $tick_nodes, $tick_peers, $tick_data ) = $dht->tick($timeout);
705             my @all_nodes = ( @{ $tick_nodes // [] }, @packet_nodes );
706             my @all_peers = ( @{ $tick_peers // [] }, @packet_peers );
707              
708             # Merge packet-derived data with tick-derived data
709             my @all_data = grep {defined} ( $tick_data, @packet_data );
710             if ( $debug && ( @all_nodes || @all_peers || @all_data ) ) {
711             $self->_emit(
712             log => sprintf(
713             " [DEBUG] DHT tick+packets: nodes=%d, peers=%d, data=%d\n",
714             scalar(@all_nodes), scalar(@all_peers), scalar(@all_data)
715             ),
716             level => 'debug'
717             );
718             }
719              
720             # If we found new nodes, add them to the frontier of starving torrents
721             if (@all_nodes) {
722             for my $t ( values %torrents ) {
723             next if scalar( @{ $t->peer_objects // [] } ) >= 20;
724             $t->add_dht_nodes( \@all_nodes );
725             }
726             }
727              
728             # Dispatch peers to relevant torrents
729             # Net::BitTorrent::DHT::handle_incoming returns (nodes, peers, data)
730             # The 'data' (result) hash contains 'queried_target' which is the infohash
731             # we were looking for when these peers were returned.
732             for my $d (@all_data) {
733             my $ih = $d->{queried_target};
734             if ( $ih && ( my $t = $torrents{$ih} ) ) {
735             if ( $debug && @all_peers ) {
736             $self->_emit(
737             log => " [DEBUG] Dispatching " . scalar(@all_peers) . " peers to torrent " . unpack( "H*", $ih ) . "\n",
738             level => 'debug'
739             );
740             }
741             for my $peer (@all_peers) {
742             $t->add_peer($peer);
743             }
744             }
745             elsif ( $debug && $ih ) {
746             $self->_emit( log => " [DEBUG] DHT result for unknown infohash " . unpack( "H*", $ih ) . "\n", level => 'debug' );
747             }
748             }
749              
750             # Handle BEP 33, 44, 51 data
751             for my $d (@all_data) {
752             if ( exists $d->{samples} ) { # BEP 51
753             for my $ih ( @{ $d->{samples} } ) {
754             my $key = unpack( 'H*', $ih );
755             $dht_index{$key} = time();
756             if ( keys %dht_index > 1000 ) {
757              
758             # Remove oldest or just random?
759             # For simplicity, remove first key (random in Perl)
760             delete $dht_index{ ( keys %dht_index )[0] };
761             }
762             }
763             }
764             if ( exists $d->{v} ) { # BEP 44
765              
766             # Calculate target (immutable) or use key (mutable)
767             my $target = Digest::SHA::sha1( $d->{v} );
768             if ( my $q = delete $dht_queries{$target} ) {
769             $q->{cb}->( $d->{v}, $d ) if $q->{cb};
770             }
771             }
772             if ( exists $d->{sn} ) { # BEP 33
773              
774             # Scrape result - find matching torrent
775             if ( my $ih = $d->{queried_target} ) {
776             if ( my $t = $torrents{$ih} ) {
777             $t->handle_dht_scrape($d);
778             }
779             }
780             }
781             }
782             }
783              
784             # Update all torrents (evaluates choking, etc.)
785             for my $t ( values %torrents ) {
786             $t->tick($timeout);
787              
788             # BEP 14: Periodically announce on local network
789             # (Simplified: every ~60s if we tracked a timer, here we just do it occasionally)
790             if ( rand() < 0.01 ) { # Hack for now
791             if ($lpd) {
792             $lpd->announce( $t->infohash_v2, 6881 ) if $t->infohash_v2;
793             $lpd->announce( $t->infohash_v1, 6881 ) if $t->infohash_v1;
794             }
795             }
796             }
797             }
798              
799             method save_state ($path) {
800 19     19   17807 use JSON::PP qw[encode_json];
  19         377909  
  19         2336  
801 19     19   206 use Path::Tiny qw[path];
  19         43  
  19         6124  
802             my %data = ( node_id => $node_id, torrents => {}, );
803             my %seen;
804             for my $ih ( keys %torrents ) {
805             my $t = $torrents{$ih};
806             next if $seen{ builtin::refaddr($t) }++;
807             $data{torrents}{ unpack( 'H*', $ih ) } = $t->dump_state();
808             }
809             path($path)->spew_utf8( encode_json( \%data ) );
810             }
811              
812             method load_state ($path) {
813 19     19   148 use JSON::PP qw[decode_json];
  19         47  
  19         1261  
814 19     19   115 use Path::Tiny qw[path];
  19         42  
  19         17655  
815             return unless path($path)->exists;
816             my $data = decode_json( path($path)->slurp_utf8 );
817             $node_id = $data->{node_id} if $data->{node_id};
818             for my $ih_hex ( keys %{ $data->{torrents} // {} } ) {
819             my $ih = pack( 'H*', $ih_hex );
820             if ( my $t = $torrents{$ih} ) {
821             $t->load_state( $data->{torrents}{$ih_hex} );
822             }
823             }
824             }
825              
826             method finished () {
827             return [ grep { $_->is_finished } values %torrents ];
828             }
829              
830             method wait ( $condition = undef, $timeout = undef ) {
831             my $start = time();
832             $condition //= sub {
833 0     0   0 my @t = values %torrents;
834 0 0       0 return 1 if !@t;
835 0         0 return ( grep { $_->is_finished } @t ) == @t;
  0         0  
836             };
837             while ( !$condition->($self) ) {
838             $self->tick(0.1);
839             if ( defined $timeout && ( time() - $start ) > $timeout ) {
840             return 0;
841             }
842             select( undef, undef, undef, 0.05 );
843             }
844             return 1;
845             }
846             };
847             #
848             1;