File Coverage

blib/lib/AnyEvent/BitTorrent.pm
Criterion Covered Total %
statement 193 621 31.0
branch 28 248 11.2
condition 13 108 12.0
subroutine 45 99 45.4
pod 10 13 76.9
total 289 1089 26.5


line stmt bran cond sub pod time code
1             package AnyEvent::BitTorrent;
2 4     4   19857 use AnyEvent;
  4         3666  
  4         86  
3 4     4   2383 use AnyEvent::Handle;
  4         53717  
  4         107  
4 4     4   1901 use AnyEvent::Socket;
  4         38936  
  4         330  
5 4     4   2168 use AnyEvent::HTTP;
  4         24011  
  4         228  
6 4     4   1665 use Moo;
  4         35028  
  4         17  
7 4     4   5948 use Type::Tiny;
  4         51310  
  4         112  
8 4     4   1794 use Types::Standard qw[ArrayRef CodeRef Enum HashRef Int Ref Str];
  4         130877  
  4         33  
9 4     4   3451 use Fcntl qw[/SEEK_/ /O_/ :flock];
  4         4  
  4         1481  
10 4     4   1632 use Digest::SHA qw[sha1];
  4         9274  
  4         229  
11 4     4   43 use File::Spec;
  4         4  
  4         65  
12 4     4   10 use File::Path;
  4         5  
  4         174  
13 4     4   1578 use Net::BitTorrent::Protocol qw[:all];
  4         44687  
  4         1496  
14 4     4   21 use Scalar::Util qw[/weak/];
  4         5  
  4         27915  
15             #
16             our $VERSION = "1.0.2";
17             #
18             # XXX - These should be ro attributes w/o init args:
19             my $block_size = 2**14;
20              
21             # Custom types
22             my $FILE = Type::Tiny->new(name => 'File',
23             parent => Str,
24             constraint => sub { -f $_ },
25             message => sub {"$_ isn't an existing file"},
26             );
27             my $RESERVED = Type::Tiny->new(name => 'Reserved',
28             parent => Str,
29             constraint => sub { length $_ == 8 },
30             message => sub {'reserved data is malformed'}
31             );
32             my $PEERID = Type::Tiny->new(
33             name => 'PeerID',
34             parent => Str,
35             constraint => sub { length $_ == 20 },
36             message => sub {
37             'Peer ID must be 20 chars in length';
38             }
39             );
40             my $INFOHASH = Type::Tiny->new(
41             name => 'Infohash',
42             parent => Str,
43             constraint => sub { length $_ == 20 },
44             message => sub {
45             'Infohashes are 20 bytes in length';
46             }
47             );
48             #
49             has port => (is => 'ro',
50             isa => Int,
51             default => sub {0},
52             writer => '_set_port'
53             );
54             has socket => (is => 'ro',
55             isa => Ref,
56             init_arg => undef,
57             predicate => '_has_socket',
58             builder => '_build_socket'
59             );
60              
61             sub _build_socket {
62 4     4   110 my $s = shift;
63             tcp_server undef, $s->port, sub {
64 0     0   0 my ($fh, $host, $port) = @_;
65 0         0 AE::log info => 'Accepted connection from %s:%d', $host, $port;
66 0 0       0 return $fh->destroy if $s->state eq 'stopped';
67             my $handle = AnyEvent::Handle->new(
68             fh => $fh,
69             on_error => sub {
70 0         0 my ($hdl, $fatal, $msg) = @_;
71              
72             # XXX - callback
73 0         0 AE::log error => 'Socket error: ' . $msg;
74 0         0 $s->_del_peer($hdl);
75             },
76             on_eof => sub {
77 0         0 my $h = shift;
78 0         0 AE::log info => 'Socket EOF';
79 0         0 $s->_del_peer($h);
80             },
81             on_read => sub {
82 0         0 AE::log debug => 'Read Socket';
83 0         0 $s->_on_read_incoming(@_);
84             }
85 0         0 );
86 0         0 $s->_add_peer($handle);
87             }, sub {
88 4     4   544 my ($fh, $thishost, $thisport) = @_;
89 4         30 $s->_set_port($thisport);
90 4         1410 AE::log info => "bound to $thishost, port $thisport";
91 4         55 };
92             }
93             has path => (is => 'ro',
94             isa => $FILE,
95             required => 1
96             );
97             has reserved => (is => 'ro',
98             builder => '_build_reserved',
99             isa => $RESERVED
100             );
101              
102             sub _build_reserved {
103 4     4   62 my $reserved = "\0" x 8;
104              
105             #vec($reserved, 5, 8) = 0x10; # Ext Protocol
106 4         10 vec($reserved, 7, 8) = 0x04; # Fast Ext
107 4         16 AE::log debug => '_build_reserved() => ' . $reserved;
108 4         25800 $reserved;
109             }
110             has peerid => (is => 'ro',
111             isa => $PEERID,
112             init_arg => undef,
113             required => 1,
114             builder => '_build_peerid'
115             );
116              
117             sub _build_peerid {
118             return pack(
119             'a20',
120             (sprintf(
121             '-AB%01d%01d%01d%1s-%7s%-5s',
122             ($VERSION =~ m[^v?(\d+)\.(\d+)\.(\d+)]),
123             ($VERSION =~ m[[^\d\.^v]] ? 'U' : 'S'),
124             (join '',
125             map {
126 4 50   4   132 ['A' .. 'Z', 'a' .. 'z', 0 .. 9, qw[- . _ ~]]->[rand(66)]
  28         321  
127             } 1 .. 7
128             ),
129             [qw[KaiLi April Aaron Sanko]]->[rand 4]
130             )
131             )
132             );
133             }
134             has bitfield => (is => 'ro',
135             lazy => 1,
136             builder => '_build_bitfield',
137             isa => Str,
138             init_arg => undef,
139             );
140 4     4   1362 sub _build_bitfield { pack 'b*', "\0" x shift->piece_count }
141              
142             sub wanted {
143 4     4 1 30 my $s = shift;
144 4         5 my $wanted = '';
145 4         5 for my $findex (0 .. $#{$s->files}) {
  4         43  
146 4         127 my $prio = !!$s->files->[$findex]{priority};
147 4         32 for my $index ($s->_file_to_range($findex)) {
148 8416   66     137883 vec($wanted, $index, 1) = $prio && !vec($s->bitfield, $index, 1);
149             }
150             }
151 4         211 AE::log debug => '->wanted() => ' . unpack 'b*', $wanted;
152 4         210 $wanted;
153             }
154              
155             sub complete {
156 0     0 1 0 my $s = shift;
157 0         0 -1 == index substr(unpack('b*', $s->wanted), 0, $s->piece_count + 1), 1;
158             }
159              
160             sub seed {
161 0     0 1 0 my $s = shift;
162 0         0 -1 == index substr(unpack('b*', $s->bitfield), 0, $s->piece_count + 1), 0;
163             }
164              
165             sub _left {
166 4     4   6 my $s = shift;
167 4         9 $s->piece_length * scalar grep {$_} split '',
  8416         5682  
168             substr unpack('b*', $s->wanted), 0, $s->piece_count + 1;
169             }
170             has $_ => (is => 'ro',
171             isa => Int,
172             default => sub {0},
173             writer => '_set_' . $_
174             ) for qw[uploaded downloaded];
175             has infohash => (is => 'ro',
176             lazy => 1,
177             builder => '_build_infohash',
178             isa => $INFOHASH,
179             init_arg => undef
180             );
181 3     3   1359 sub _build_infohash { sha1(bencode(shift->metadata->{info})) }
182             has metadata => (is => 'ro',
183             lazy_build => 1,
184             builder => '_build_metadata',
185             lazy => 1,
186             isa => HashRef,
187             init_arg => undef
188             );
189              
190             sub _build_metadata {
191 4     4   1318 my $s = shift;
192              
193             #return if ref $s ne __PACKAGE__; # Applying roles makes deep rec
194 4         148 open my $fh, '<', $s->path;
195 4         144 sysread $fh, my $raw, -s $fh;
196 4         20 my $metadata = bdecode $raw;
197             AE::log debug => sub {
198 0     0   0 require Data::Dump;
199 0         0 '_build_metadata() => ' . Data::Dump::dump($metadata);
200 4         2436 };
201 4         287 $metadata;
202             }
203 5     5 1 2980 sub name { shift->metadata->{info}{name} }
204 6312     6312 1 86724 sub pieces { shift->metadata->{info}{pieces} }
205 25271     25271 1 344731 sub piece_length { shift->metadata->{info}{'piece length'} }
206              
207             sub piece_count {
208 12635     12635 0 9430 my $s = shift;
209 12635         155043 my $count = $s->size / $s->piece_length;
210 12635 50       89893 int($count) + (($count == int $count) ? 1 : 0);
211             }
212             has basedir => (
213             is => 'ro',
214             lazy => 1,
215             isa => Str,
216             required => 1,
217             default => sub { File::Spec->rel2abs(File::Spec->curdir) },
218             trigger => sub {
219             my ($s, $n, $o) = @_;
220             $o // return;
221             $s->_clear_files; # So they can be rebuilt with the new basedir
222             }
223             );
224             has files => (is => 'ro',
225             lazy => 1,
226             builder => '_build_files',
227             isa => ArrayRef [HashRef],
228             init_arg => undef,
229             clearer => '_clear_files'
230             );
231              
232             sub _build_files {
233 4     4   1284 my $s = shift;
234             defined $s->metadata->{info}{files} ?
235             [
236             map {
237             {priority => 1,
238             fh => undef,
239             mode => 'c',
240             length => $_->{length},
241             timeout => undef,
242             path =>
243             File::Spec->rel2abs(
244 0         0 File::Spec->catfile($s->basedir, $s->name, @{$_->{path}})
  0         0  
245             )
246             }
247 0         0 } @{$s->metadata->{info}{files}}
248             ]
249             : [
250             {priority => 1,
251             fh => undef,
252             mode => 'c',
253             length => $s->metadata->{info}{length},
254 4 50       32 timeout => undef,
255             path =>
256             File::Spec->rel2abs(File::Spec->catfile($s->basedir, $s->name))
257             }
258             ];
259             }
260             has size => (is => 'ro',
261             lazy => 1,
262             builder => '_build_size',
263             isa => Int,
264             init_arg => undef
265             );
266              
267             sub _build_size {
268 4     4   1430 my $s = shift;
269 4         6 my $ret = 0;
270 4         4 $ret += $_->{length} for @{$s->files};
  4         39  
271 4         207 AE::log debug => '_build_size() => ' . $ret;
272 4         167 $ret;
273             }
274              
275             sub _open {
276 3     3   75 my ($s, $i, $m) = @_;
277             AE::log
278             trace => 'Opening file #%d (%s) for %s',
279 3         48 $i, $s->files->[$i]->{path}, $m;
280 3 50       130 return 1 if $s->files->[$i]->{mode} eq $m;
281 0 0       0 if (defined $s->files->[$i]->{fh}) {
282 0         0 AE::log trace => 'Closing %s', $s->files->[$i]->{fh};
283 0         0 flock $s->files->[$i]->{fh}, LOCK_UN;
284 0         0 close $s->files->[$i]->{fh};
285 0         0 $s->files->[$i]->{fh} = ();
286             }
287 0 0       0 if ($m eq 'r') {
    0          
    0          
288 0         0 AE::log trace => 'Opening %s to read', $s->files->[$i]->{path};
289 0 0       0 sysopen($s->files->[$i]->{fh}, $s->files->[$i]->{path}, O_RDONLY)
290             || return;
291 0 0       0 flock($s->files->[$i]->{fh}, LOCK_SH) || return;
292 0 0       0 weaken $s unless isweak $s;
293 0         0 my $x = $i;
294             $s->files->[$x]->{timeout}
295 0   0 0   0 = AE::timer(500, 0, sub { $s // return; $s->_open($x, 'c') });
  0         0  
  0         0  
296             }
297             elsif ($m eq 'w') {
298 0         0 AE::log trace => 'Opening %s to write', $s->files->[$i]->{path};
299 0         0 my @split = File::Spec->splitdir($s->files->[$i]->{path});
300 0         0 pop @split; # File name itself
301 0         0 my $dir = File::Spec->catdir(@split);
302 0 0       0 File::Path::mkpath($dir) if !-d $dir;
303             sysopen($s->files->[$i]->{fh},
304             $s->files->[$i]->{path},
305 0 0       0 O_WRONLY | O_CREAT)
306             || return;
307 0         0 flock $s->files->[$i]->{fh}, LOCK_EX;
308             truncate $s->files->[$i]->{fh}, $s->files->[$i]->{length}
309             if -s $s->files->[$i]->{fh}
310 0 0       0 != $s->files->[$i]->{length}; # XXX - pre-allocate files
311 0 0       0 weaken $s unless isweak $s;
312 0         0 my $x = $i;
313             $s->files->[$x]->{timeout}
314 0   0 0   0 = AE::timer(60, 0, sub { $s // return; $s->_open($x, 'c') });
  0         0  
  0         0  
315             }
316 0         0 elsif ($m eq 'c') { $s->files->[$i]->{timeout} = () }
317 0         0 else {return}
318 0         0 return $s->files->[$i]->{mode} = $m;
319             }
320             has piece_cache => (is => 'ro', isa => HashRef, default => sub { {} });
321              
322             sub _cache_path {
323 0     0   0 my $s = shift;
324             File::Spec->catfile($s->basedir,
325 0 0       0 (scalar @{$s->files} == 1 ? () : $s->name),
  0         0  
326             '~ABPartFile_-'
327             . uc(substr(unpack('H*', $s->infohash), 0, 10))
328             . '.dat'
329             );
330             }
331              
332             sub _write_cache {
333 0     0   0 my ($s, $f, $o, $d) = @_;
334 0         0 my $path = $s->_cache_path;
335 0         0 AE::log
336             debug =>
337             'Attempting to store %d bytes to cache file (%s) [$f=%s, $o=%s]',
338             length($d), $path, $f, $o;
339 0         0 my @split = File::Spec->splitdir($path);
340 0         0 pop @split; # File name itself
341 0         0 my $dir = File::Spec->catdir(@split);
342 0 0       0 File::Path::mkpath($dir) if !-d $dir;
343 0 0       0 sysopen(my ($fh), $path, O_WRONLY | O_CREAT)
344             || return;
345 0         0 flock $fh, LOCK_EX;
346 0         0 my $pos = sysseek $fh, 0, SEEK_CUR;
347 0         0 my $w = syswrite $fh, $d;
348 0         0 flock $fh, LOCK_UN;
349 0         0 close $fh;
350 0         0 $s->piece_cache->{$f}{$o} = $pos;
351 0         0 AE::log debug => 'Wrote %d bytes to cache file', $w;
352 0         0 return $w;
353             }
354              
355             sub _read_cache {
356 6312     6312   6455 my ($s, $f, $o, $l) = @_;
357 6312   50     579174 $s->piece_cache->{$f} // return;
358 0   0     0 $s->piece_cache->{$f}{$o} // return;
359 0         0 my $path = $s->_cache_path;
360 0         0 AE::log
361             debug =>
362             'Attempting to read %d bytes from cache file (%s) [$f=%s, $o=%s]',
363             $l, $path, $f, $o;
364 0 0       0 sysopen(my ($fh), $path, O_RDONLY)
365             || return;
366 0         0 flock $fh, LOCK_SH;
367 0         0 sysseek $fh, $s->piece_cache->{$f}{$o}, SEEK_SET;
368 0         0 my $w = sysread $fh, my ($d), $l;
369 0         0 flock $fh, LOCK_UN;
370 0         0 close $fh;
371 0         0 return $d;
372             }
373              
374             sub _read {
375 6312     6312   27195 my ($s, $index, $offset, $length) = @_;
376 6312         9685 AE::log
377             debug =>
378             'Attempting to read %d bytes from piece %d starting at %d bytes',
379             $length, $index, $offset;
380 6312         149001 my $data = '';
381 6312         4599 my $file_index = 0;
382 6312         7110 my $total_offset = ($index * $s->piece_length) + $offset;
383             SEARCH:
384 6312         100839 while ($total_offset > $s->files->[$file_index]->{length}) {
385 0         0 $total_offset -= $s->files->[$file_index]->{length};
386 0         0 $file_index++;
387 0         0 AE::log
388             trace =>
389             'Searching for location... $total_offset = %d, $file_index = %d',
390             $total_offset, $file_index;
391             last SEARCH # XXX - return?
392 0 0       0 if not defined $s->files->[$file_index]->{length};
393             }
394 6312   33     45405 READ: while ((defined $length) && ($length > 0)) {
395             my $this_read
396             = (
397             ($total_offset + $length) >= $s->files->[$file_index]->{length})
398             ?
399 6312 100       77732 ($s->files->[$file_index]->{length} - $total_offset)
400             : $length;
401             AE::log
402             trace =>
403             'Attempting to read %d bytes from file #%d (%s), starting at %d',
404             $this_read,
405 6312         94733 $file_index, $s->files->[$file_index]->{path}, $total_offset;
406 6312 50 33     240851 if ( (!-f $s->files->[$file_index]->{path})
407             || (!$s->_open($file_index, 'r')))
408 6312   33     91329 { $data .= $s->_read_cache($file_index, $total_offset, $this_read)
409             // ("\0" x $this_read);
410 6312         15162 AE::log note => 'Failed to open file. Using null chars instead.';
411             }
412             else {
413 0         0 sysseek $s->files->[$file_index]->{fh}, $total_offset, SEEK_SET;
414 0         0 sysread $s->files->[$file_index]->{fh}, my ($_data), $this_read;
415 0 0       0 $data .= $_data if $_data;
416 0         0 AE::log
417             trace =>
418             'Read %d bytes of data from file (%d bytes collected so far)',
419             length $_data, length $data;
420 0 0       0 weaken $s unless isweak $s;
421 0         0 my $x = $file_index;
422             $s->files->[$x]->{timeout}
423 0   0 0   0 = AE::timer(500, 0, sub { $s // return; $s->_open($x, 'c') });
  0         0  
  0         0  
424             }
425 6312         172886 $file_index++;
426 6312         5608 $length -= $this_read;
427 6312         9705 AE::log
428             trace => 'Still need to read %d bytes',
429             $length;
430 6312 50       237671 last READ if not defined $s->files->[$file_index];
431 0         0 $total_offset = 0;
432             }
433 6312         37483 AE::log trace => 'Returning %d bytes of data', length $data;
434 6312         145045 return $data;
435             }
436              
437             sub _write {
438 0     0   0 my ($s, $index, $offset, $data) = @_;
439 0         0 AE::log
440             debug =>
441             'Attempting to write %d bytes from piece %d starting at %d bytes',
442             length($data), $index, $offset;
443 0         0 my $file_index = 0;
444 0   0     0 my $total_offset = int(($index * $s->piece_length) + ($offset || 0));
445 0         0 AE::log
446             debug => '...calculated offset == %d',
447             $total_offset;
448             SEARCH:
449 0         0 while ($total_offset > $s->files->[$file_index]->{length}) {
450 0         0 $total_offset -= $s->files->[$file_index]->{length};
451 0         0 $file_index++;
452 0         0 AE::log
453             trace =>
454             'Searching for location... $total_offset = %d, $file_index = %d',
455             $total_offset, $file_index;
456             last SEARCH # XXX - return?
457 0 0       0 if not defined $s->files->[$file_index]->{length};
458             }
459 0   0     0 WRITE: while ((defined $data) && (length $data > 0)) {
460             my $this_write
461             = (($total_offset + length $data)
462             >= $s->files->[$file_index]->{length})
463             ?
464 0 0       0 ($s->files->[$file_index]->{length} - $total_offset)
465             : length $data;
466             AE::log
467             trace =>
468             'Attempting to write %d bytes from file #%d (%s), starting at %d',
469             $this_write,
470 0         0 $file_index, $s->files->[$file_index]->{path}, $total_offset;
471 0 0       0 if ($s->files->[$file_index]->{priority} == 0) {
472 0         0 $s->_write_cache($file_index, $total_offset, substr $data, 0,
473             $this_write, '');
474 0         0 AE::log trace => 'Wrote data to cache...';
475             }
476             else {
477 0         0 $s->_open($file_index, 'w');
478 0         0 sysseek $s->files->[$file_index]->{fh}, $total_offset, SEEK_SET;
479 0         0 my $w = syswrite $s->files->[$file_index]->{fh}, substr $data, 0,
480             $this_write, '';
481 0         0 AE::log
482             trace => 'Wrote %d bytes of data to file (%d bytes left)',
483             $w, length $data;
484 0 0       0 weaken $s unless isweak $s;
485 0         0 my $x = $file_index;
486             $s->files->[$x]->{timeout}
487 0   0 0   0 = AE::timer(120, 0, sub { $s // return; $s->_open($x, 'c') });
  0         0  
  0         0  
488             }
489 0         0 $file_index++;
490 0 0       0 last WRITE if not defined $s->files->[$file_index];
491 0         0 $total_offset = 0;
492             }
493 0         0 return length $data;
494             }
495              
496             sub hashcheck (;@) {
497 3     3 1 449 my $s = shift;
498 3 50       16 my @indexes = @_ ? @_ : (0 .. $s->piece_count);
499             AE::log trace => sub {
500 0     0   0 require Data::Dump;
501 0         0 'Hashcheck of : ' . Data::Dump::dump(\@indexes);
502 3         99 };
503 3         108 $s->bitfield; # Makes sure it's built
504 3         82 my $total_size = $s->size;
505 3         16 for my $index (@indexes) {
506 6312 50 33     17813 next if $index < 0 || $index > $s->piece_count;
507 6312 100       8259 my $piece = $s->_read($index,
508             0,
509             $index == $s->piece_count
510             ?
511             $total_size % $s->piece_length
512             : $s->piece_length
513             );
514 6312         10092 my $expected = substr($s->pieces, $index * 20, 20);
515 6312         14823501 my $reality = sha1($piece);
516 6312   66     24081 my $ok = defined($piece)
517             && ($expected eq $reality);
518 6312         16973 vec($s->{bitfield}, $index, 1) = $ok;
519             AE::log trace => sub {
520 0 0   0   0 "Validate piece #%06d %s, Expected: %s\n"
521             . " Reality: %s",
522             $index, ($ok ? 'PASS' : 'FAIL'), unpack('H*', $expected),
523             unpack('H*', $reality);
524 6312         32134 };
525 6312 100       185237 $ok ?
526             $s->_trigger_hash_pass($index)
527             : $s->_trigger_hash_fail($index);
528             }
529             }
530             has peers => (is => 'ro',
531             lazy => 1,
532             isa => HashRef,
533             clearer => '_clear_peers',
534             builder => '_build_peers'
535             );
536 4     4   1390 sub _build_peers { {} }
537              
538             sub _add_peer {
539 0     0   0 my ($s, $h) = @_;
540             $s->{peers}{+$h} = {
541             handle => $h,
542             peerid => '',
543             bitfield => (pack 'b*', "\0" x $s->piece_count),
544             remote_choked => 1,
545             remote_interested => 0,
546             remote_requests => [],
547             local_choked => 1,
548             local_interested => 0,
549             local_requests => [],
550 0     0   0 timeout => AE::timer(20, 0, sub { $s->_del_peer($h) }),
551             keepalive => AE::timer(
552             30, 120,
553             sub {
554 0     0   0 $s->_send_encrypted($h, build_keepalive());
555             }
556 0         0 ),
557              
558             # BEP06
559             local_allowed => [],
560             remote_allowed => [],
561             local_suggest => [],
562             remote_suggest => [],
563              
564             # No encryption :(
565             encryption => '?'
566             };
567             }
568              
569             sub _del_peer {
570 0     0   0 my ($s, $h) = @_;
571 0   0     0 $s->peers->{$h} // return;
572 0         0 for my $req (@{$s->peers->{$h}{local_requests}}) {
  0         0  
573 0         0 my ($i, $o, $l) = @$req;
574 0         0 $s->working_pieces->{$i}{$o}[3] = ();
575             }
576 0         0 delete $s->peers->{$h};
577 0         0 $h->destroy;
578             }
579             my $shuffle;
580             has trackers => (is => 'ro',
581             lazy => 1,
582             builder => '_build_trackers',
583             isa => ArrayRef [HashRef],
584             init_arg => undef
585             );
586              
587             sub _build_trackers {
588 4     4   1797 my $s = shift;
589             $shuffle //= sub {
590 8     8   10 my $deck = shift; # $deck is a reference to an array
591 8 50       20 return unless @$deck; # must not be empty!
592 8         8 my $i = @$deck;
593 8         21 while (--$i) {
594 0         0 my $j = int rand($i + 1);
595 0         0 @$deck[$i, $j] = @$deck[$j, $i];
596             }
597 4   100     29 };
598             my $trackers = [
599             map {
600             {urls => $_,
601             complete => 0,
602             incomplete => 0,
603             peers => '',
604             peers6 => '',
605             announcer => undef,
606             ticker => AE::timer(
607             1,
608             15 * 60,
609             sub {
610 0 0   0   0 return if $s->state eq 'stopped';
611 0         0 $s->announce('started');
612             }
613 4         223 ),
614             failures => 0
615             }
616             } defined $s->metadata->{announce} ? [$s->metadata->{announce}]
617             : (),
618             defined $s->metadata->{'announce-list'}
619 4 50       47 ? @{$s->metadata->{'announce-list'}}
  0 50       0  
620             : ()
621             ];
622             AE::log trace => sub {
623 0     0   0 require Data::Dump;
624 0         0 '$trackers before shuffle => ' . Data::Dump::dump($trackers);
625 4         17 };
626 4         120 $shuffle->($trackers);
627 4         15 $shuffle->($_->{urls}) for @$trackers;
628             AE::log trace => sub {
629 0     0   0 require Data::Dump;
630 0         0 '$trackers after shuffle => ' . Data::Dump::dump($trackers);
631 4         42 };
632 4         157 $trackers;
633             }
634              
635             sub announce {
636 3     3 0 5 my ($s, $e) = @_;
637 3 50       10 return if $a++ > 10; # Retry attempts
638 3         55 for my $tier (@{$s->trackers}) {
  3         32  
639 4   33     3553 $tier->{announcer} //= $s->_announce_tier($e, $tier);
640             }
641             }
642              
643             sub _announce_tier {
644 4     4   8 my ($s, $e, $tier) = @_;
645 4         6 my @urls = grep {m[^https?://]} @{$tier->{urls}};
  4         18  
  4         11  
646 4 50       13 return if $tier->{failures} > 5;
647 4 50       7 return if $#{$tier->{urls}} < 0; # Empty tier?
  4         12  
648 4 50       17 return if $tier->{urls}[0] !~ m[^https?://.+];
649 4         16 local $AnyEvent::HTTP::USERAGENT
650             = 'AnyEvent::BitTorrent/' . $AnyEvent::BitTorrent::VERSION;
651             my $_url = $tier->{urls}[0] . '?info_hash=' . sub {
652 4     4   88 local $_ = shift;
653 4         18 s/([^A-Za-z0-9])/sprintf("%%%2.2X", ord($1))/ge;
  56         103  
654 4         63 $_;
655             }
656 4 50       43 ->($s->infohash)
657             . ('&peer_id=' . $s->peerid)
658             . ('&uploaded=' . $s->uploaded)
659             . ('&downloaded=' . $s->downloaded)
660             . ('&left=' . $s->_left)
661             . ('&port=' . $s->port)
662             . '&compact=1'
663             . ($e ? '&event=' . $e : '');
664 4         303 AE::log debug => 'Announce URL: ' . $_url;
665             http_get $_url, sub {
666 0     0   0 my ($body, $hdr) = @_;
667             AE::log trace => sub {
668 0         0 require Data::Dump;
669 0         0 'Announce response: ' . Data::Dump::dump($body, $hdr);
670 0         0 };
671 0         0 $tier->{announcer} = ();
672 0 0       0 if ($hdr->{Status} =~ /^2/) {
673 0         0 my $reply = bdecode($body);
674 0 0       0 if (defined $reply->{'failure reason'}) { # XXX - Callback?
675 0         0 push @{$tier->{urls}}, shift @{$tier->{urls}};
  0         0  
  0         0  
676 0         0 $s->_announce_tier($e, $tier);
677 0         0 $tier->{'failure reason'} = $reply->{'failure reason'};
678 0         0 $tier->{failures}++;
679             }
680             else { # XXX - Callback?
681 0         0 $tier->{failures} = $tier->{'failure reason'} = 0;
682             $tier->{peers}
683             = compact_ipv4(
684             uncompact_ipv4($tier->{peers} . $reply->{peers}))
685 0 0       0 if $reply->{peers};
686             $tier->{peers6}
687             = compact_ipv6(
688             uncompact_ipv6($tier->{peers6} . $reply->{peers6}))
689 0 0       0 if $reply->{peers6};
690 0         0 $tier->{complete} = $reply->{complete};
691 0         0 $tier->{incomplete} = $reply->{incomplete};
692             $tier->{ticker} = AE::timer(
693             $reply->{interval} // (15 * 60),
694             $reply->{interval} // (15 * 60),
695             sub {
696 0 0       0 return if $s->state eq 'stopped';
697 0         0 $s->_announce_tier($e, $tier);
698             }
699 0   0     0 );
      0        
700             }
701             }
702             else { # XXX - Callback?
703 0         0 $tier->{'failure reason'}
704             = "HTTP Error: $hdr->{Status} $hdr->{Reason}\n";
705 0         0 $tier->{failures}++;
706 0         0 push @{$tier->{urls}}, shift @{$tier->{urls}};
  0         0  
  0         0  
707 0         0 $s->_announce_tier($e, $tier);
708             }
709             }
710 4         189 }
711             has _choke_timer => (
712             is => 'bare',
713             isa => Ref,
714             init_arg => undef,
715             required => 1,
716             default => sub {
717             my $s = shift;
718             AE::timer(
719             15, 45,
720             sub {
721             return if $s->state ne 'active';
722             AE::log trace => 'Choke timer...';
723             my @interested
724             = grep { $_->{remote_interested} && $_->{remote_choked} }
725             values %{$s->peers};
726              
727             # XXX - Limit the number of upload slots
728             for my $p (@interested) {
729             $p->{remote_choked} = 0;
730             $s->_send_encrypted($p->{handle}, build_unchoke());
731             AE::log trace => 'Choked %s', $p->{peerid};
732             }
733              
734             # XXX - Send choke to random peer
735             }
736             );
737             }
738             );
# Repeating timer which services the block requests queued by remote peers.
#
# The first run happens 15 seconds after construction and it repeats every 10
# seconds. While the client is 'active', each run picks waiting peers at
# random and fills their queued requests until at least 2**20 bytes have been
# sent (or nothing is left to send), then adds that amount to the upload
# total.
has _fill_requests_timer => (
    is       => 'bare',
    isa      => Ref,
    init_arg => undef,
    required => 1,
    default  => sub {
        my $s = shift;
        AE::timer(
            15, 10,
            sub {    # XXX - Limit by time/bandwidth
                return if $s->state ne 'active';
                AE::log trace => 'Request fill timer...';

                # A peer without a request queue has nothing waiting; treat
                # a missing queue as empty rather than dereferencing undef.
                my @waiting
                    = grep { defined $_ && @{$_->{remote_requests} // []} }
                    values %{$s->peers};
                return if !@waiting;
                my $total_sent = 0;
                while (@waiting && $total_sent < 2**20) {

                    # Pull a random peer out of the waiting list
                    my $p = splice(@waiting, rand @waiting, 1);
                    AE::log trace => 'Chosen peer: %s...', $p->{peerid};
                    while ($total_sent < 2**20 && @{$p->{remote_requests}}) {
                        my $req = shift @{$p->{remote_requests}};
                        my ($index, $offset, $length) = @$req;

                        # The format has four placeholders: the request
                        # triple plus the peer it is being filled for.
                        AE::log
                            trace =>
                            'Filling request i:%d, o:%d, l:%d for %s',
                            $index, $offset, $length,
                            $p->{peerid} // '';

                        # XXX - If piece is bad locally
                        #          if remote supports fast ext
                        #             send reject
                        #          else
                        #             simply return
                        #       else...
                        $s->_send_encrypted(
                            $p->{handle},
                            build_piece(
                                $index, $offset,
                                $s->_read($index, $offset, $length)
                            )
                        );
                        $total_sent += $length;
                    }
                }
                $s->_set_uploaded($s->uploaded + $total_sent);
            }
        );
    }
);
# Lazily built, repeating timer which opens connections to new peers. The
# timer is dropped again through the '_clear_peer_timer' clearer.
has _peer_timer => (is       => 'ro',
                    lazy     => 1,
                    isa      => Ref,
                    init_arg => undef,
                    clearer  => '_clear_peer_timer',
                    builder  => '_build_peer_timer'
);

# Builder for _peer_timer.
#
# Returns an AE::timer which first fires after 1 second and then every 15
# seconds. On each tick (as long as $s->_left is true) it gathers the compact
# peer lists stored with every tracker and hands up to 10 randomly chosen
# addresses to _new_peer, stopping early once more than 100 peers are known.
sub _build_peer_timer {
    my $s = shift;
    AE::timer(
        1, 15,
        sub {
            return if !$s->_left;
            AE::log trace => 'Attempting to connect to new peer...';

            # XXX - Initiate connections when we are in Super seed mode?
            my @cache;
            for my $tracker (@{$s->trackers}) {
                push @cache, uncompact_ipv4($tracker->{peers})
                    if $tracker->{peers};
                push @cache, uncompact_ipv6($tracker->{peers6})
                    if $tracker->{peers6};
            }
            return if !@cache;
            my $attempts = @cache < 10 ? scalar @cache : 10; # XXX - Max half open
            for (1 .. $attempts) {
                last
                    if scalar(keys %{$s->peers}) > 100;    # XXX - Max peers

                # 'rand @cache' covers every index; 'rand $#cache' could
                # never pick the final element of the list.
                my ($addr) = splice @cache, rand @cache, 1;
                $s->_new_peer($addr);
            }
        }
    );
}
821              
# Opens an outgoing connection to a remote peer.
#
#   $addr - array reference which is logged as host/port and handed to
#           AnyEvent::Handle's 'connect' option.
#
# Once connected, the handle is registered with _add_peer and our handshake
# is sent; incoming data is routed to _on_read. Errors, failed connects and
# EOF all remove the peer with _del_peer. Returns the new AnyEvent::Handle.
sub _new_peer {
    my ($s, $addr) = @_;
    AE::log trace => 'Connecting to %s:%d', @$addr;
    my $handle = AnyEvent::Handle->new(
        connect    => $addr,
        on_prepare => sub {60},    # Value returned is the connect timeout
        on_error   => sub {
            my ($hdl, $fatal, $msg) = @_;

            # XXX - callback
            AE::log
                error => 'Socket error: %s (Removing peer)',
                $msg;
            $s->_del_peer($hdl);
        },
        on_connect_error => sub {

            # Unlike on_error, this callback is only handed the handle and
            # a message; there is no 'fatal' flag in between.
            my ($hdl, $msg) = @_;
            $s->_del_peer($hdl);

            # XXX - callback
            AE::log
                error => 'Connection error (%s)',
                $msg // 'Connection timed out';
        },
        on_connect => sub {
            my ($h, $host, $port, $retry) = @_;
            AE::log
                trace => 'Connection established with %s:%d',
                $host, $port;

            # Use the handle passed to the callback; closing over the outer
            # variable would make the handle keep a reference to itself.
            $s->_add_peer($h);
            $s->_send_handshake($h);
        },
        on_eof => sub {
            my $h = shift;
            AE::log trace => 'EOF from peer';
            $s->_del_peer($h);
        },
        on_read => sub {
            $s->_on_read(@_);
        }
    );
    return $handle;
}
868              
# Read handler for a connection until its handshake has been processed.
#
# Parses a single packet from the handle's read buffer. A parse error drops
# the peer. A handshake whose infohash matches ours is answered with our own
# handshake and bitfield, a 60 second timeout is armed and all further input
# is routed to _on_read. Any other packet type is ignored.
sub _on_read_incoming {
    my ($s, $h) = @_;
    return if !defined $h->rbuf;

    # XXX - Handle things if the stream is encrypted
    my $packet = parse_packet(\$h->rbuf);
    return if !$packet;
    AE::log trace => sub {
        require Data::Dump;
        'Incoming packet: ' . Data::Dump::dump($packet);
    };
    return $s->_del_peer($h) if defined $packet->{error};

    # Anything but a handshake should never happen at this point
    return 1 if $packet->{type} != $HANDSHAKE;
    my $payload = $packet->{payload};
    return if ref $payload ne 'ARRAY';
    my ($reserved, $infohash, $peerid) = @{$payload}[0 .. 2];
    $s->peers->{$h}{reserved} = $reserved;
    return $s->_del_peer($h) if $infohash ne $s->infohash;
    $s->peers->{$h}{peerid} = $peerid;
    $s->_send_handshake($h);
    $s->_send_bitfield($h);

    # Drop the peer unless something resets this timer within a minute
    $s->peers->{$h}{timeout} = AE::timer(60, 0, sub { $s->_del_peer($h) });
    $s->peers->{$h}{bitfield} = pack 'b*', (0 x $s->piece_count);
    $h->on_read(sub { $s->_on_read(@_) });
    return 1;
}
901              
# Main read handler for an established peer connection.
#
#   $h - the AnyEvent::Handle the data arrived on; it is also the key of the
#        peer's record in $s->peers.
#
# Packets are parsed from the read buffer one at a time and dispatched on
# their type until the buffer holds fewer than 5 bytes or no complete packet
# is left. A packet with an error drops the peer. Several branches 'return'
# (rather than move on to the next packet) when a packet doesn't match what
# we expected from this peer.
sub _on_read {
    my ($s, $h) = @_;

    # (Re)arms the peer's inactivity timer; the peer is dropped if it fires
    my $reset_timeout = sub {
        my ($seconds) = @_;
        $s->peers->{$h}{timeout}
            = AE::timer($seconds, 0, sub { $s->_del_peer($h) });
    };
    while (my $packet = parse_packet(\$h->rbuf)) {
        AE::log debug => sub {
            require Data::Dump;

            # Dump the whole packet, not just its (usually empty) error
            'Incoming packet: ' . Data::Dump::dump($packet);
        };
        if (defined $packet->{error}) {
            $s->_del_peer($h);
            return;
        }
        elsif ($packet->{type} eq $KEEPALIVE) {

            # Do nothing!
        }
        elsif ($packet->{type} == $HANDSHAKE) {
            return if ref $packet->{payload} ne 'ARRAY';
            $s->peers->{$h}{reserved} = $packet->{payload}[0];
            return $s->_del_peer($h)
                if $packet->{payload}[1] ne $s->infohash;
            $s->peers->{$h}{peerid} = $packet->{payload}[2];
            $s->_send_bitfield($h);
            $reset_timeout->(60);
            $s->peers->{$h}{bitfield} = pack 'b*', (0 x $s->piece_count);
        }
        elsif ($packet->{type} == $INTERESTED) {
            $s->peers->{$h}{remote_interested} = 1;
        }
        elsif ($packet->{type} == $NOT_INTERESTED) {
            $s->peers->{$h}{remote_interested} = 0;

            # XXX - Clear any requests in queue
            # XXX - Send choke just to be sure
        }
        elsif ($packet->{type} == $CHOKE) {
            $s->peers->{$h}{local_choked} = 1;

            # The Fast Extension is flagged by bit 0x04 of the eighth
            # reserved byte and only applies when both sides set it. The
            # byte must be read as 8 bits; a 1 bit wide vec() & 0x04 is
            # always zero.
            my $fast_ext
                = vec($s->reserved // '', 7, 8)
                & vec($s->peers->{$h}{reserved} // '', 7, 8)
                & 0x04;
            if (!$fast_ext) {

                # Without the extension a choke discards our pending
                # requests: release every block which hasn't arrived yet
                for my $req (@{$s->peers->{$h}{local_requests}}) {
                    $s->working_pieces->{$req->[0]}{$req->[1]}[3] = ()
                        unless
                        defined $s->working_pieces->{$req->[0]}{$req->[1]}[4];
                }
            }
            $s->_consider_peer($s->peers->{$h});
        }
        elsif ($packet->{type} == $UNCHOKE) {
            $s->peers->{$h}{local_choked} = 0;
            $reset_timeout->(120);
            $s->_request_pieces($s->peers->{$h});
        }
        elsif ($packet->{type} == $HAVE) {
            vec($s->peers->{$h}{bitfield}, $packet->{payload}, 1) = 1;
            $s->_consider_peer($s->peers->{$h});
            $reset_timeout->(60);
        }
        elsif ($packet->{type} == $BITFIELD) {
            $s->peers->{$h}{bitfield} = $packet->{payload};
            $s->_consider_peer($s->peers->{$h});
        }
        elsif ($packet->{type} == $REQUEST) {
            $reset_timeout->(120);

            # XXX - Make sure (index + offset + length) < $s->size
            #       if not, send reject if remote supports fast ext
            #       either way, ignore the request
            push @{$s->peers->{$h}{remote_requests}}, $packet->{payload};
        }
        elsif ($packet->{type} == $PIECE) {
            $reset_timeout->(120);
            my ($index, $offset, $data) = @{$packet->{payload}};

            # Make sure $index is a working piece
            $s->working_pieces->{$index} // return;

            # Make sure we req from this peer
            return
                if !grep {
                       $_->[0] == $index
                    && $_->[1] == $offset
                    && $_->[2] == length $data
                } @{$s->peers->{$h}{local_requests}};

            # The request has been filled; take it out of the queue
            $s->peers->{$h}{local_requests} = [
                grep {
                           ($_->[0] != $index)
                        || ($_->[1] != $offset)
                        || ($_->[2] != length($data))
                } @{$s->peers->{$h}{local_requests}}
            ];
            $s->working_pieces->{$index}{$offset}[4] = $data;
            $s->working_pieces->{$index}{$offset}[5] = undef; # Drop the timer
            $s->_set_downloaded($s->downloaded + length $data);
            my $blocks = $s->working_pieces->{$index};

            if (!grep { !defined $_->[4] } values %$blocks)
            {    # Every block has arrived; verify and store the piece
                my $piece = join '',
                    map  { $blocks->{$_}[4] }
                    sort { $a <=> $b }
                    keys %$blocks;
                if (substr($s->pieces, $index * 20, 20) eq sha1($piece)) {

                    # NOTE(review): this stops at the first false return
                    # from _write and repeats on a true one; confirm
                    # against _write's return value.
                    for my $attempt (1 .. 5) { # XXX = 5 == failure callback
                        last unless $s->_write($index, 0, $piece);
                    }
                    vec($s->{bitfield}, $index, 1) = 1;
                    $s->_broadcast(
                        build_have($index),
                        sub {

                            # Tell every peer which still lacks at least
                            # one piece. The old '!index(...)' test only
                            # matched peers whose first bit was unset.
                            index(substr(unpack('b*', $_->{bitfield}),
                                         0, $s->piece_count + 1),
                                  0
                            ) != -1;
                        }
                    );
                    my $missing = substr(unpack('b*', ~$s->bitfield),
                                         0, $s->piece_count + 1);
                    $s->announce('complete') if index($missing, 1) == -1;
                    $s->_consider_peer($_)
                        for grep { $_->{local_interested} }
                        values %{$s->peers};
                    $s->_trigger_hash_pass($index);
                }
                else {
                    $s->_trigger_hash_fail($index);

                    # XXX - Not sure what to do... I'd
                    #       ban the peers involved and
                    #       try the same piece again.
                }
                delete $s->working_pieces->{$index};
            }
            $s->_request_pieces($s->peers->{$h});
        }
        elsif ($packet->{type} == $CANCEL) {
            my ($index, $offset, $length) = @{$packet->{payload}};
            return   # XXX - error callback if this block is not in the queue
                if !grep {
                       $_->[0] == $index
                    && $_->[1] == $offset
                    && $_->[2] == $length
                } @{$s->peers->{$h}{remote_requests}};
            $s->peers->{$h}{remote_requests} = [
                grep {
                           ($_->[0] != $index)
                        || ($_->[1] != $offset)
                        || ($_->[2] != $length)
                } @{$s->peers->{$h}{remote_requests}}
            ];
        }
        elsif ($packet->{type} == $SUGGEST) {
            push @{$s->peers->{$h}{local_suggest}}, $packet->{payload};
        }
        elsif ($packet->{type} == $HAVE_ALL) {
            $s->peers->{$h}{bitfield} = pack 'b*', (1 x $s->piece_count);
            $s->_consider_peer($s->peers->{$h});
            $reset_timeout->(120);
        }
        elsif ($packet->{type} == $HAVE_NONE) {
            $s->peers->{$h}{bitfield} = pack 'b*', (0 x $s->piece_count);
            $reset_timeout->(30);
        }
        elsif ($packet->{type} == $REJECT) {
            my ($index, $offset, $length) = @{$packet->{payload}};
            return   # XXX - error callback if this block is not in the queue
                if !grep {
                       $_->[0] == $index
                    && $_->[1] == $offset
                    && $_->[2] == $length
                } @{$s->peers->{$h}{local_requests}};

            # Forget who the block was requested from
            $s->working_pieces->{$index}{$offset}->[3] = ();
            $s->peers->{$h}{local_requests} = [
                grep {
                           ($_->[0] != $index)
                        || ($_->[1] != $offset)
                        || ($_->[2] != $length)
                } @{$s->peers->{$h}{local_requests}}
            ];
            $reset_timeout->(30);
        }
        elsif ($packet->{type} == $ALLOWED_FAST) {
            push @{$s->peers->{$h}{local_allowed}}, $packet->{payload};
        }
        else {

            # use Data::Dump qw[pp];
            # die 'Unhandled packet: ' . pp $packet;
        }
        last
            if 5 > length($h->rbuf // '');    # Min size for protocol
    }
}
1098              
# Tells a peer which pieces we have.
#
#   $h - the peer's handle (also the key of its record in $s->peers).
#
# When both sides flagged the Fast Extension in their reserved bytes, a seed
# sends a single 'have all' packet and a client whose bitfield is all zero
# sends 'have none'. In every other case the full bitfield is sent.
sub _send_bitfield {
    my ($s, $h) = @_;

    # The Fast Extension is flagged by bit 0x04 of the eighth reserved byte,
    # so read that whole byte; a 1 bit wide vec() & 0x04 is always zero.
    my $fast_ext
        = vec($s->reserved // '', 7, 8)
        & vec($s->peers->{$h}{reserved} // '', 7, 8)
        & 0x04;
    if ($fast_ext) {

        # NOTE(review): builder names follow the $HAVE_ALL / $HAVE_NONE
        # naming used by the packet types; confirm against the protocol
        # module's exports.
        return $s->_send_encrypted($h, build_have_all()) if $s->seed;
        return $s->_send_encrypted($h, build_have_none())
            if $s->bitfield() !~ m[[^\0]];
    }

    # XXX - If it's cheaper to send HAVE packets than a full BITFIELD, do it
    $s->_send_encrypted($h, build_bitfield($s->bitfield));
}
1113              
# Sends the same packet to every peer the qualifier accepts.
#
#   $data      - the packet to send.
#   $qualifier - optional code reference called once per peer with the
#                peer's record in $_; the packet is only sent when it
#                returns a true value. All peers qualify by default.
sub _broadcast {
    my ($s, $data, $qualifier) = @_;
    $qualifier //= sub {1};
    my @recipients = grep { $qualifier->() } values %{$s->peers};
    for my $peer (@recipients) {
        $s->_send_encrypted($peer->{handle}, $data);
    }
}
1120              
# Figure out whether or not we find a peer interesting.
#
#   $p - the peer's record (reads 'bitfield', 'local_interested', 'handle').
#
# Nothing happens unless the client is 'active' and incomplete. A peer is
# interesting when it has at least one piece we want; an interested or not
# interested packet is only sent when that differs from what we last told it.
sub _consider_peer {
    my ($s, $p) = @_;
    return if $s->state ne 'active';
    return if $s->complete;

    # Pieces the peer has which we also want
    my $relevant = $p->{bitfield} & $s->wanted;
    my $bits = substr(unpack('b*', $relevant), 0, $s->piece_count + 1);
    my $interesting = index($bits, 1, 0) != -1 ? 1 : 0;
    if ($interesting && !$p->{local_interested}) {
        $p->{local_interested} = 1;
        $s->_send_encrypted($p->{handle}, build_interested());
    }
    elsif (!$interesting && $p->{local_interested}) {
        $p->{local_interested} = 0;
        $s->_send_encrypted($p->{handle}, build_not_interested());
    }
}
# Pieces currently being downloaded, keyed by piece index. Each value is a
# hash of the piece's blocks keyed by offset. Starts out empty and cannot be
# set through the constructor.
has working_pieces => (
    is       => 'ro',
    isa      => HashRef,
    lazy     => 1,
    init_arg => undef,
    default  => sub { return {} },
);
1149              
1150             sub _file_to_range {
1151 4     4   5 my ($s, $file) = @_;
1152 4         5 my $start = 0;
1153 4         12 for (0 .. $file - 1) {
1154 0         0 $start += $s->files->[$_]->{length};
1155             }
1156 4         51 my $end = $start + $s->files->[$file]->{length};
1157 4         25 $start = $start / $s->piece_length;
1158 4         25 $end = $end / $s->piece_length;
1159 4 50       193 (int($start) .. int $end + ($end != int($end) ? 0 : +1));
1160             }
1161              
# Requests more blocks from a peer.
#
#   $p - the peer's record (reads 'handle', 'bitfield', 'local_requests').
#
# Does nothing unless the client is 'active'. While fewer than 10 pieces are
# being worked on, a candidate list is built from every piece the peer has
# and we lack, repeated once per point of its file's priority; otherwise an
# existing working piece is used. One candidate is picked at random and its
# unrequested blocks are requested. Every request is recorded in
# working_pieces as
#   [index, offset, length, peer (weak), data, timeout timer]
# and is cancelled by that timer after 60 seconds.
sub _request_pieces {
    my ($s, $p) = @_;
    return if $s->state ne 'active';
    weaken $p unless isweak $p;
    $p // return;
    $p->{handle} // return;
    my @indexes;
    if (keys(%{$s->working_pieces}) < 10) {    # XXX - Max working pieces
        for my $findex (0 .. $#{$s->files}) {
            for my $index ($s->_file_to_range($findex)) {
                next
                    if !(vec($p->{bitfield}, $index, 1)
                         && !vec($s->bitfield, $index, 1));
                push @indexes,
                    map {$index} 1 .. $s->{files}[$findex]{priority};
            }
        }
    }
    else {
        @indexes = keys %{$s->working_pieces};
    }
    return if !@indexes;
    my $index = $indexes[rand @indexes]; # XXX - Weighted random/Rarest first

    # The final piece holds whatever is left over. When the total size is an
    # exact multiple of the piece length the remainder is zero and the piece
    # is actually a full one.
    my $piece_size
        = $index == $s->piece_count
        ? (($s->size % $s->piece_length) || $s->piece_length)
        : $s->piece_length;
    my $last_block = int(($piece_size - 1) / $block_size);
    my @offsets = map { $_ * $block_size } 0 .. $last_block;

    # Blocks start out undefined and become array references once they are
    # requested. (Starting them out as hash references made every later
    # '->[4]' lookup die with "Not an ARRAY reference".)
    $s->working_pieces->{$index} //= {map { ($_ => undef) } @offsets};
    my $blocks      = $s->working_pieces->{$index};
    my @unrequested = sort { $a <=> $b }
        grep { # XXX - If there are no unrequested blocks, pick a new index
        my $block = $blocks->{$_};
        ref $block ne 'ARRAY'
            || (!defined $block->[4] && !defined $block->[3])
        } @offsets;
    my @unfilled_local_requests
        = grep { !defined $_->[4] } @{$p->{local_requests} // []};
    for (scalar @unfilled_local_requests .. 12) {
        my $offset = shift @unrequested;
        $offset // return;    # XXX - Start working on another piece
        my $length
            = ($offset == $offsets[-1])
            ? ($piece_size % $block_size) || $block_size
            : $block_size;
        next if !$length;

        # XXX - Limit to x req per peer (future: based on bandwidth)
        AE::log
            trace => 'Requesting %d, %d, %d',
            $index, $offset, $length;
        $s->_send_encrypted($p->{handle},
                            build_request($index, $offset, $length))
            ;                 # XXX - len for last piece
        $s->working_pieces->{$index}{$offset} = [
            $index, $offset,
            $length,
            $p, undef,
            AE::timer(
                60, 0,
                sub {

                    # The request went unanswered: cancel it, release the
                    # block and give the peer 45 seconds before it's dropped
                    $p // return;
                    $p->{handle} // return;
                    $s->_send_encrypted($p->{handle},
                                 build_cancel($index, $offset, $length));
                    $s->working_pieces->{$index}{$offset}[3] = ();
                    $p->{local_requests} = [
                        grep {
                                   $_->[0] != $index
                                || $_->[1] != $offset
                                || $_->[2] != $length
                        } @{$p->{local_requests}}
                    ];
                    $p->{timeout} = AE::timer(45, 0,
                                    sub { $s->_del_peer($p->{handle}) });

                    #$s->_request_pieces( $p) #  XXX - Ask a different peer
                }
            )
        ];

        # The block must not keep the peer's record alive
        weaken($s->working_pieces->{$index}{$offset}[3])
            unless isweak($s->working_pieces->{$index}{$offset}[3]);
        push @{$p->{local_requests}}, [$index, $offset, $length];
    }
}
1248              
1249             # Cheap callback system
# Callback invoked (through _trigger_hash_pass) with whatever arguments the
# trigger is given. The default callback simply returns true.
has on_hash_pass => (
    is      => 'rw',
    isa     => CodeRef,
    clearer => '_no_hash_pass',
    default => sub {
        return sub { !!1 };
    },
);

# Calls the on_hash_pass callback with the remaining arguments.
sub _trigger_hash_pass {
    my ($s, @args) = @_;
    return $s->on_hash_pass->(@args);
}

# Callback invoked (through _trigger_hash_fail) with whatever arguments the
# trigger is given. The default callback simply returns true.
has on_hash_fail => (
    is      => 'rw',
    isa     => CodeRef,
    clearer => '_no_hash_fail',
    default => sub {
        return sub { !!1 };
    },
);

# Calls the on_hash_fail callback with the remaining arguments.
sub _trigger_hash_fail {
    my ($s, @args) = @_;
    return $s->on_hash_fail->(@args);
}
1268             #
# Current state of the client: 'active' (the default), 'stopped' or 'paused'.
# Read-only from the outside; changed internally through _set_state.
has state => (
    is      => 'ro',
    isa     => Enum [qw[active stopped paused]],
    default => sub { return 'active' },
    writer  => '_set_state',
);
1274              
# Shuts the client down: announces 'stopped' to the trackers, disconnects
# all peers, stops the new-peer timer, closes every file and finally sets
# the state to 'stopped'. Does nothing when the client is already stopped.
sub stop {
    my ($s) = @_;
    AE::log debug => 'Stopping...';
    if ($s->state eq 'stopped') {
        return;
    }
    AE::log trace => 'Announcing "stopped" event to trackers...';
    $s->announce('stopped');
    AE::log trace => 'Disconnecting peers...';
    $s->_clear_peers;
    AE::log trace => 'Stopping new peers ticker...';
    $s->_clear_peer_timer;
    AE::log trace => 'Closing files...';
    my $last_file = $#{$s->files};
    for my $findex (0 .. $last_file) {
        $s->_open($findex, 'c');
    }
    AE::log trace => 'Setting internal status...';
    $s->_set_state('stopped');
}
1290              
# Brings the client (back) to life: announces 'started' to the trackers
# unless the state is already 'active', touches the peer list, makes sure
# the new-peer timer exists and sets the state to 'active'.
sub start {
    my ($s) = @_;
    AE::log debug => 'Starting...';
    if ($s->state ne 'active') {
        $s->announce('started');
    }
    $s->peers;
    AE::log trace => 'Starting new peers ticker...';
    $s->_peer_timer;
    AE::log trace => 'Setting internal status...';
    $s->_set_state('active');
}
1301              
# Puts the client into the 'paused' state. The peer list is touched and the
# new-peer timer is kept (or built); only the state changes.
sub pause {
    my ($s) = @_;
    AE::log debug => 'Pausing...';
    $s->peers;
    AE::log trace => 'Starting new peers ticker...';
    $s->_peer_timer;
    AE::log trace => 'Setting internal status...';
    $s->_set_state('paused');
}
1311             #
# Runs once the object has been constructed and kicks the client into the
# state it was created with: 'active' calls start() and 'paused' calls
# pause(). A 'stopped' client is left alone.
sub BUILD {
    my ($s, $a) = @_;
    AE::log debug => 'BUILD()';
    $s->start && AE::log debug => 'Calling ->start()'
        if $s->state eq 'active';

    # The method is pause(); no paused() is visible in this file, so calling
    # it would make construction with state => 'paused' die.
    $s->pause && AE::log debug => 'Calling ->pause()'
        if $s->state eq 'paused';
}
1320              
1321             # Testing stuff goes here
# Queues a packet for writing on a peer's handle.
#
#   $h      - the handle to write to; nothing is sent when it is false.
#   $packet - the raw packet.
#
# Returns whatever the handle's push_write returns.
sub _send_encrypted {
    my ($s, $h, $packet) = @_;
    return unless $h;    # XXX - $s->_del_peer($p->{handle})
    my $describe = sub {
        require Data::Dump;
        'Outgoing packet: ' . Data::Dump::dump($packet);
    };
    AE::log trace => $describe;
    return $h->push_write($packet);
}
1331              
# Sends our handshake (reserved bytes, infohash and peer id) on a handle.
sub _send_handshake {
    my ($s, $h) = @_;
    AE::log trace => 'Outgoing handshake';
    my $handshake = build_handshake($s->reserved, $s->infohash, $s->peerid);
    $h->push_write($handshake);
}
1337             1337;
1338              
1339             =pod
1340              
1341             =head1 NAME
1342              
1343             AnyEvent::BitTorrent - Yet Another BitTorrent Client Module
1344              
1345             =head1 Synopsis
1346              
1347             use AnyEvent::BitTorrent;
1348             my $client = AnyEvent::BitTorrent->new( path => 'some.torrent' );
1349             AE::cv->recv;
1350              
1351             =head1 Description
1352              
1353             This is a painfully simple BitTorrent client written on a whim that implements
1354             the absolute basics. For a full list of what's currently supported, what you
1355             will likely find in a future version, and what you'll never get from this, see
1356             the section entitled "L"
1357              
1358             =head1 Methods
1359              
1360             The API, much like the module itself, is simple.
1361              
1362             Anything you find by skimming the source is likely not ready for public use
1363             and will be subject to change before C. Here's the public interface as
1364             of this version:
1365              
1366             =head2 C<new( ... )>
1367              
1368             my $c = AnyEvent::BitTorrent->new(
1369             path => 'some/legal.torrent',
1370             basedir => './storage/',
1371             port => 6881,
1372             on_hash_pass => sub { ... },
1373             on_hash_fail => sub { ... },
1374             state => 'stopped',
1375             piece_cache => $quick_restore
1376             );
1377              
1378             This constructor understands the following arguments:
1379              
1380             =over
1381              
1382             =item C<path>
1383              
1384             This is the only required parameter. It's the path to a valid .torrent file.
1385              
1386             =item C<basedir>
1387              
1388             This is the base directory all data will be stored in and/or read from.
1389             Multifile torrents will create another directory below this to store all
1390             files.
1391              
1392             By default, this is the current working directory when
1393             L<C<new( ... )>|/"new( ... )"> is called.
1394              
1395             =item C<port>
1396              
1397             This is the preferred port local host binds and expects incoming peers to
1398             connect to.
1399              
1400             By default, this is a zero; the system will pick a port number randomly.
1401              
1402             =item C<on_hash_fail>
1403              
1404             This is a subroutine called whenever a piece fails to pass
1405             its hash check. The callback is handed the piece's index.
1406              
1407             =item C<on_hash_pass>
1408              
1409             This is a subroutine called whenever a piece passes its
1410             hash check. The callback is handed the piece's index.
1411              
1412             =item C<state>
1413              
1414             This must be one of the following:
1415              
1416             =over
1417              
1418             =item C<active>
1419              
1420             This is the default. The client will attempt to create new connections, make
1421             and fill requests, etc. This is normal client behavior.
1422              
1423             =item C<paused>
1424              
1425             In this state, connections will be made and accepted but no piece requests
1426             will be made or filled. To resume full, normal behavior, you must call
1427             L<C<start( )>|/"start( )">.
1428              
1429             =item C<stopped>
1430              
1431             Everything is put on hold. No new outgoing connections are attempted and
1432             incoming connections are rejected. To resume full, normal behavior, you must
1433             call L<C<start( )>|/"start( )">.
1434              
1435             =back
1436              
1437             =item C<piece_cache>
1438              
1439             This is the index list returned by L<C<piece_cache( )>|/"piece_cache( )"> in a
1440             previous instance. Using this should make a complete resume system a trivial
1441             task.
1442              
1443             =back
1444              
1445             =head2 C<hashcheck( [...] )>
1446              
1447             This method expects...
1448              
1449             =over
1450              
1451             =item ...a list of integers. You could use this to check a range of pieces (a
1452             single file, for example).
1453              
1454             $client->hashcheck( 1 .. 5, 34 .. 56 );
1455              
1456             =item ...a single integer. Only that specific piece is checked.
1457              
1458             $client->hashcheck( 17 );
1459              
1460             =item ...nothing. All data related to this torrent will be checked.
1461              
1462             $client->hashcheck( );
1463              
1464             =back
1465              
1466             As pieces pass or fail, your C<on_hash_pass> and C<on_hash_fail> callbacks are
1467             triggered.
1468              
1469             =head2 C<start( )>
1470              
1471             Sends a 'started' event to trackers and starts performing as a client is
1472             expected. New connections are made and accepted, requests are made and filled,
1473             etc.
1474              
1475             =head2 C<stop( )>
1476              
1477             Sends a stopped event to trackers, closes all connections, stops attempting
1478             new outgoing connections, rejects incoming connections and closes all open
1479             files.
1480              
1481             =head2 C<pause( )>
1482              
1483             The client remains mostly active; new connections will be made and accepted,
1484             etc. but no requests will be made or filled while the client is paused.
1485              
1486             =head2 C<infohash( )>
1487              
1488             Returns the 20-byte SHA1 hash of the value of the info key from the metadata
1489             file.
1490              
1491             =head2 C<peerid( )>
1492              
1493             Returns the 20 byte string used to identify the client. Please see the
1494             L below.
1495              
1496             =head2 C<port( )>
1497              
1498             Returns the port number the client is listening on.
1499              
1500             =head2 C<size( )>
1501              
1502             Returns the total size of all L<files|/"files( )"> described in the torrent's
1503             metadata.
1504              
1505             =head2 C<name( )>
1506              
1507             Returns the UTF-8 encoded string the metadata suggests we save the file (or
1508             directory, in the case of multi-file torrents) under.
1509              
1510             =head2 C<uploaded( )>
1511              
1512             Returns the total amount uploaded to remote peers.
1513              
1514             =head2 C<downloaded( )>
1515              
1516             Returns the total amount downloaded from other peers.
1517              
1518             =head2 C<left( )>
1519              
1520             Returns the approximate amount based on the pieces we still
1521             L<want|/"wanted( )"> multiplied by the L<size of pieces|/"piece_length( )">.
1522              
1523             =head2 C<piece_length( )>
1524              
1525             Returns the number of bytes in each piece the file or files are split into.
1526             For the purposes of transfer, files are split into fixed-size pieces which are
1527             all the same length except for possibly the last one which may be truncated.
1528              
1529             =head2 C<bitfield( )>
1530              
1531             Returns a packed binary string in ascending order (ready for C<vec()>). Each
1532             index that the client has is set to one and the rest are set to zero.
1533              
1534             =head2 C<wanted( )>
1535              
1536             Returns a packed binary string in ascending order (ready for C<vec()>). Each
1537             index that the client has or simply does not want is set to zero and the rest
1538             are set to one.
1539              
1540             This value is calculated every time the method is called. Keep that in mind.
1541              
1542             =head2 C<complete( )>
1543              
1544             Returns true if we have downloaded everything we L<wanted|/"wanted( )"> which
1545             is not to say that we have all data and can L<seed|/"seed( )">.
1546              
1547             =head2 C<seed( )>
1548              
1549             Returns true if we have all data related to the torrent.
1550              
1551             =head2 C<files( )>
1552              
1553             Returns a list of hash references with the following keys:
1554              
1555             =over
1556              
1557             =item C<length>
1558              
1559             Which is the size of file in bytes.
1560              
1561             =item C<path>
1562              
1563             Which is the absolute path of the file.
1564              
1565             =item C<priority>
1566              
1567             Download priority for this file. By default, all files have a priority of
1568             C<1>. There is no built-in scale; the higher the priority, the better odds a
1569             piece from it will be downloaded first. Setting a file's priority to C<1000>
1570             while the rest are still at C<1> will likely force the file to complete before
1571             any other file is started.
1572              
1573             We do not download files with a priority of zero.
1574              
1575             =back
1576              
1577             =head2 C<peers( )>
1578              
1579             Returns the list of currently connected peers. The organization of these peers
1580             is not yet final so... don't write anything you don't expect to break before
1581             we hit C<v1.0.0>.
1582              
1583             =head2 C<state( )>
1584              
1585             Returns C<active> if the client is L<started|/"start( )">, C<paused> if the client
1586             is L<paused|/"pause( )">, and C<stopped> if the client is currently
1587             L<stopped|/"stop( )">.
1588              
1589             =head2 C<piece_cache( )>
1590              
1591             Pieces which overlap files with zero priority are stored in a part file which
1592             is indexed internally. To save this index (for resume, etc.) store the values
1593             returned by this method and pass them to L<new( )|/"new( ... )">.
1594              
1595             =head2 C<trackers( )>
1596              
1597             Returns a list of hashes, each representing a single tier of trackers as
1598             defined by L<BEP12|Net::BitTorrent::Protocol::BEP12>. The hashes contain the
1599             following keys:
1600              
1601             =over
1602              
1603             =item C<complete>
1604              
1605             This is a count of complete peers (seeds) as returned by the most recent
1606             announce.
1607              
1608             =item C<failures>
1609              
1610             This is a running total of the number of failed announces we've had in a row.
1611             This value is reset when we have a successful announce.
1612              
1613             =item C<incomplete>
1614              
1615             This is a count of incomplete peers (leechers) as returned by the most recent
1616             announce.
1617              
1618             =item C<peers>
1619              
1620             Which is a compact collection of IPv4 peers returned by the tracker. See
1621             L<Net::BitTorrent::Protocol::BEP23|Net::BitTorrent::Protocol::BEP23>.
1622              
1623             =item C<peers6>
1624              
1625             Which is a compact collection of IPv6 peers returned by the tracker. See
1626             L<Net::BitTorrent::Protocol::BEP07|Net::BitTorrent::Protocol::BEP07>.
1627              
1628             =item C<urls>
1629              
1630             Which is a list of URLs.
1631              
1632             =back
1633              
1634             =head1 This Module is Lame!
1635              
1636             Yeah, I said it.
1637              
1638             There are a few things a BitTorrent client must implement (to some degree) in
1639             order to interact with other clients in a modern day swarm.
1640             L<AnyEvent::BitTorrent|AnyEvent::BitTorrent> is meant to meet that bare
1641             minimum but it's based on L<Moo|Moo> so you could always subclass it to add more
1642             advanced functionality. Hint, hint!
1643              
1644             =head2 What is currently supported?
1645              
1646             Basic stuff. We can make and handle piece requests. Deal with cancels,
1647             disconnect idle peers, unchoke folks, fast extensions, file download
1648             priorities. Normal... stuff. HTTP trackers.
1649              
1650             =head2 What will probably be supported in the future?
1651              
1652             DHT (which will likely be in a separate dist), IPv6 stuff... I'll get around
1653             to those.
1654              
1655             Long term, UDP trackers may be supported.
1656              
1657             For a detailed list, see the TODO file included with this distribution.
1658              
1659             =head2 What will likely never be supported?
1660              
1661             We can't have nice things. Protocol encryption, uTP, endgame tricks, ...these
1662             will probably never be included in L<AnyEvent::BitTorrent|AnyEvent::BitTorrent>.
1663              
1664             =head2 What should I use instead?
1665              
1666             If you're reading all of this with a scowl, there are many alternatives to
1667             this module, most of which are sure to be better suited for advanced users. I
1668             suggest (in no particular order):
1669              
1670             =over
1671              
1672             =item L<BitFlu|http://bitflu.workaround.ch/>. It's written in Perl but you'll
1673             still need to be on a Linux, *BSD, et al. system to use it.
1674              
1675             =item L<Net::BitTorrent|Net::BitTorrent> ...in the future. I I<do not> suggest using either
1676             the current stable or unstable versions found on CPAN. The next version is
1677             being worked on and will be based on L<AnyEvent::BitTorrent|AnyEvent::BitTorrent>.
1678              
1679             =back
1680              
1681             If you're working on a Perl based client and would like me to link to it, send
1682             a bug report to the tracker L<found here|http://github.com/sanko/anyevent-bittorrent/issues>.
1683              
1684             =head1 Subclassing AnyEvent::BitTorrent
1685              
1686             TODO
1687              
1688             If you subclass this module and change the way it functions to that which in
1689             any way proves harmful to individual peers or the swarm at large, rather than
1690             damage L<AnyEvent::BitTorrent|AnyEvent::BitTorrent>'s reputation, override the peerid attribute.
1691             Thanks.
1692              
1693             =head1 PeerID Specification
1694              
1695             L<AnyEvent::BitTorrent|AnyEvent::BitTorrent> may be identified in a swarm by its peer id. As of
1696             this version, our peer id is in 'Azureus style' with a single digit for the
1697             Major version, two digits for the minor version, and a single character to
1698             indicate stability (stable releases marked with C<S>, unstable releases marked
1699             with C<U>). It looks sorta like:
1700              
1701                 -AB110S- Stable v1.10.0 release (typically found on CPAN, tagged in repo)
1702                 -AB110U- Unstable v1.10.X release (private builds, early testing, etc.)
1703              
1704             =head1 Bug Reports
1705              
1706             If email is better for you, L<my inbox is open|mailto:sanko@cpan.org> but I
1707             would rather have bugs sent through the issue tracker found at
1708             http://github.com/sanko/anyevent-bittorrent/issues.
1709              
1710             Please check the ToDo file included with this distribution in case your bug
1711             is already known (...I probably won't file bug reports to myself).
1712              
1713             =head1 See Also
1714              
1715             L<Net::BitTorrent::Protocol|Net::BitTorrent::Protocol> - The package which does all of the wire protocol
1716             level heavy lifting.
1717              
1718             =head1 Author
1719              
1720             Sanko Robinson - http://sankorobinson.com/
1721              
1722             CPAN ID: SANKO
1723              
1724             =head1 License and Legal
1725              
1726             Copyright (C) 2011-2016 by Sanko Robinson
1727              
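1728             This program is free software; you can redistribute it and/or modify it under
1729             the terms of
1730             L<The Artistic License 2.0|http://www.perlfoundation.org/artistic_license_2_0>.
1731             See the F<LICENSE> file included with this distribution or
1732             L<notes on the Artistic License 2.0|http://www.perlfoundation.org/artistic_2_0_notes>
1733             for clarification.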
1734              
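1735             When separated from the distribution, all original POD documentation is
1736             covered by the
1737             L<Creative Commons Attribution-Share Alike 3.0 License|http://creativecommons.org/licenses/by-sa/3.0/us/legalcode>.
1738             See the
1739             L<clarification of the CCA-SA3.0|http://creativecommons.org/licenses/by-sa/3.0/us/>.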
1740              
1741             Neither this module nor the L<Author|/Author> is affiliated with BitTorrent,
1742             Inc.
1743              
1744             =cut