File Coverage

lib/Net/BitTorrent/Peer.pm
Criterion Covered Total %
statement 17 17 100.0
branch n/a
condition n/a
subroutine 6 6 100.0
pod n/a
total 23 23 100.0


line stmt bran cond sub pod time code
1 19     19   246 use v5.40;
  19         174  
2 19     19   119 use feature 'class';
  19         32  
  19         2474  
3 19     19   189 no warnings 'experimental::class';
  19         38  
  19         1012  
4 19     19   102 use Net::BitTorrent::Emitter;
  19         33  
  19         1578  
5             class Net::BitTorrent::Peer v2.0.0 : isa(Net::BitTorrent::Emitter) {
6 19     19   107 use Net::BitTorrent::Types qw[:encryption :state];
  19         33  
  19         22462  
              # Per-peer state. Fields marked ":param" are supplied to the constructor;
              # ":reader" / ":writer" generate accessor methods of the same name / set_<name>.
7             field $protocol : param; # Wire-protocol handler; see the comment two rows below
8              
9             # $protocol is expected to be an instance of Net::BitTorrent::Protocol::BEP03 or a subclass
10             field $torrent : param : reader; # Parent Net::BitTorrent::Torrent object (weakened in ADJUST)
11             field $transport : param : reader; # Net::BitTorrent::Transport::*
12             field $ip : param : reader = undef; # Optional; used in log messages and holepunch relays
13             field $port : param : reader = undef; # Optional; used in log messages and holepunch relays
14             field $am_choking : reader = 1; # We start out choking the peer; changed by choke()/unchoke()
15             field $am_interested : reader = 0; # Set once we have sent INTERESTED
16             field $peer_choking : reader = 1; # Updated by handle_message on CHOKE / UNCHOKE
17             field $peer_interested : reader = 0; # Updated by handle_message on INTERESTED / NOT_INTERESTED
18             field $blocks_inflight : reader = 0; # Requests sent and not yet answered by PIECE or REJECT
19             field $bitfield_status : reader : writer = undef; # 'all', 'none', or raw data
20             field $offered_piece = undef; # Piece index advertised by _check_superseed; cleared when a HAVE for it arrives
21             field $bytes_down = 0; # Bytes received since the last tick()
22             field $bytes_up = 0; # Bytes served since the last tick()
23             field $rate_down : reader = 0; # Decaying average of $bytes_down, updated by tick()
24             field $rate_up : reader = 0; # Decaying average of $bytes_up, updated by tick()
25             field $reputation : reader = 100; # Start at 100; adjust_reputation() disconnects at 50 or below
26             field $debug : param : reader = 0; # When true, methods emit 'log' events
27             field $encryption : param : reader = ENCRYPTION_PREFERRED; # none, preferred, required
28             field $mse : param = undef; # MSE filter object; built in ADJUST when not supplied and encryption is enabled
29             field @allowed_fast_set; # Pieces we are allowed to request even if choked
30             field @suggested_pieces; # Piece indices received via SUGGEST_PIECE (message ID 13)
31             field $pwp_handshake_sent = 0; # Set to 1 right after $protocol->send_handshake() is called
# Returns the wire-protocol handler currently attached to this peer.
method protocol () {
    return $protocol;
}
# True when an MSE object exists and reports the 'PAYLOAD' state.
method is_encrypted () {
    return !!0 unless defined $mse;
    return $mse->state eq 'PAYLOAD';
}
# True when the recorded bitfield status is the literal string 'all'
# (set by handle_message on HAVE_ALL, or through the generated writer).
method is_seeder () {
    return !!0 unless defined $bitfield_status;
    return $bitfield_status eq 'all';
}
35              
# Packs two booleans into an integer bitmask:
#   0x01 - connection is encrypted (is_encrypted)
#   0x02 - peer is a seeder        (is_seeder)
method flags () {
    my $encrypted_bit = $self->is_encrypted ? 0x01 : 0;
    my $seeder_bit    = $self->is_seeder    ? 0x02 : 0;
    return $encrypted_bit | $seeder_bit;
}
# Construction hook. In order, it:
#   1. makes the torrent the parent emitter and then weakens the torrent reference,
#   2. hands this peer to the protocol object when it supports set_peer,
#   3. builds an MSE object (unless one was supplied or encryption is
#      ENCRYPTION_NONE) and installs it as the transport filter if supported,
#   4. registers handlers for the transport's 'data', 'disconnected',
#      'filter_failed' and 'connected' events, and for this peer's own
#      'handshake_complete' event.
# All callbacks go through a weakened copy of $self, so they do nothing once
# the peer object has been destroyed.
ADJUST {
    $self->set_parent_emitter($torrent);

    # $torrent is held weakly from here on, so it can become undef later.
    builtin::weaken($torrent) if defined $torrent;
    if ( $protocol->can('set_peer') ) {
        $protocol->set_peer($self);
    }
    if ( !$mse && $encryption != ENCRYPTION_NONE ) {

        # NOTE: `use` is resolved at compile time, so this module is loaded
        # whether or not this branch ever runs.
        use Net::BitTorrent::Protocol::MSE;
        $mse = Net::BitTorrent::Protocol::MSE->new(
            infohash        => $torrent ? ( $torrent->infohash_v1 // $torrent->infohash_v2 ) : undef,
            is_initiator    => 1,    # Outgoing
            allow_plaintext => ( $encryption == ENCRYPTION_PREFERRED ? 1 : 0 ),
        );
        if ( $mse->supported ) {
            $transport->set_filter($mse);
        }
        else {
            $mse = undef;
        }
    }
    my $weak_self = $self;
    builtin::weaken($weak_self);
    $transport->on(
        'data',
        sub ( $emitter, $data ) {
            $weak_self->receive_data($data) if $weak_self;
        }
    );
    $transport->on(
        'disconnected',
        sub ( $emitter, @args ) {
            $weak_self->disconnected() if $weak_self;
        }
    );
    $transport->on(
        'filter_failed',
        sub ( $emitter, $leftover ) {
            return unless $weak_self;

            # With encryption required there is no plaintext fallback.
            return if $weak_self->encryption == ENCRYPTION_REQUIRED;
            $weak_self->_emit( log => " [DEBUG] Falling back to plaintext handshake...\n", level => 'debug' ) if $weak_self->debug;

            # Fields are visible to closures created inside ADJUST, so the
            # MSE object can be dropped directly before the plaintext handshake.
            $mse = undef;
            $protocol->send_handshake();
            $pwp_handshake_sent = 1;
        }
    );
    $transport->on(
        'connected',
        sub ( $emitter, @args ) {
            return unless $weak_self;
            if ($mse) {
                $weak_self->_emit( log => " [DEBUG] Starting MSE handshake...\n", level => 'debug' ) if $weak_self->debug;

                # Handshake is driven by transport filter's write_buffer in tick()
            }
            else {
                $protocol->send_handshake();
                $pwp_handshake_sent = 1;
            }
        }
    );
    $self->on(
        'handshake_complete',
        sub ( $emitter, @args ) {
            return unless $weak_self;

            # Some peers need us to be unchoked/interested to talk to us
            # but we'll stay choked until we have metadata if we want to be safe.
            # However, we MUST send bitfield/have_none to be protocol compliant.
            # BEP 03: Send bitfield if we have one
            if ( $torrent && $torrent->bitfield ) {
                $protocol->send_bitfield( $torrent->bitfield->data );
            }

            # BEP 06: Send HAVE_NONE ONLY if remote supports Fast Extension
            elsif ( ord( substr( $protocol->reserved, 7, 1 ) ) & 0x04 ) {
                if ( $protocol->can('send_have_none') ) {
                    $protocol->send_have_none();
                }
            }

            # If in METADATA mode, we don't send unchoke/interested yet
            return if $torrent && $torrent->state == STATE_METADATA;
            $weak_self->unchoke();
            $weak_self->_check_interest();

            # BEP 06: Send Allowed Fast set immediately after handshake.
            # The torrent reference is weak and may be undef, so it is
            # checked here just like in the branches above.
            if ( $torrent && $protocol->isa('Net::BitTorrent::Protocol::BEP06') ) {
                my $set = $torrent->get_allowed_fast_set( $weak_self->ip );
                for my $idx (@$set) {
                    $protocol->send_allowed_fast($idx);
                }
            }
        }
    );
}
142              
# Forwards a SUGGEST for piece $index to the protocol object, but only when
# that object implements send_suggest.
method send_suggest ($index) {
    return $protocol->can('send_suggest') && $protocol->send_suggest($index);
}
146              
# Forwards an ALLOWED_FAST for piece $index to the protocol object, but only
# when that object implements send_allowed_fast.
method send_allowed_fast ($index) {
    return $protocol->can('send_allowed_fast') && $protocol->send_allowed_fast($index);
}
150              
# Alias for receive_data: passes $data straight through.
method on_data ($data) {
    return $self->receive_data($data);
}
154              
# Replaces the protocol handler with $p and returns the newly stored handler.
method set_protocol ($p) {
    return $protocol = $p;
}
158              
# Replaces the parent torrent with $t and returns the stored reference.
#
# The reference is weakened, exactly as ADJUST does for the constructor
# argument, so that a peer never keeps its torrent alive on its own.
# NOTE(review): ADJUST also calls set_parent_emitter($torrent); this setter
# does not re-parent the emitter — confirm whether callers expect that.
method set_torrent ($t) {
    $torrent = $t;
    builtin::weaken($torrent) if defined $torrent;
    return $torrent;
}
162              
# Entry point for bytes arriving from the transport.
#
#   $data - raw bytes read from the connection
#
# Reports the byte count to the torrent (when one is attached) and then
# passes the data to the protocol object for parsing.
method receive_data ($data) {
    $self->_emit( log => " [DEBUG] Peer received " . length($data) . " bytes of data\n", level => 'debug' ) if $debug;

    # The torrent reference is weak and may be undef; other methods
    # (write_buffer, disconnected) already check it before use.
    $torrent->can_read( length $data ) if $torrent;
    $protocol->receive_data($data);
}
168              
# Drains the protocol's outgoing buffer and hands it to the transport.
# Returns '' when there is nothing to send, otherwise whatever
# $transport->send_data returns.
method write_buffer () {
    my $pending = $protocol->write_buffer();
    return '' unless length $pending;

    # Rate limiting logic
    # Simplified: the torrent is told how much we are about to write, but the
    # allowance it returns is not enforced here.
    # TRULY correct rate limiting for loop-agnostic core requires
    # the loop itself to check can_read/can_write BEFORE calling these.
    $torrent->can_write( length $pending ) if $torrent;
    return $transport->send_data($pending);
}
186              
# Answers a hash request for the file identified by merkle root $root.
# Sends the hashes for ($base_layer, $index, $length) when the file and its
# merkle tree are known; otherwise sends a hash reject. Either reply is only
# sent when the protocol object implements the corresponding method.
method handle_hash_request ( $root, $proof_layer, $base_layer, $index, $length ) {
    my $file = $torrent->storage->get_file_by_root($root);
    if ( $file && $file->merkle ) {
        my $hashes = $file->merkle->get_hashes( $base_layer, $index, $length );

        # Simplified: no proof nodes added yet
        return $protocol->can('send_hashes') && $protocol->send_hashes( $root, $proof_layer, $base_layer, $index, $length, $hashes );
    }

    # Unknown root, or no merkle tree for that file
    $protocol->send_hash_reject( $root, $proof_layer, $base_layer, $index, $length ) if $protocol->can('send_hash_reject');
    return;
}
198              
# Stores hashes received from the peer into the merkle tree of the file
# identified by $root.
#
#   $root       - merkle root identifying the file
#   $base_layer - layer the received nodes belong to
#   $index      - index of the first node in that layer
#   $hashes     - concatenated 32-byte node hashes
#
# Does nothing when the file or its merkle tree is unknown. Only whole
# 32-byte nodes are stored; trailing bytes that do not form a complete node
# are ignored.
method handle_hashes ( $root, $proof_layer, $base_layer, $index, $length, $hashes ) {
    my $file = $torrent->storage->get_file_by_root($root);
    return unless $file && $file->merkle;
    my $node_size = 32;

    # Integer division: a payload that is not a multiple of the node size
    # must not yield a truncated final node or a fractional count.
    my $num_hashes = int( length( $hashes // '' ) / $node_size );

    # BEP 52: index and length refer to the range of nodes at base_layer.
    # The hashes string contains these nodes concatenated.
    # NOTE(review): every received hash is stored as a base-layer node and
    # $length is not used to bound the loop — confirm against BEP 52.
    for my $offset ( 0 .. $num_hashes - 1 ) {
        my $hash = substr( $hashes, $offset * $node_size, $node_size );
        $file->merkle->set_node( $base_layer, $index + $offset, $hash );
    }
    $self->_emit(
        log   => " [DEBUG] Received and stored $num_hashes hashes for root " . unpack( 'H*', $root ) . " at layer $base_layer\n",
        level => 'debug'
    ) if $debug;
}
216              
# Called when the peer rejects a hash request. Only logs (in debug mode);
# no state is changed.
method handle_hash_reject ( $root, $proof_layer, $base_layer, $index, $length ) {
    if ($debug) {
        my $root_hex = unpack( 'H*', $root );
        $self->_emit( log => " [DEBUG] Peer rejected hash request for root $root_hex\n", level => 'debug' );
    }
}
220              
# Delegates a metadata piece request from this peer to the torrent.
method handle_metadata_request ($piece) {
    return $torrent->handle_metadata_request( $self, $piece );
}
224              
# Delegates a received metadata piece (with its declared total size) to the torrent.
method handle_metadata_data ( $piece, $total_size, $data ) {
    return $torrent->handle_metadata_data( $self, $piece, $total_size, $data );
}
228              
# Delegates a metadata reject for $piece to the torrent.
method handle_metadata_reject ($piece) {
    return $torrent->handle_metadata_reject( $self, $piece );
}
232              
# Handles a peer-exchange message: every entry of the two "added" lists is
# handed to the torrent. The "dropped" lists are accepted but not used.
method handle_pex ( $added, $dropped, $added6, $dropped6 ) {
    for my $list ( $added, $added6 ) {
        for my $candidate (@$list) {
            $torrent->add_peer($candidate);
        }
    }
}
238              
# Holepunch rendezvous: the remote peer asks us to introduce it to the swarm
# member whose peer ID is $id. If such a peer is connected and advertises the
# ut_holepunch extension, it is told to connect to the requester's ip/port;
# otherwise an error (0x01) is sent back when the protocol supports it.
method handle_hp_rendezvous ($id) {

    # Look for the requested node among the torrent's peers.
    my $target;
PEER:
    for my $candidate ( values $torrent->peer_objects_hash->%* ) {
        next PEER unless $candidate->protocol->can('peer_id');
        next PEER unless $candidate->protocol->peer_id eq $id;
        $target = $candidate;
        last PEER;
    }
    my $can_relay = $target && exists $target->protocol->remote_extensions->{ut_holepunch};
    if ($can_relay) {

        # Relay connect instruction to target
        return $target->protocol->send_hp_connect( $self->ip, $self->port );
    }

    # 0x01 = peer not found
    return $protocol->can('send_hp_error') && $protocol->send_hp_error(0x01);
}
262              
# Holepunch connect: a relay instructs us to connect to $ip:$port. The
# connection attempt is made through the torrent's client, using the v2
# infohash when it is true and the v1 infohash otherwise.
method handle_hp_connect ( $ip, $port ) {
    if ($debug) {
        $self->_emit( log => " [BEP 55] Instructed to connect to $ip:$port\n", level => 'info' );
    }

    # Trigger uTP connection
    return $torrent->client->connect_to_peer( $ip, $port, $torrent->infohash_v2 || $torrent->infohash_v1 );
}
269              
# Holepunch error from the relay. Only logged (in debug mode).
method handle_hp_error ($err) {
    if ($debug) {
        $self->_emit( log => " [BEP 55] Received holepunch error: $err\n", level => 'error' );
    }
}
273              
# Dispatches one decoded peer-wire message.
#
#   $id      - numeric message ID (0-7 base protocol, 13-17 Fast Extension)
#   $payload - message body following the ID byte
#
# Messages that carry fixed-size integer fields are dropped (with a debug
# log) when the payload is too short to decode, so undefined indices never
# reach the torrent. IDs not listed below are ignored.
method handle_message ( $id, $payload ) {
    $payload //= '';

    # Minimum body length in bytes for messages with fixed-size fields:
    # one 32-bit integer = 4, index/begin = 8, index/begin/length = 12.
    my %min_payload = (
        4  => 4,     # HAVE
        6  => 12,    # REQUEST
        7  => 8,     # PIECE
        13 => 4,     # SUGGEST_PIECE
        16 => 12,    # REJECT
        17 => 4,     # ALLOWED_FAST
    );
    if ( exists $min_payload{$id} && length($payload) < $min_payload{$id} ) {
        $self->_emit(
            log   => " [DEBUG] Dropping truncated message ID $id (" . length($payload) . " bytes)\n",
            level => 'debug'
        ) if $debug;
        return;
    }
    if ( $id == 0 ) {    # CHOKE
        $peer_choking = 1;
        $self->_emit('choked');
    }
    elsif ( $id == 1 ) {    # UNCHOKE
        $peer_choking = 0;
        $self->_emit('unchoked');
        $self->_request_next_block();
    }
    elsif ( $id == 2 ) {    # INTERESTED
        $peer_interested = 1;
        $self->_emit('interested');
    }
    elsif ( $id == 3 ) {    # NOT_INTERESTED
        $peer_interested = 0;
        $self->_emit('not_interested');
    }
    elsif ( $id == 4 ) {    # HAVE
        my $index = unpack( 'N', $payload );
        $torrent->update_peer_have( $self, $index );

        # BEP 16: If we see this peer (or others) have our offered piece,
        # we can offer a new one. (Simplified global check)
        if ( defined $offered_piece && $index == $offered_piece ) {
            $offered_piece = undef;
        }
        $self->_check_interest();
    }
    elsif ( $id == 5 ) {    # BITFIELD
        $bitfield_status = $payload;
        $torrent->set_peer_bitfield( $self, $payload );
        $self->_emit( bitfield => $torrent->peer_bitfields->{$self} );

        # BEP 16: If superseeding, we don't send our real bitfield.
        # Instead, we wait for interest and then offer pieces.
        $self->_check_interest();
    }
    elsif ( $id == 6 ) {    # REQUEST
        my ( $index, $begin, $len ) = unpack( 'N N N', $payload );
        $self->_handle_request( $index, $begin, $len );
    }
    elsif ( $id == 7 ) {    # PIECE
                            # 4-arg substr strips the 8-byte header; the rest is block data.
        my ( $index, $begin ) = unpack( 'N N', substr( $payload, 0, 8, '' ) );
        $self->_handle_piece_data( $index, $begin, $payload );
    }
    elsif ( $id == 13 ) {    # SUGGEST_PIECE
        my $index = unpack( 'N', $payload );
        push @suggested_pieces, $index;
        $self->_check_interest();
    }
    elsif ( $id == 14 ) {    # HAVE_ALL
        $bitfield_status = 'all';
        $torrent->set_peer_have_all($self);
        $self->_emit('have_all');
        $self->_check_interest();
    }
    elsif ( $id == 15 ) {    # HAVE_NONE
        $bitfield_status = 'none';
        $torrent->set_peer_have_none($self);
        $self->_emit('have_none');
    }
    elsif ( $id == 16 ) {    # REJECT
        my ( $index, $begin, $len ) = unpack( 'N N N', $payload );
        $self->_handle_reject( $index, $begin, $len );
    }
    elsif ( $id == 17 ) {    # ALLOWED_FAST
        my $index = unpack( 'N', $payload );
        push @allowed_fast_set, $index;
        $self->_check_interest();
    }
}
348              
# Handles a REJECT for a block we requested: frees one pipeline slot and
# tries to request another block.
#
# The counter is never allowed to drop below zero. A peer sending REJECTs
# we did not ask for would otherwise push it negative and let
# _request_next_block exceed its in-flight limit.
method _handle_reject ( $index, $begin, $len ) {
    $blocks_inflight-- if $blocks_inflight > 0;

    # Ideally tell torrent to un-pending this block
    # For now, we just proceed to request next.
    $self->_request_next_block();
}
356              
# Re-evaluates our interest in this peer. In superseed mode a piece may be
# offered first; then INTERESTED (message 2) is sent once, the first time
# this is called while we are not yet interested.
method _check_interest () {

    # The torrent reference is weak and may be undef (the handshake_complete
    # handler calls this after only conditional torrent checks).
    if ( $torrent && $torrent->is_superseed ) {
        $self->_check_superseed();
    }
    if ( !$am_interested ) {

        # In a real client, we check if the peer has any piece we lack
        $am_interested = 1;
        $protocol->send_message(2);    # INTERESTED
    }
}
368              
# Superseeding: when no piece is currently on offer to this peer, advertise
# (via HAVE, message 4) the lowest-indexed piece that we have and the peer's
# recorded bitfield lacks, and remember it as the offered piece.
method _check_superseed () {
    return if defined $offered_piece;

    # Pick a piece to offer
    my $ours   = $torrent->bitfield;
    my $theirs = $torrent->peer_bitfields->{$self};
    return unless $theirs;
    for my $piece ( 0 .. $ours->size - 1 ) {
        next unless $ours->get($piece);
        next if $theirs->get($piece);
        $offered_piece = $piece;
        $protocol->send_message( 4, pack( 'N', $piece ) );    # HAVE
        last;
    }
}
385              
# Keeps the request pipeline full: asks the torrent for blocks and sends
# REQUEST (message 6) until five are in flight, the torrent has nothing more
# to hand out, or we are choked and the picked piece is not fast-allowed.
method _request_next_block () {
    while ( $blocks_inflight < 5 ) {
        my $next = $torrent->get_next_request($self) or last;
        my ( $piece, $offset, $size ) = $next->@{qw[index begin length]};

        # BEP 06: Can request if not choked OR if piece is in allowed_fast_set
        my $may_request = !$peer_choking || $self->is_allowed_fast($piece);
        if ( !$may_request ) {

            # We picked a piece but we are choked and it's not fast-allowed.
            # We must un-pending it so others can pick it.
            delete $torrent->blocks_pending->{$piece}{$offset};
            last;
        }
        $protocol->send_message( 6, pack( 'N N N', $piece, $offset, $size ) );
        $blocks_inflight++;
    }
}
408              
# Returns 1 when piece $index is in the allowed-fast set announced by the
# peer, 0 otherwise. The result is a plain boolean in every calling context
# and the scan stops at the first match.
method is_allowed_fast ($index) {
    for my $allowed (@allowed_fast_set) {
        return 1 if $allowed == $index;
    }
    return 0;
}
412              
# Serves a block REQUEST from the peer.
#
#   $index - piece index
#   $begin - byte offset inside the piece
#   $len   - number of bytes requested
#
# Ignored while we are choking the peer. Requests for a piece we do not
# have, or that the peer's recorded bitfield already contains, cost the peer
# 5 reputation points and are dropped. Otherwise the data is read from
# storage and sent as a PIECE (message 7).
method _handle_request ( $index, $begin, $len ) {
    return if $am_choking;

    # Reputation checks
    # Do we even have this piece?
    if ( !$torrent->bitfield->get($index) ) {
        $self->adjust_reputation(-5);
        return;
    }

    # Does the peer already have this piece?
    my $p_bf = $torrent->peer_bitfields->{$self};
    if ( $p_bf && $p_bf->get($index) ) {
        $self->adjust_reputation(-5);
        return;
    }
    my $piece_len  = $torrent->metadata->{info}{'piece length'} // 16384;
    my $abs_offset = ( $index * $piece_len ) + $begin;
    my $data       = $torrent->storage->read_global( $abs_offset, $len );

    # Test for content by length, not truthiness: a block consisting of the
    # single character "0" is valid data but false as a Perl string.
    if ( defined $data && length $data ) {
        $bytes_up += length($data);
        $protocol->send_message( 7, pack( 'N N', $index, $begin ) . $data );
    }
}
437              
# Handles the data of a PIECE message: counts the bytes, frees one pipeline
# slot, passes the block to the torrent and then requests more.
#
#   $index - piece index
#   $begin - byte offset inside the piece
#   $data  - block contents
method _handle_piece_data ( $index, $begin, $data ) {
    $self->_emit( log => " [DEBUG] Received " . length($data) . " bytes for piece $index at $begin\n", level => 'debug' ) if $debug;
    $bytes_down += length($data);

    # Never go below zero: an unsolicited PIECE must not free a slot that was
    # never taken, or the in-flight limit in _request_next_block is bypassed.
    $blocks_inflight-- if $blocks_inflight > 0;

    # The torrent's result (the original code distinguished 1 = verified and
    # saved, -1 = failed verification) is not acted upon here yet.
    $torrent->receive_block( $self, $index, $begin, $data );
    $self->_request_next_block();
}
453              
# Tears the connection down: notifies the torrent (if any), closes the
# transport (if any) and emits 'disconnected'.
method disconnected () {
    if ($torrent) {
        $torrent->peer_disconnected($self);
    }
    if ($transport) {
        $transport->close();
    }
    return $self->_emit('disconnected');
}
459              
# Stops choking the peer and tells it so (message 1, UNCHOKE).
method unchoke () {
    $am_choking = 0;
    return $protocol->send_message(1);
}
464              
# Starts choking the peer and tells it so (message 0, CHOKE).
method choke () {
    $am_choking = 1;
    return $protocol->send_message(0);
}
469              
# Marks us as interested in the peer and tells it so (message 2, INTERESTED).
method interested () {
    $am_interested = 1;
    return $protocol->send_message(2);
}
474              
# Marks us as no longer interested and tells the peer (message 3, NOT_INTERESTED).
method not_interested () {
    $am_interested = 0;
    return $protocol->send_message(3);
}
479              
# Sends a REQUEST (message 6) for $len bytes at offset $begin of piece
# $index. The in-flight counter is incremented before the message is sent.
method request ( $index, $begin, $len ) {
    $blocks_inflight++;
    my $body = pack( 'N N N', $index, $begin, $len );
    return $protocol->send_message( 6, $body );
}
484              
# Periodic housekeeping, meant to be called once per loop iteration:
#   - folds the byte counters into the decaying transfer rates and resets them,
#   - ticks the transport when it supports that,
#   - sends the protocol handshake once the MSE handshake reaches 'PAYLOAD',
#   - disconnects when the protocol object reports the 'CLOSED' state,
#   - otherwise flushes pending output through write_buffer.
method tick () {

    # Simple moving average / decay
    $rate_down  = ( $rate_down * 0.8 ) + ( $bytes_down * 0.2 );
    $rate_up    = ( $rate_up * 0.8 ) + ( $bytes_up * 0.2 );
    $bytes_down = 0;
    $bytes_up   = 0;
    $transport->tick() if $transport->can('tick');

    # MSE Transition Check
    if ( $mse && $mse->state eq 'PAYLOAD' && !$pwp_handshake_sent ) {
        $self->_emit( log => " [DEBUG] MSE handshake complete, sending protocol handshake...\n", level => 'debug' ) if $debug;
        $protocol->send_handshake();
        $pwp_handshake_sent = 1;
    }

    # Fatal Protocol Error Check
    if ( $protocol->state eq 'CLOSED' ) {
        if ($debug) {

            # $ip and $port default to undef; substitute a placeholder so
            # the log line does not raise "uninitialized" warnings.
            my $who = ( $ip // '?' ) . ':' . ( $port // '?' );
            $self->_emit( log => " [PEER] Fatal protocol error from $who. Disconnecting.\n", level => 'error' );
        }
        $self->disconnected();
        return;
    }
    $self->write_buffer();
}
509              
# Adds $delta (which may be negative) to the peer's reputation. When the
# result is 50 or lower the peer is disconnected.
method adjust_reputation ($delta) {
    $reputation += $delta;
    if ( $reputation <= 50 ) {
        if ($debug) {

            # $ip and $port default to undef; substitute a placeholder so
            # the log line does not raise "uninitialized" warnings.
            my $who = ( $ip // '?' ) . ':' . ( $port // '?' );
            $self->_emit( log => " [PEER] Blacklisting peer $who due to low reputation ($reputation)\n", level => 'error' );
        }
        $self->disconnected();
    }
}
517             } 1;