File Coverage

blib/lib/AnyEvent/BitTorrent.pm
Criterion Covered Total %
statement 193 617 31.2
branch 28 248 11.2
condition 13 105 12.3
subroutine 45 99 45.4
pod 10 13 76.9
total 289 1082 26.7


line stmt bran cond sub pod time code
1             package AnyEvent::BitTorrent;
2 4     4   20036 use AnyEvent;
  4         3624  
  4         89  
3 4     4   2359 use AnyEvent::Handle;
  4         57213  
  4         416  
4 4     4   2153 use AnyEvent::Socket;
  4         40558  
  4         401  
5 4     4   2213 use AnyEvent::HTTP;
  4         25010  
  4         280  
6 4     4   1912 use Moo;
  4         36538  
  4         20  
7 4     4   6183 use Type::Tiny;
  4         51594  
  4         128  
8 4     4   1978 use Types::Standard qw[ArrayRef CodeRef Enum HashRef Int Ref Str];
  4         136585  
  4         41  
9 4     4   3951 use Fcntl qw[/SEEK_/ /O_/ :flock];
  4         7  
  4         1509  
10 4     4   1906 use Digest::SHA qw[sha1];
  4         9759  
  4         247  
11 4     4   44 use File::Spec;
  4         5  
  4         71  
12 4     4   14 use File::Path;
  4         4  
  4         190  
13 4     4   1708 use Net::BitTorrent::Protocol qw[:all];
  4         46199  
  4         1598  
14 4     4   26 use Scalar::Util qw[/weak/];
  4         5  
  4         28824  
15             #
16             our $VERSION = "1.0.1";
17             #
18             # XXX - These should be ro attributes w/o init args:
19             my $block_size = 2**14;
20              
21             # Custom types
22             my $FILE = Type::Tiny->new(name => 'File',
23             parent => Str,
24             constraint => sub { -f $_ },
25             message => sub {"$_ isn't an existing file"},
26             );
27             my $RESERVED = Type::Tiny->new(name => 'Reserved',
28             parent => Str,
29             constraint => sub { length $_ == 8 },
30             message => sub {'reserved data is malformed'}
31             );
32             my $PEERID = Type::Tiny->new(
33             name => 'PeerID',
34             parent => Str,
35             constraint => sub { length $_ == 20 },
36             message => sub {
37             'Peer ID must be 20 chars in length';
38             }
39             );
40             my $INFOHASH = Type::Tiny->new(
41             name => 'Infohash',
42             parent => Str,
43             constraint => sub { length $_ == 20 },
44             message => sub {
45             'Infohashes are 20 bytes in length';
46             }
47             );
48             #
49             has port => (is => 'ro',
50             isa => Int,
51             default => sub {0},
52             writer => '_set_port'
53             );
54             has socket => (is => 'ro',
55             isa => Ref,
56             init_arg => undef,
57             predicate => '_has_socket',
58             builder => '_build_socket'
59             );
60              
61             sub _build_socket {
62 4     4   105 my $s = shift;
63             tcp_server undef, $s->port, sub {
64 0     0   0 my ($fh, $host, $port) = @_;
65 0         0 AE::log info => 'Accepted connection from %s:%d', $host, $port;
66 0 0       0 return $fh->destroy if $s->state eq 'stopped';
67             my $handle = AnyEvent::Handle->new(
68             fh => $fh,
69             on_error => sub {
70 0         0 my ($hdl, $fatal, $msg) = @_;
71              
72             # XXX - callback
73 0         0 AE::log error => 'Socket error: ' . $msg;
74 0         0 $s->_del_peer($hdl);
75             },
76             on_eof => sub {
77 0         0 my $h = shift;
78 0         0 AE::log info => 'Socket EOF';
79 0         0 $s->_del_peer($h);
80             },
81             on_read => sub {
82 0         0 AE::log debug => 'Read Socket';
83 0         0 $s->_on_read_incoming(@_);
84             }
85 0         0 );
86 0         0 $s->_add_peer($handle);
87             }, sub {
88 4     4   573 my ($fh, $thishost, $thisport) = @_;
89 4         35 $s->_set_port($thisport);
90 4         1570 AE::log info => "bound to $thishost, port $thisport";
91 4         62 };
92             }
93             has path => (is => 'ro',
94             isa => $FILE,
95             required => 1
96             );
97             has reserved => (is => 'ro',
98             builder => '_build_reserved',
99             isa => $RESERVED
100             );
101              
102             sub _build_reserved {
103 4     4   67 my $reserved = "\0" x 8;
104              
105             #vec($reserved, 5, 8) = 0x10; # Ext Protocol
106 4         14 vec($reserved, 7, 8) = 0x04; # Fast Ext
107 4         17 AE::log debug => '_build_reserved() => ' . $reserved;
108 4         27052 $reserved;
109             }
110             has peerid => (is => 'ro',
111             isa => $PEERID,
112             init_arg => undef,
113             required => 1,
114             builder => '_build_peerid'
115             );
116              
117             sub _build_peerid {
118             return pack(
119             'a20',
120             (sprintf(
121             '-AB%01d%01d%01d%1s-%7s%-5s',
122             ($VERSION =~ m[^v?(\d+)\.(\d+)\.(\d+)]),
123             ($VERSION =~ m[[^\d\.^v]] ? 'U' : 'S'),
124             (join '',
125             map {
126 4 50   4   143 ['A' .. 'Z', 'a' .. 'z', 0 .. 9, qw[- . _ ~]]->[rand(66)]
  28         323  
127             } 1 .. 7
128             ),
129             [qw[KaiLi April Aaron]]->[rand 3]
130             )
131             )
132             );
133             }
134             has bitfield => (is => 'ro',
135             lazy => 1,
136             builder => '_build_bitfield',
137             isa => Str,
138             init_arg => undef,
139             );
140 4     4   1364 sub _build_bitfield { pack 'b*', "\0" x shift->piece_count }
141              
142             sub wanted {
143 4     4 1 36 my $s = shift;
144 4         8 my $wanted = '';
145 4         6 for my $findex (0 .. $#{$s->files}) {
  4         48  
146 4         176 my $prio = !!$s->files->[$findex]{priority};
147 4         36 for my $index ($s->_file_to_range($findex)) {
148 8416   66     152400 vec($wanted, $index, 1) = $prio && !vec($s->bitfield, $index, 1);
149             }
150             }
151 4         252 AE::log debug => '->wanted() => ' . unpack 'b*', $wanted;
152 4         317 $wanted;
153             }
154              
155             sub complete {
156 0     0 1 0 my $s = shift;
157 0         0 -1 == index substr(unpack('b*', $s->wanted), 0, $s->piece_count + 1), 1;
158             }
159              
160             sub seed {
161 0     0 1 0 my $s = shift;
162 0         0 -1 == index substr(unpack('b*', $s->bitfield), 0, $s->piece_count + 1), 0;
163             }
164              
165             sub _left {
166 4     4   6 my $s = shift;
167 4         11 $s->piece_length * scalar grep {$_} split '',
  8416         6132  
168             substr unpack('b*', $s->wanted), 0, $s->piece_count + 1;
169             }
170             has $_ => (is => 'ro',
171             isa => Int,
172             default => sub {0},
173             writer => '_set_' . $_
174             ) for qw[uploaded downloaded];
175             has infohash => (is => 'ro',
176             lazy => 1,
177             builder => '_build_infohash',
178             isa => $INFOHASH,
179             init_arg => undef
180             );
181 3     3   1427 sub _build_infohash { sha1(bencode(shift->metadata->{info})) }
182             has metadata => (is => 'ro',
183             lazy_build => 1,
184             builder => '_build_metadata',
185             lazy => 1,
186             isa => HashRef,
187             init_arg => undef
188             );
189              
190             sub _build_metadata {
191 4     4   1341 my $s = shift;
192              
193             #return if ref $s ne __PACKAGE__; # Applying roles makes deep rec
194 4         181 open my $fh, '<', $s->path;
195 4         156 sysread $fh, my $raw, -s $fh;
196 4         24 my $metadata = bdecode $raw;
197             AE::log debug => sub {
198 0     0   0 require Data::Dump;
199 0         0 '_build_metadata() => ' . Data::Dump::dump($metadata);
200 4         2629 };
201 4         302 $metadata;
202             }
203 5     5 1 3238 sub name { shift->metadata->{info}{name} }
204 6312     6312 1 93443 sub pieces { shift->metadata->{info}{pieces} }
205 25271     25271 1 370276 sub piece_length { shift->metadata->{info}{'piece length'} }
206              
207             sub piece_count {
208 12635     12635 0 9800 my $s = shift;
209 12635         162498 my $count = $s->size / $s->piece_length;
210 12635 50       95566 int($count) + (($count == int $count) ? 1 : 0);
211             }
212             has basedir => (
213             is => 'ro',
214             lazy => 1,
215             isa => Str,
216             required => 1,
217             default => sub { File::Spec->rel2abs(File::Spec->curdir) },
218             trigger => sub {
219             my ($s, $n, $o) = @_;
220             $o // return;
221             $s->_clear_files; # So they can be rebuilt with the new basedir
222             }
223             );
224             has files => (is => 'ro',
225             lazy => 1,
226             builder => '_build_files',
227             isa => ArrayRef [HashRef],
228             init_arg => undef,
229             clearer => '_clear_files'
230             );
231              
232             sub _build_files {
233 4     4   1348 my $s = shift;
234             defined $s->metadata->{info}{files} ?
235             [
236             map {
237             {priority => 1,
238             fh => undef,
239             mode => 'c',
240             length => $_->{length},
241             timeout => undef,
242             path =>
243             File::Spec->rel2abs(
244 0         0 File::Spec->catfile($s->basedir, $s->name, @{$_->{path}})
  0         0  
245             )
246             }
247 0         0 } @{$s->metadata->{info}{files}}
248             ]
249             : [
250             {priority => 1,
251             fh => undef,
252             mode => 'c',
253             length => $s->metadata->{info}{length},
254 4 50       36 timeout => undef,
255             path =>
256             File::Spec->rel2abs(File::Spec->catfile($s->basedir, $s->name))
257             }
258             ];
259             }
260             has size => (is => 'ro',
261             lazy => 1,
262             builder => '_build_size',
263             isa => Int,
264             init_arg => undef
265             );
266              
267             sub _build_size {
268 4     4   1523 my $s = shift;
269 4         6 my $ret = 0;
270 4         4 $ret += $_->{length} for @{$s->files};
  4         33  
271 4         227 AE::log debug => '_build_size() => ' . $ret;
272 4         172 $ret;
273             }
274              
275             sub _open {
276 3     3   42 my ($s, $i, $m) = @_;
277             AE::log
278             trace => 'Opening file #%d (%s) for %s',
279 3         44 $i, $s->files->[$i]->{path}, $m;
280 3 50       133 return 1 if $s->files->[$i]->{mode} eq $m;
281 0 0       0 if (defined $s->files->[$i]->{fh}) {
282 0         0 AE::log trace => 'Closing %s', $s->files->[$i]->{fh};
283 0         0 flock $s->files->[$i]->{fh}, LOCK_UN;
284 0         0 close $s->files->[$i]->{fh};
285 0         0 $s->files->[$i]->{fh} = ();
286             }
287 0 0       0 if ($m eq 'r') {
    0          
    0          
288 0         0 AE::log trace => 'Opening %s to read', $s->files->[$i]->{path};
289 0 0       0 sysopen($s->files->[$i]->{fh}, $s->files->[$i]->{path}, O_RDONLY)
290             || return;
291 0 0       0 flock($s->files->[$i]->{fh}, LOCK_SH) || return;
292 0 0       0 weaken $s unless isweak $s;
293 0         0 my $x = $i;
294             $s->files->[$x]->{timeout}
295 0   0 0   0 = AE::timer(500, 0, sub { $s // return; $s->_open($x, 'c') });
  0         0  
  0         0  
296             }
297             elsif ($m eq 'w') {
298 0         0 AE::log trace => 'Opening %s to write', $s->files->[$i]->{path};
299 0         0 my @split = File::Spec->splitdir($s->files->[$i]->{path});
300 0         0 pop @split; # File name itself
301 0         0 my $dir = File::Spec->catdir(@split);
302 0 0       0 File::Path::mkpath($dir) if !-d $dir;
303             sysopen($s->files->[$i]->{fh},
304             $s->files->[$i]->{path},
305 0 0       0 O_WRONLY | O_CREAT)
306             || return;
307 0         0 flock $s->files->[$i]->{fh}, LOCK_EX;
308             truncate $s->files->[$i]->{fh}, $s->files->[$i]->{length}
309             if -s $s->files->[$i]->{fh}
310 0 0       0 != $s->files->[$i]->{length}; # XXX - pre-allocate files
311 0 0       0 weaken $s unless isweak $s;
312 0         0 my $x = $i;
313             $s->files->[$x]->{timeout}
314 0   0 0   0 = AE::timer(60, 0, sub { $s // return; $s->_open($x, 'c') });
  0         0  
  0         0  
315             }
316 0         0 elsif ($m eq 'c') { $s->files->[$i]->{timeout} = () }
317 0         0 else {return}
318 0         0 return $s->files->[$i]->{mode} = $m;
319             }
320             has piece_cache => (is => 'ro', isa => HashRef, default => sub { {} });
321              
322             sub _cache_path {
323 0     0   0 my $s = shift;
324             File::Spec->catfile($s->basedir,
325 0 0       0 (scalar @{$s->files} == 1 ? () : $s->name),
  0         0  
326             '~ABPartFile_-'
327             . uc(substr(unpack('H*', $s->infohash), 0, 10))
328             . '.dat'
329             );
330             }
331              
332             sub _write_cache {
333 0     0   0 my ($s, $f, $o, $d) = @_;
334 0         0 my $path = $s->_cache_path;
335 0         0 AE::log
336             debug =>
337             'Attempting to store %d bytes to cache file (%s) [$f=%s, $o=%s]',
338             length($d), $path, $f, $o;
339 0         0 my @split = File::Spec->splitdir($path);
340 0         0 pop @split; # File name itself
341 0         0 my $dir = File::Spec->catdir(@split);
342 0 0       0 File::Path::mkpath($dir) if !-d $dir;
343 0 0       0 sysopen(my ($fh), $path, O_WRONLY | O_CREAT)
344             || return;
345 0         0 flock $fh, LOCK_EX;
346 0         0 my $pos = sysseek $fh, 0, SEEK_CUR;
347 0         0 my $w = syswrite $fh, $d;
348 0         0 flock $fh, LOCK_UN;
349 0         0 close $fh;
350 0         0 $s->piece_cache->{$f}{$o} = $pos;
351 0         0 AE::log debug => 'Wrote %d bytes to cache file', $w;
352 0         0 return $w;
353             }
354              
355             sub _read_cache {
356 6312     6312   7602 my ($s, $f, $o, $l) = @_;
357 6312   50     615349 $s->piece_cache->{$f} // return;
358 0   0     0 $s->piece_cache->{$f}{$o} // return;
359 0         0 my $path = $s->_cache_path;
360 0         0 AE::log
361             debug =>
362             'Attempting to read %d bytes from cache file (%s) [$f=%s, $o=%s]',
363             $l, $path, $f, $o;
364 0 0       0 sysopen(my ($fh), $path, O_RDONLY)
365             || return;
366 0         0 flock $fh, LOCK_SH;
367 0         0 sysseek $fh, $s->piece_cache->{$f}{$o}, SEEK_SET;
368 0         0 my $w = sysread $fh, my ($d), $l;
369 0         0 flock $fh, LOCK_UN;
370 0         0 close $fh;
371 0         0 return $d;
372             }
373              
374             sub _read {
375 6312     6312   30600 my ($s, $index, $offset, $length) = @_;
376 6312         11054 AE::log
377             debug =>
378             'Attempting to read %d bytes from piece %d starting at %d bytes',
379             $length, $index, $offset;
380 6312         164074 my $data = '';
381 6312         4813 my $file_index = 0;
382 6312         8005 my $total_offset = ($index * $s->piece_length) + $offset;
383             SEARCH:
384 6312         107457 while ($total_offset > $s->files->[$file_index]->{length}) {
385 0         0 $total_offset -= $s->files->[$file_index]->{length};
386 0         0 $file_index++;
387 0         0 AE::log
388             trace =>
389             'Searching for location... $total_offset = %d, $file_index = %d',
390             $total_offset, $file_index;
391             last SEARCH # XXX - return?
392 0 0       0 if not defined $s->files->[$file_index]->{length};
393             }
394 6312   33     50707 READ: while ((defined $length) && ($length > 0)) {
395             my $this_read
396             = (
397             ($total_offset + $length) >= $s->files->[$file_index]->{length})
398             ?
399 6312 100       82237 ($s->files->[$file_index]->{length} - $total_offset)
400             : $length;
401             AE::log
402             trace =>
403             'Attempting to read %d bytes from file #%d (%s), starting at %d',
404             $this_read,
405 6312         100806 $file_index, $s->files->[$file_index]->{path}, $total_offset;
406 6312 50 33     263742 if ( (!-f $s->files->[$file_index]->{path})
407             || (!$s->_open($file_index, 'r')))
408 6312   33     96001 { $data .= $s->_read_cache($file_index, $total_offset, $this_read)
409             // ("\0" x $this_read);
410 6312         15527 AE::log note => 'Failed to open file. Using null chars instead.';
411             }
412             else {
413 0         0 sysseek $s->files->[$file_index]->{fh}, $total_offset, SEEK_SET;
414 0         0 sysread $s->files->[$file_index]->{fh}, my ($_data), $this_read;
415 0 0       0 $data .= $_data if $_data;
416 0         0 AE::log
417             trace =>
418             'Read %d bytes of data from file (%d bytes collected so far)',
419             length $_data, length $data;
420 0 0       0 weaken $s unless isweak $s;
421 0         0 my $x = $file_index;
422             $s->files->[$x]->{timeout}
423 0   0 0   0 = AE::timer(500, 0, sub { $s // return; $s->_open($x, 'c') });
  0         0  
  0         0  
424             }
425 6312         187084 $file_index++;
426 6312         6386 $length -= $this_read;
427 6312         10183 AE::log
428             trace => 'Still need to read %d bytes',
429             $length;
430 6312 50       256348 last READ if not defined $s->files->[$file_index];
431 0         0 $total_offset = 0;
432             }
433 6312         42759 AE::log trace => 'Returning %d bytes of data', length $data;
434 6312         160658 return $data;
435             }
436              
437             sub _write {
438 0     0   0 my ($s, $index, $offset, $data) = @_;
439 0         0 AE::log
440             debug =>
441             'Attempting to write %d bytes from piece %d starting at %d bytes',
442             length($data), $index, $offset;
443 0         0 my $file_index = 0;
444 0   0     0 my $total_offset = int(($index * $s->piece_length) + ($offset || 0));
445 0         0 AE::log
446             debug => '...calculated offset == %d',
447             $total_offset;
448             SEARCH:
449 0         0 while ($total_offset > $s->files->[$file_index]->{length}) {
450 0         0 $total_offset -= $s->files->[$file_index]->{length};
451 0         0 $file_index++;
452 0         0 AE::log
453             trace =>
454             'Searching for location... $total_offset = %d, $file_index = %d',
455             $total_offset, $file_index;
456             last SEARCH # XXX - return?
457 0 0       0 if not defined $s->files->[$file_index]->{length};
458             }
459 0   0     0 WRITE: while ((defined $data) && (length $data > 0)) {
460             my $this_write
461             = (($total_offset + length $data)
462             >= $s->files->[$file_index]->{length})
463             ?
464 0 0       0 ($s->files->[$file_index]->{length} - $total_offset)
465             : length $data;
466             AE::log
467             trace =>
468             'Attempting to write %d bytes from file #%d (%s), starting at %d',
469             $this_write,
470 0         0 $file_index, $s->files->[$file_index]->{path}, $total_offset;
471 0 0       0 if ($s->files->[$file_index]->{priority} == 0) {
472 0         0 $s->_write_cache($file_index, $total_offset, substr $data, 0,
473             $this_write, '');
474 0         0 AE::log trace => 'Wrote data to cache...';
475             }
476             else {
477 0         0 $s->_open($file_index, 'w');
478 0         0 sysseek $s->files->[$file_index]->{fh}, $total_offset, SEEK_SET;
479 0         0 my $w = syswrite $s->files->[$file_index]->{fh}, substr $data, 0,
480             $this_write, '';
481 0         0 AE::log
482             trace => 'Wrote %d bytes of data to file (%d bytes left)',
483             $w, length $data;
484 0 0       0 weaken $s unless isweak $s;
485 0         0 my $x = $file_index;
486             $s->files->[$x]->{timeout}
487 0   0 0   0 = AE::timer(120, 0, sub { $s // return; $s->_open($x, 'c') });
  0         0  
  0         0  
488             }
489 0         0 $file_index++;
490 0 0       0 last WRITE if not defined $s->files->[$file_index];
491 0         0 $total_offset = 0;
492             }
493 0         0 return 1;
494             }
495              
496             sub hashcheck (;@) {
497 3     3 1 511 my $s = shift;
498 3 50       16 my @indexes = @_ ? @_ : (0 .. $s->piece_count);
499             AE::log trace => sub {
500 0     0   0 require Data::Dump;
501 0         0 'Hashcheck of : ' . Data::Dump::dump(\@indexes);
502 3         97 };
503 3         114 $s->bitfield; # Makes sure it's built
504 3         89 my $total_size = $s->size;
505 3         34 for my $index (@indexes) {
506 6312 50 33     17470 next if $index < 0 || $index > $s->piece_count;
507 6312 100       9233 my $piece = $s->_read($index,
508             0,
509             $index == $s->piece_count
510             ?
511             $total_size % $s->piece_length
512             : $s->piece_length
513             );
514 6312         10095 my $expected = substr($s->pieces, $index * 20, 20);
515 6312         14867557 my $reality = sha1($piece);
516 6312   66     27607 my $ok = defined($piece)
517             && ($expected eq $reality);
518 6312         18380 vec($s->{bitfield}, $index, 1) = $ok;
519             AE::log trace => sub {
520 0 0   0   0 "Validate piece #%06d %s, Expected: %s\n"
521             . " Reality: %s",
522             $index, ($ok ? 'PASS' : 'FAIL'), unpack('H*', $expected),
523             unpack('H*', $reality);
524 6312         33639 };
525 6312 100       200518 $ok ?
526             $s->_trigger_hash_pass($index)
527             : $s->_trigger_hash_fail($index);
528             }
529             }
530             has peers => (is => 'ro',
531             lazy => 1,
532             isa => HashRef,
533             clearer => '_clear_peers',
534             builder => '_build_peers'
535             );
536 4     4   1451 sub _build_peers { {} }
537              
538             sub _add_peer {
539 0     0   0 my ($s, $h) = @_;
540             $s->{peers}{+$h} = {
541             handle => $h,
542             peerid => '',
543             bitfield => (pack 'b*', "\0" x $s->piece_count),
544             remote_choked => 1,
545             remote_interested => 0,
546             remote_requests => [],
547             local_choked => 1,
548             local_interested => 0,
549             local_requests => [],
550 0     0   0 timeout => AE::timer(20, 0, sub { $s->_del_peer($h) }),
551             keepalive => AE::timer(
552             30, 120,
553             sub {
554 0     0   0 $s->_send_encrypted($h, build_keepalive());
555             }
556 0         0 ),
557              
558             # BEP06
559             local_allowed => [],
560             remote_allowed => [],
561             local_suggest => [],
562             remote_suggest => [],
563             #
564             encryption => '?'
565             };
566             }
567              
568             sub _del_peer {
569 0     0   0 my ($s, $h) = @_;
570 0   0     0 $s->peers->{$h} // return;
571 0         0 for my $req (@{$s->peers->{$h}{local_requests}}) {
  0         0  
572 0         0 my ($i, $o, $l) = @$req;
573 0         0 $s->working_pieces->{$i}{$o}[3] = ();
574             }
575 0         0 delete $s->peers->{$h};
576 0         0 $h->destroy;
577             }
578             my $shuffle;
579             has trackers => (is => 'ro',
580             lazy => 1,
581             builder => '_build_trackers',
582             isa => ArrayRef [HashRef],
583             init_arg => undef
584             );
585              
586             sub _build_trackers {
587 4     4   1832 my $s = shift;
588             $shuffle //= sub {
589 8     8   11 my $deck = shift; # $deck is a reference to an array
590 8 50       22 return unless @$deck; # must not be empty!
591 8         10 my $i = @$deck;
592 8         22 while (--$i) {
593 0         0 my $j = int rand($i + 1);
594 0         0 @$deck[$i, $j] = @$deck[$j, $i];
595             }
596 4   100     30 };
597             my $trackers = [
598             map {
599             {urls => $_,
600             complete => 0,
601             incomplete => 0,
602             peers => '',
603             peers6 => '',
604             announcer => undef,
605             ticker => AE::timer(
606             1,
607             15 * 60,
608             sub {
609 0 0   0   0 return if $s->state eq 'stopped';
610 0         0 $s->announce('started');
611             }
612 4         186 ),
613             failures => 0
614             }
615             } defined $s->metadata->{announce} ? [$s->metadata->{announce}]
616             : (),
617             defined $s->metadata->{'announce-list'}
618 4 50       52 ? @{$s->metadata->{'announce-list'}}
  0 50       0  
619             : ()
620             ];
621             AE::log trace => sub {
622 0     0   0 require Data::Dump;
623 0         0 '$trackers before shuffle => ' . Data::Dump::dump($trackers);
624 4         19 };
625 4         126 $shuffle->($trackers);
626 4         14 $shuffle->($_->{urls}) for @$trackers;
627             AE::log trace => sub {
628 0     0   0 require Data::Dump;
629 0         0 '$trackers after shuffle => ' . Data::Dump::dump($trackers);
630 4         47 };
631 4         204 $trackers;
632             }
633              
634             sub announce {
635 3     3 0 6 my ($s, $e) = @_;
636 3 50       11 return if $a++ > 10; # Retry attempts
637 3         39 for my $tier (@{$s->trackers}) {
  3         31  
638 4   33     4045 $tier->{announcer} //= $s->_announce_tier($e, $tier);
639             }
640             }
641              
642             sub _announce_tier {
643 4     4   8 my ($s, $e, $tier) = @_;
644 4         6 my @urls = grep {m[^https?://]} @{$tier->{urls}};
  4         19  
  4         14  
645 4 50       15 return if $tier->{failures} > 5;
646 4 50       6 return if $#{$tier->{urls}} < 0; # Empty tier?
  4         14  
647 4 50       17 return if $tier->{urls}[0] !~ m[^https?://.+];
648 4         20 local $AnyEvent::HTTP::USERAGENT
649             = 'AnyEvent::BitTorrent/' . $AnyEvent::BitTorrent::VERSION;
650             my $_url = $tier->{urls}[0] . '?info_hash=' . sub {
651 4     4   119 local $_ = shift;
652 4         25 s/([\W])/"%" . uc(sprintf("%2.2x",ord($1)))/eg;
  56         118  
653 4         68 $_;
654             }
655 4 50       41 ->($s->infohash)
656             . ('&peer_id=' . $s->peerid)
657             . ('&uploaded=' . $s->uploaded)
658             . ('&downloaded=' . $s->downloaded)
659             . ('&left=' . $s->_left)
660             . ('&port=' . $s->port)
661             . '&compact=1'
662             . ($e ? '&event=' . $e : '');
663 4         312 AE::log debug => 'Announce URL: ' . $_url;
664             http_get $_url, sub {
665 0     0   0 my ($body, $hdr) = @_;
666             AE::log trace => sub {
667 0         0 require Data::Dump;
668 0         0 'Announce response: ' . Data::Dump::dump($body, $hdr);
669 0         0 };
670 0         0 $tier->{announcer} = ();
671 0 0       0 if ($hdr->{Status} =~ /^2/) {
672 0         0 my $reply = bdecode($body);
673 0 0       0 if (defined $reply->{'failure reason'}) { # XXX - Callback?
674 0         0 push @{$tier->{urls}}, shift @{$tier->{urls}};
  0         0  
  0         0  
675 0         0 $s->_announce_tier($e, $tier);
676 0         0 $tier->{'failure reason'} = $reply->{'failure reason'};
677 0         0 $tier->{failures}++;
678             }
679             else { # XXX - Callback?
680 0         0 $tier->{failures} = $tier->{'failure reason'} = 0;
681             $tier->{peers}
682             = compact_ipv4(
683             uncompact_ipv4($tier->{peers} . $reply->{peers}))
684 0 0       0 if $reply->{peers};
685             $tier->{peers6}
686             = compact_ipv6(
687             uncompact_ipv6($tier->{peers6} . $reply->{peers6}))
688 0 0       0 if $reply->{peers6};
689 0         0 $tier->{complete} = $reply->{complete};
690 0         0 $tier->{incomplete} = $reply->{incomplete};
691             $tier->{ticker} = AE::timer(
692             $reply->{interval} // (15 * 60),
693             $reply->{interval} // (15 * 60),
694             sub {
695 0 0       0 return if $s->state eq 'stopped';
696 0         0 $s->_announce_tier($e, $tier);
697             }
698 0   0     0 );
      0        
699             }
700             }
701             else { # XXX - Callback?
702 0         0 $tier->{'failure reason'}
703             = "HTTP Error: $hdr->{Status} $hdr->{Reason}\n";
704 0         0 $tier->{failures}++;
705 0         0 push @{$tier->{urls}}, shift @{$tier->{urls}};
  0         0  
  0         0  
706 0         0 $s->_announce_tier($e, $tier);
707             }
708             }
709 4         243 }
710             has _choke_timer => (
711             is => 'bare',
712             isa => Ref,
713             init_arg => undef,
714             required => 1,
715             default => sub {
716             my $s = shift;
717             AE::timer(
718             15, 45,
719             sub {
720             return if $s->state ne 'active';
721             AE::log trace => 'Choke timer...';
722             my @interested
723             = grep { $_->{remote_interested} && $_->{remote_choked} }
724             values %{$s->peers};
725              
726             # XXX - Limit the number of upload slots
727             for my $p (@interested) {
728             $p->{remote_choked} = 0;
729             $s->_send_encrypted($p->{handle}, build_unchoke());
730             AE::log trace => 'Choked %s', $p->{peerid};
731             }
732              
733             # XXX - Send choke to random peer
734             }
735             );
736             }
737             );
# Repeating timer that fills block requests queued by remote peers.
#
# The timer first fires 15 seconds after the attribute is built and then
# every 10 seconds. A tick does nothing unless the client state is 'active'.
# Otherwise peers with queued requests are drained in random order until
# either no requests remain or 2**20 bytes (counted by requested block
# length) have been handed to the sockets. The uploaded counter is then
# advanced by that amount.
has _fill_requests_timer => (
    is       => 'bare',
    isa      => Ref,
    init_arg => undef,
    required => 1,
    default  => sub {
        my $s = shift;
        AE::timer(
            15, 10,
            sub {    # XXX - Limit by time/bandwidth
                return if $s->state ne 'active';
                AE::log trace => 'Request fill timer...';
                my @waiting
                    = grep { defined && scalar @{$_->{remote_requests}} }
                    values %{$s->peers};
                return if !@waiting;
                my $total_sent = 0;
                while (@waiting && $total_sent < 2**20) {

                    # Remove one randomly chosen peer from the candidates
                    my $p = splice(@waiting, rand @waiting, 1, ());
                    AE::log trace => 'Chosen peer: %s...', $p->{peerid};
                    while ($total_sent < 2**20 && @{$p->{remote_requests}}) {
                        my $req = shift @{$p->{remote_requests}};

                        # The format has four placeholders: the request's
                        # index, offset and length plus the peer id.
                        AE::log
                            trace =>
                            'Filling request i:%d, o:%d, l:%d for %s',
                            @{$req}[0 .. 2], $p->{peerid} // '';

                        # XXX - If piece is bad locally
                        #          if remote supports fast ext
                        #             send reject
                        #          else
                        #             simply return
                        #       else...
                        $s->_send_encrypted(
                            $p->{handle},
                            build_piece(
                                $req->[0], $req->[1],
                                $s->_read($req->[0], $req->[1], $req->[2])
                            )
                        );
                        $total_sent += $req->[2];
                    }
                }
                $s->_set_uploaded($s->uploaded + $total_sent);
            }
        );
    }
);
# Timer that periodically attempts new outgoing connections. It is created
# on first access by _build_peer_timer and dropped by _clear_peer_timer.
has _peer_timer => (
    is       => 'ro',
    isa      => Ref,
    lazy     => 1,
    builder  => '_build_peer_timer',
    clearer  => '_clear_peer_timer',
    init_arg => undef,
);
793              
# Builder for the _peer_timer attribute.
#
# Returns an AE::timer that first fires after 1 second and then every 15
# seconds. A tick does nothing when $s->_left is false or when no tracker
# has supplied any peer addresses. Otherwise it starts at most 10 new
# outgoing connections to randomly chosen addresses, stopping early once
# more than 100 peers are known.
sub _build_peer_timer {
    my $s = shift;
    AE::timer(
        1, 15,
        sub {
            return if !$s->_left;
            AE::log trace => 'Attempting to connect to new peer...';

            # XXX - Initiate connections when we are in Super seed mode?
            # Collect the compact IPv4 and IPv6 peer lists of every tracker
            my @cache = map {
                $_->{peers} ? uncompact_ipv4($_->{peers}) : (),
                    $_->{peers6} ? uncompact_ipv6($_->{peers6}) : ()
            } @{$s->trackers};
            return if !@cache;
            for my $i (1 .. @cache) {
                last if $i > 10;    # XXX - Max half open
                last
                    if scalar(keys %{$s->peers}) > 100;    # XXX - Max peers

                # Draw uniformly from every remaining candidate. Indexing
                # with rand $#cache could never pick the last element while
                # more than one address was left.
                my $addr = splice @cache, rand @cache, 1;
                $s->_new_peer($addr);
            }
        }
    );
}
820              
# Opens an outgoing connection to a peer.
#
# $addr is an array reference holding the host and port; it is passed
# straight to AnyEvent::Handle's 'connect' option. Once connected, the handle
# is registered with _add_peer and our handshake is sent. Any error or EOF
# removes the peer again via _del_peer. Incoming data is passed to _on_read.
#
# Returns the new AnyEvent::Handle object.
sub _new_peer {
    my ($s, $addr) = @_;
    AE::log trace => 'Connecting to %s:%d', @$addr;
    my $handle = AnyEvent::Handle->new(
        connect    => $addr,
        on_prepare => sub {60},    # return value is the connect timeout
        on_error   => sub {
            my ($hdl, $fatal, $msg) = @_;

            # XXX - callback
            AE::log
                error => 'Socket error: %s (Removing peer)',
                $msg;
            $s->_del_peer($hdl);
        },
        on_connect_error => sub {

            # Unlike on_error, this callback is only handed the handle and
            # the message; there is no separate "fatal" flag.
            my ($hdl, $msg) = @_;
            $s->_del_peer($hdl);

            # XXX - callback
            AE::log
                error => "fatal error (%s)\n",
                $msg // 'Connection timed out';
        },
        on_connect => sub {

            # Use the handle passed to the callback rather than closing over
            # the outer variable, which would make the handle reference
            # itself through this closure.
            my ($h, $host, $port) = @_;
            AE::log
                trace => 'Connection established with %s:%d',
                $host, $port;
            $s->_add_peer($h);
            $s->_send_handshake($h);
        },
        on_eof => sub {
            my $h = shift;
            AE::log trace => 'EOF from peer';
            $s->_del_peer($h);
        },
        on_read => sub {
            $s->_on_read(@_);
        }
    );
    return $handle;
}
867              
# Read callback used on a connection until the remote handshake arrives.
#
# Parses a single packet from the handle's read buffer. A packet carrying an
# error, or a handshake for a different infohash, removes the peer. A valid
# handshake records the peer's reserved bytes and peer id, answers with our
# own handshake and bitfield, arms a 60 second timeout, starts the peer with
# an all-zero bitfield and switches the handle over to _on_read.
#
# Returns 1 after a handled (or ignored) packet, the result of _del_peer when
# the peer is dropped, and nothing when there is no usable packet.
sub _on_read_incoming {
    my ($s, $h) = @_;
    return if !defined $h->rbuf;

    # XXX - Handle things if the stream is encrypted
    my $packet = parse_packet(\$h->rbuf);
    return unless $packet;
    AE::log trace => sub {
        require Data::Dump;
        'Incoming packet: ' . Data::Dump::dump($packet);
    };
    return $s->_del_peer($h) if defined $packet->{error};

    # Anything but a handshake should never happen here; ignore it
    return 1 if $packet->{type} != $HANDSHAKE;

    # The handshake payload is [reserved, infohash, peerid]
    my $payload = $packet->{payload};
    return if ref $payload ne 'ARRAY';
    $s->peers->{$h}{reserved} = $payload->[0];
    return $s->_del_peer($h) if $payload->[1] ne $s->infohash;
    $s->peers->{$h}{peerid} = $payload->[2];
    $s->_send_handshake($h);
    $s->_send_bitfield($h);
    $s->peers->{$h}{timeout}
        = AE::timer(60, 0, sub { $s->_del_peer($h) });
    $s->peers->{$h}{bitfield} = pack 'b*', (0 x $s->piece_count);
    $h->on_read(sub { $s->_on_read(@_) });
    return 1;
}
900              
# Read callback for peer connections.
#
# Parses every complete packet in the handle's read buffer and updates the
# state kept for the peer in $s->peers->{$h}:
#
#   * a packet with an error or a handshake for another infohash drops the
#     peer,
#   * interest/choke/have/bitfield packets update the matching flags,
#   * REQUEST packets are queued in remote_requests (they are served by the
#     request fill timer),
#   * PIECE packets store block data in working_pieces; a completed piece is
#     hash checked, written to disk and announced with HAVE packets,
#   * CANCEL and REJECT remove the block from the relevant request queue.
#
# Several packet types (re)arm the peer's inactivity timeout.
sub _on_read {
    my ($s, $h) = @_;

    # (Re)arm the inactivity timer; the peer is dropped when it fires.
    my $reset_timeout = sub {
        my ($seconds) = @_;
        $s->peers->{$h}{timeout}
            = AE::timer($seconds, 0, sub { $s->_del_peer($h) });
    };

    # True if the queued request [index, offset, length] is the given block.
    my $is_block = sub {
        my ($req, $index, $offset, $length) = @_;
        return
               $req->[0] == $index
            && $req->[1] == $offset
            && $req->[2] == $length;
    };
    while (my $packet = parse_packet(\$h->rbuf)) {
        AE::log debug => sub {
            require Data::Dump;

            # Dump the whole packet; the error slot alone is undefined for
            # every valid packet.
            'Incoming packet: ' . Data::Dump::dump($packet);
        };
        if (defined $packet->{error}) {
            $s->_del_peer($h);
            return;
        }
        elsif ($packet->{type} eq $KEEPALIVE) {

            # Do nothing!
        }
        elsif ($packet->{type} == $HANDSHAKE) {

            # The payload is [reserved, infohash, peerid]
            return if ref $packet->{payload} ne 'ARRAY';
            $s->peers->{$h}{reserved} = $packet->{payload}[0];
            return $s->_del_peer($h)
                if $packet->{payload}[1] ne $s->infohash;
            $s->peers->{$h}{peerid} = $packet->{payload}[2];
            $s->_send_bitfield($h);
            $reset_timeout->(60);
            $s->peers->{$h}{bitfield} = pack 'b*', (0 x $s->piece_count);
        }
        elsif ($packet->{type} == $INTERESTED) {
            $s->peers->{$h}{remote_interested} = 1;
        }
        elsif ($packet->{type} == $NOT_INTERESTED) {
            $s->peers->{$h}{remote_interested} = 0;

            # XXX - Clear any requests in queue
            # XXX - Send choke just to be sure
        }
        elsif ($packet->{type} == $CHOKE) {
            $s->peers->{$h}{local_choked} = 1;

            # Without the Fast Extension (bit 0x04 of the last reserved
            # byte) a choke implicitly drops our outstanding requests, so
            # release the blocks that have not been received yet.
            if (!(vec($s->peers->{$h}{reserved}, 7, 8) & 0x04)) {
                for my $req (@{$s->peers->{$h}{local_requests}}) {
                    $s->working_pieces->{$req->[0]}{$req->[1]}[3] = undef
                        unless
                        defined $s->working_pieces->{$req->[0]}{$req->[1]}[4];
                }
            }
            $s->_consider_peer($s->peers->{$h});
        }
        elsif ($packet->{type} == $UNCHOKE) {
            $s->peers->{$h}{local_choked} = 0;
            $reset_timeout->(120);
            $s->_request_pieces($s->peers->{$h});
        }
        elsif ($packet->{type} == $HAVE) {
            vec($s->peers->{$h}{bitfield}, $packet->{payload}, 1) = 1;
            $s->_consider_peer($s->peers->{$h});
            $reset_timeout->(60);
        }
        elsif ($packet->{type} == $BITFIELD) {
            $s->peers->{$h}{bitfield} = $packet->{payload};
            $s->_consider_peer($s->peers->{$h});
        }
        elsif ($packet->{type} == $REQUEST) {
            $reset_timeout->(120);

            # XXX - Make sure (index + offset + length) < $s->size
            #       if not, send reject if remote supports fast ext
            #       either way, ignore the request
            push @{$s->peers->{$h}{remote_requests}}, $packet->{payload};
        }
        elsif ($packet->{type} == $PIECE) {
            $reset_timeout->(120);
            my ($index, $offset, $data) = @{$packet->{payload}};
            my $length = length $data;

            # Make sure $index is a working piece
            $s->working_pieces->{$index} // return;

            # Make sure we req from this peer
            return
                if !grep { $is_block->($_, $index, $offset, $length) }
                @{$s->peers->{$h}{local_requests}};
            $s->peers->{$h}{local_requests} = [
                grep { !$is_block->($_, $index, $offset, $length) }
                    @{$s->peers->{$h}{local_requests}}
            ];

            # Slot 4 holds the block's data, slot 5 its cancel timer
            $s->working_pieces->{$index}{$offset}[4] = $data;
            $s->working_pieces->{$index}{$offset}[5] = undef;
            $s->_set_downloaded($s->downloaded + $length);
            if (0 == scalar grep { !defined $_->[4] }
                values %{$s->working_pieces->{$index}})
            {   # Every block has arrived: assemble and verify the piece
                my $piece = join '',
                    map  { $s->working_pieces->{$index}{$_}[4] }
                    sort { $a <=> $b }
                    keys %{$s->working_pieces->{$index}};
                if ((substr($s->pieces, $index * 20, 20) eq sha1($piece))) {
                    for my $attempt (1 .. 5) {   # XXX = 5 == failure callback
                        last
                            if $s->_write($index, 0, $piece) == length $piece;
                    }
                    vec($s->{bitfield}, $index, 1) = 1;

                    # NOTE(review): this qualifier is true only when the
                    # first flag of the peer's bitfield is '0'; it looks like
                    # it was meant to test whether the peer lacks $index.
                    # Behavior is left unchanged -- confirm the intent.
                    $s->_broadcast(
                        build_have($index),
                        sub {
                            !!!index substr(unpack('b*', $_->{bitfield}),
                                            0, $s->piece_count + 1),
                                0, 0;
                        }
                    );
                    $s->announce('complete')
                        if !scalar grep {$_} split '',
                        substr unpack('b*', ~$s->bitfield), 0,
                        $s->piece_count + 1;
                    $s->_consider_peer($_)
                        for grep { $_->{local_interested} }
                        values %{$s->peers};
                    $s->_trigger_hash_pass($index);
                }
                else {
                    $s->_trigger_hash_fail($index);

                    # XXX - Not sure what to do... I'd
                    #       ban the peers involved and
                    #       try the same piece again.
                }
                delete $s->working_pieces->{$index};
            }
            $s->_request_pieces($s->peers->{$h});
        }
        elsif ($packet->{type} == $CANCEL) {
            my ($index, $offset, $length) = @{$packet->{payload}};
            return   # XXX - error callback if this block is not in the queue
                if !grep { $is_block->($_, $index, $offset, $length) }
                @{$s->peers->{$h}{remote_requests}};
            $s->peers->{$h}{remote_requests} = [
                grep { !$is_block->($_, $index, $offset, $length) }
                    @{$s->peers->{$h}{remote_requests}}
            ];
        }
        elsif ($packet->{type} == $PORT) {

            # Do nothing... as we don't have a DHT node. Yet?
        }
        elsif ($packet->{type} == $SUGGEST) {
            push @{$s->peers->{$h}{local_suggest}}, $packet->{payload};
        }
        elsif ($packet->{type} == $HAVE_ALL) {
            $s->peers->{$h}{bitfield} = pack 'b*', (1 x $s->piece_count);
            $s->_consider_peer($s->peers->{$h});
            $reset_timeout->(120);
        }
        elsif ($packet->{type} == $HAVE_NONE) {
            $s->peers->{$h}{bitfield} = pack 'b*', (0 x $s->piece_count);
            $reset_timeout->(30);
        }
        elsif ($packet->{type} == $REJECT) {
            my ($index, $offset, $length) = @{$packet->{payload}};
            return   # XXX - error callback if this block is not in the queue
                if !grep { $is_block->($_, $index, $offset, $length) }
                @{$s->peers->{$h}{local_requests}};

            # Release the block so it can be requested again
            $s->working_pieces->{$index}{$offset}->[3] = undef;
            $s->peers->{$h}{local_requests} = [
                grep { !$is_block->($_, $index, $offset, $length) }
                    @{$s->peers->{$h}{local_requests}}
            ];
            $reset_timeout->(30);
        }
        elsif ($packet->{type} == $ALLOWED_FAST) {
            push @{$s->peers->{$h}{local_allowed}}, $packet->{payload};
        }
        else {
            # use Data::Dump qw[pp];
            # die 'Unhandled packet: ' . pp $packet;
        }
        last
            if 5 > length($h->rbuf // '');    # Min size for protocol
    }
}
1102              
# Tells the peer behind handle $h which pieces we have.
#
# If the peer advertised the Fast Extension in its handshake (bit 0x04 of the
# last reserved byte), a HAVE_ALL packet is sent when we are a seed and a
# HAVE_NONE packet when our bitfield contains no set bit. In every other case
# the full BITFIELD packet is sent.
#
# Returns whatever _send_encrypted returns.
sub _send_bitfield {
    my ($s, $h) = @_;

    # Look at the whole eighth byte. Reading a single bit (width 1) yields
    # only 0 or 1, which can never have the 0x04 bit set.
    if (vec($s->peers->{$h}{reserved}, 7, 8) & 0x04) {
        if ($s->seed) {
            return $s->_send_encrypted($h, build_haveall());
        }
        elsif ($s->bitfield() !~ m[[^\0]]) {
            return $s->_send_encrypted($h, build_havenone());
        }
    }

    # XXX - If it's cheaper to send HAVE packets than a full BITFIELD, do it
    $s->_send_encrypted($h, build_bitfield($s->bitfield));
}
1117              
# Sends $data to every known peer accepted by $qualifier.
#
# $qualifier is an optional code reference that is called without arguments
# while $_ is set to the peer's hash; peers for which it returns false are
# skipped. When omitted, every peer receives the data.
sub _broadcast {
    my ($s, $data, $qualifier) = @_;
    $qualifier = sub {1}
        unless defined $qualifier;

    # Select all recipients first, then write to each of their handles
    my @recipients = grep { $qualifier->() } values %{$s->peers};
    for my $peer (@recipients) {
        $s->_send_encrypted($peer->{handle}, $data);
    }
}
1124              
# Figure out whether or not we find a peer interesting.
#
# $p is the peer's state hash. The peer is interesting when its bitfield
# shares at least one set bit with the pieces we still want. Whenever that
# answer differs from the peer's current local_interested flag, the flag is
# updated and an INTERESTED or NOT_INTERESTED packet is sent. Nothing happens
# unless the client is 'active' and incomplete.
sub _consider_peer {
    my ($s, $p) = @_;
    return if $s->state ne 'active';
    return if $s->complete;

    # One '0'/'1' flag per piece the peer has and we want
    my $relevant = $p->{bitfield} & $s->wanted;
    my $flags = substr(unpack('b*', $relevant), 0, $s->piece_count + 1);
    my $interesting = index($flags, 1, 0) != -1 ? 1 : 0;
    if ($interesting && !$p->{local_interested}) {
        $p->{local_interested} = 1;
        $s->_send_encrypted($p->{handle}, build_interested());
    }
    elsif (!$interesting && $p->{local_interested}) {
        $p->{local_interested} = 0;
        $s->_send_encrypted($p->{handle}, build_not_interested());
    }
}
# Pieces currently being downloaded, keyed by piece index and then by block
# offset. Starts out as an empty hash and cannot be set via the constructor.
has working_pieces => (
    is       => 'ro',
    isa      => HashRef,
    init_arg => undef,
    lazy     => 1,
    default  => sub { return {} },
);
1153              
# Maps a file to a range of piece indexes.
#
# $file is the position of the file in $s->files. The file's first and last
# byte positions are divided by the piece length. The returned list runs from
# the truncated start quotient up to the truncated end quotient, or to the
# end quotient plus one when the file ends exactly on a piece boundary.
sub _file_to_range {
    my ($s, $file) = @_;

    # Byte position at which the requested file begins
    my $first_byte = 0;
    $first_byte += $s->files->[$_]->{length} for 0 .. $file - 1;
    my $last_byte = $first_byte + $s->files->[$file]->{length};

    # Translate both byte positions into (fractional) piece indexes
    my $from = $first_byte / $s->piece_length;
    my $to   = $last_byte / $s->piece_length;
    my $upper = $to == int($to) ? int($to + 1) : int($to);
    return (int($from) .. $upper);
}
1165              
# Requests blocks from peer $p (the peer's state hash).
#
# Nothing is requested unless the client is 'active' and the peer still has
# a handle. While fewer than 10 pieces are being worked on, candidate pieces
# are all pieces the peer has and we lack, each listed once per priority
# point of the file it belongs to; otherwise only pieces already in progress
# are considered. One candidate is chosen at random and its unrequested
# blocks are requested until the peer's local_requests queue is full or no
# blocks are left.
#
# Every requested block is stored in working_pieces->{index}{offset} as
#   [index, offset, length, peer (weak), data, cancel timer]
# The timer fires after 60 seconds: it cancels the request, releases the
# block and arms a 45 second timeout on the peer.
sub _request_pieces {
    my ($s, $p) = @_;
    return if $s->state ne 'active';
    weaken $p unless isweak $p;
    $p // return;
    $p->{handle} // return;
    my @indexes;
    if (scalar keys %{$s->working_pieces} < 10) {   # XXX - Max working pieces
        for my $findex (0 .. $#{$s->files}) {
            for my $index ($s->_file_to_range($findex)) {

                # Repeat the index once per priority point so that higher
                # priority files are picked more often
                push @indexes, map {
                    vec($p->{bitfield}, $index, 1)
                        && !vec($s->bitfield, $index, 1) ?
                        $index
                        : ()
                } 1 .. $s->{files}[$findex]{priority};
            }
        }
    }
    else {
        @indexes = keys %{$s->working_pieces};
    }
    return if !@indexes;
    my $index = $indexes[rand @indexes];  # XXX - Weighted random/Rarest first
    my $piece_size
        = $index == $s->piece_count ?
        $s->size % $s->piece_length
        : $s->piece_length;
    my $block_count = $piece_size / $block_size;
    my @offsets = map { $_ * $block_size }
        0 .. $block_count - ((int($block_count) == $block_count) ? 1 : 0);
    $s->working_pieces->{$index} //= {map { $_ => undef } @offsets};
    my @unrequested = sort { $a <=> $b }
        grep {    # XXX - If there are no unrequested blocks, pick a new index
        (!ref $s->working_pieces->{$index}{$_})
            || (   (!defined $s->working_pieces->{$index}{$_}[4])
                && (!defined $s->working_pieces->{$index}{$_}[3]))
        } @offsets;
    for (scalar @{$p->{local_requests}} .. 12) {
        my $offset = shift @unrequested;
        $offset // return;    # XXX - Start working on another piece

        # The final block holds whatever remains of the piece. When the
        # piece is an exact multiple of the block size that remainder is
        # zero, in which case the final block is a full block.
        my $_block_size
            = ($offset == $offsets[-1]) ?
            (($piece_size % $block_size) || $block_size)
            : $block_size;

        # XXX - Limit to x req per peer (future: based on bandwidth)
        AE::log
            trace => 'Requesting %d, %d, %d',
            $index, $offset, $_block_size;
        $s->_send_encrypted($p->{handle},
                            build_request($index, $offset, $_block_size));
        $s->working_pieces->{$index}{$offset} = [
            $index, $offset,
            $_block_size,
            $p, undef,
            AE::timer(
                60, 0,
                sub {
                    $p // return;
                    $p->{handle} // return;
                    $s->_send_encrypted($p->{handle},
                                build_cancel($index, $offset, $_block_size));
                    $s->working_pieces->{$index}{$offset}[3] = undef;
                    $p->{local_requests} = [
                        grep {
                                   $_->[0] != $index
                                || $_->[1] != $offset
                                || $_->[2] != $_block_size
                        } @{$p->{local_requests}}
                    ];
                    $p->{timeout} = AE::timer(45, 0,
                                       sub { $s->_del_peer($p->{handle}) });

                    #$s->_request_pieces( $p) #  XXX - Ask a different peer
                }
            )
        ];

        # Do not let the working piece keep the peer alive
        weaken($s->working_pieces->{$index}{$offset}[3])
            unless isweak($s->working_pieces->{$index}{$offset}[3]);
        push @{$p->{local_requests}}, [$index, $offset, $_block_size];
    }
}
1250              
1251             # Cheap callback system
# Callback run when a piece passes its hash check. The default callback
# ignores its arguments and returns true.
has on_hash_pass => (
    is      => 'rw',
    isa     => CodeRef,
    clearer => '_no_hash_pass',
    default => sub {
        return sub { !!1 };
    },
);

# Calls the on_hash_pass callback with the remaining arguments and returns
# its result.
sub _trigger_hash_pass {
    my $s = shift;
    $s->on_hash_pass()->(@_);
}
# Callback run when a piece fails its hash check. The default callback
# ignores its arguments and returns true.
has on_hash_fail => (
    is      => 'rw',
    isa     => CodeRef,
    clearer => '_no_hash_fail',
    default => sub {
        return sub { !!1 };
    },
);

# Calls the on_hash_fail callback with the remaining arguments and returns
# its result.
sub _trigger_hash_fail {
    my $s = shift;
    $s->on_hash_fail()->(@_);
}
1270             #
# Current client state: 'active' (the default), 'stopped' or 'paused'.
# Read-only from the outside; changed internally through _set_state.
has state => (
    is      => 'ro',
    isa     => Enum [qw[active stopped paused]],
    default => sub { return 'active' },
    writer  => '_set_state',
);
1276              
# Stops the client. Does nothing beyond logging if it is already stopped.
# Otherwise a 'stopped' event is announced to the trackers, the peers and the
# new-peer timer are cleared, every file is passed to _open with mode 'c',
# and the state becomes 'stopped'.
sub stop {
    my $self = shift;
    AE::log debug => 'Stopping...';
    return if $self->state eq 'stopped';

    AE::log trace => 'Announcing "stopped" event to trackers...';
    $self->announce('stopped');

    AE::log trace => 'Disconnecting peers...';
    $self->_clear_peers;

    AE::log trace => 'Stopping new peers ticker...';
    $self->_clear_peer_timer;

    AE::log trace => 'Closing files...';
    for my $findex (0 .. $#{$self->files}) {
        $self->_open($findex, 'c');
    }

    AE::log trace => 'Setting internal status...';
    $self->_set_state('stopped');
}
1292              
# Starts (or resumes) the client. A 'started' event is announced unless the
# state is already 'active'. The peers attribute and the new-peer timer are
# then accessed (presumably to build them if needed) and the state becomes
# 'active'.
sub start {
    my $self = shift;
    AE::log debug => 'Starting...';
    if ($self->state ne 'active') {
        $self->announce('started');
    }
    $self->peers;

    AE::log trace => 'Starting new peers ticker...';
    $self->_peer_timer;

    AE::log trace => 'Setting internal status...';
    $self->_set_state('active');
}
1303              
# Pauses the client. The peers attribute and the new-peer timer are still
# accessed, so connections keep being made, but the state becomes 'paused'.
sub pause {
    my $self = shift;
    AE::log debug => 'Pausing...';
    $self->peers;

    AE::log trace => 'Starting new peers ticker...';
    $self->_peer_timer;

    AE::log trace => 'Setting internal status...';
    $self->_set_state('paused');
}
1313             #
# Moo hook that runs after construction. Depending on the initial state the
# client is started ('active') or paused ('paused'); a client created as
# 'stopped' is left alone.
sub BUILD {
    my ($s, $args) = @_;
    AE::log debug => 'BUILD()';
    $s->start && AE::log debug => 'Calling ->start()'
        if $s->state eq 'active';

    # The method is named pause(); 'paused' is only the name of the state
    $s->pause && AE::log debug => 'Calling ->pause()'
        if $s->state eq 'paused';
}
1322              
1323             # Testing stuff goes here
# Queues $packet for writing on handle $h and returns the result of
# push_write. Returns nothing when no handle is given. Despite the name, the
# packet is currently written as-is.
sub _send_encrypted {
    my ($s, $h, $packet) = @_;

    # XXX - $s->_del_peer($p->{handle})
    return unless $h;

    # XXX - Currently doesn't do anything and may never do anything
    AE::log trace => sub {
        require Data::Dump;
        'Outgoing packet: ' . Data::Dump::dump($packet);
    };
    return $h->push_write($packet);
}
1334              
# Writes our handshake (reserved bytes, infohash and peer id) to handle $h.
sub _send_handshake {
    my ($s, $h) = @_;
    AE::log trace => 'Outgoing handshake';

    # XXX - Send encrypted handshake if encryption status is unknown or true
    my @handshake = build_handshake($s->reserved, $s->infohash, $s->peerid);
    $h->push_write(@handshake);
}
1342             1;
1343              
1344             =pod
1345              
1346             =head1 NAME
1347              
1348             AnyEvent::BitTorrent - Yet Another BitTorrent Client Module
1349              
1350             =head1 Synopsis
1351              
1352             use AnyEvent::BitTorrent;
1353             my $client = AnyEvent::BitTorrent->new( path => 'some.torrent' );
1354             AE::cv->recv;
1355              
1356             =head1 Description
1357              
1358             This is a painfully simple BitTorrent client written on a whim that implements
1359             the absolute basics. For a full list of what's currently supported, what you
1360             will likely find in a future version, and what you'll never get from this, see
1361             the section entitled "L<This Module is Lame!|/"This Module is Lame!">".
1362              
1363             =head1 Methods
1364              
1365             The API, much like the module itself, is simple.
1366              
1367             Anything you find by skimming the source is likely not ready for public use
1368             and will be subject to change before C<v1.0.0>. Here's the public interface as
1369             of this version:
1370              
1371             =head2 C<new( ... )>
1372              
1373             my $c = AnyEvent::BitTorrent->new(
1374             path => 'some/legal.torrent',
1375             basedir => './storage/',
1376             port => 6881,
1377             on_hash_pass => sub { ... },
1378             on_hash_fail => sub { ... },
1379             state => 'stopped',
1380             piece_cache => $quick_restore
1381             );
1382              
1383             This constructor understands the following arguments:
1384              
1385             =over
1386              
1387             =item C<path>
1388              
1389             This is the only required parameter. It's the path to a valid .torrent file.
1390              
1391             =item C<basedir>
1392              
1393             This is the base directory all data will be stored in and/or read from.
1394             Multifile torrents will create another directory below this to store all
1395             files.
1396              
1397             By default, this is the current working directory when
1398             L<C<new( ... )>|/"new( ... )"> is called.
1399              
1400             =item C<port>
1401              
1402             This is the preferred port local host binds and expects incoming peers to
1403             connect to.
1404              
1405             By default, this is a zero; the system will pick a port number randomly.
1406              
1407             =item C<on_hash_fail>
1408              
1409             This is a subroutine called whenever a piece fails to pass
1410             L<hashcheck|/"hashcheck( [...] )">. The callback is handed the piece's index.
1411              
1412             =item C<on_hash_pass>
1413              
1414             This is a subroutine called whenever a piece passes its
1415             L<hashcheck|/"hashcheck( [...] )">. The callback is handed the piece's index.
1416              
1417             =item C<state>
1418              
1419             This must be one of the following:
1420              
1421             =over
1422              
1423             =item C<active>
1424              
1425             This is the default. The client will attempt to create new connections, make
1426             and fill requests, etc. This is normal client behavior.
1427              
1428             =item C<paused>
1429              
1430             In this state, connections will be made and accepted but no piece requests
1431             will be made or filled. To resume full, normal behavior, you must call
1432             L<C<start( )>|/"start( )">.
1433              
1434             =item C<stopped>
1435              
1436             Everything is put on hold. No new outgoing connections are attempted and
1437             incoming connections are rejected. To resume full, normal behavior, you must
1438             call L<C<start( )>|/"start( )">.
1439              
1440             =back
1441              
1442             =item C<piece_cache>
1443              
1444             This is the index list returned by L<C<piece_cache( )>|/"piece_cache( )"> in a
1445             previous instance. Using this should make a complete resume system a trivial
1446             task.
1447              
1448             =back
1449              
1450             =head2 C<hashcheck( [...] )>
1451              
1452             This method expects...
1453              
1454             =over
1455              
1456             =item ...a list of integers. You could use this to check a range of pieces (a
1457             single file, for example).
1458              
1459             $client->hashcheck( 1 .. 5, 34 .. 56 );
1460              
1461             =item ...a single integer. Only that specific piece is checked.
1462              
1463             $client->hashcheck( 17 );
1464              
1465             =item ...nothing. All data related to this torrent will be checked.
1466              
1467             $client->hashcheck( );
1468              
1469             =back
1470              
1471             As pieces pass or fail, your C<on_hash_pass> and C<on_hash_fail> callbacks are
1472             triggered.
1473              
1474             =head2 C<start( )>
1475              
1476             Sends a 'started' event to trackers and starts performing as a client is
1477             expected. New connections are made and accepted, requests are made and filled,
1478             etc.
1479              
1480             =head2 C<stop( )>
1481              
1482             Sends a stopped event to trackers, closes all connections, stops attempting
1483             new outgoing connections, rejects incoming connections and closes all open
1484             files.
1485              
1486             =head2 C<pause( )>
1487              
1488             The client remains mostly active; new connections will be made and accepted,
1489             etc. but no requests will be made or filled while the client is paused.
1490              
1491             =head2 C<infohash( )>
1492              
1493             Returns the 20-byte SHA1 hash of the value of the info key from the metadata
1494             file.
1495              
1496             =head2 C<peerid( )>
1497              
1498             Returns the 20 byte string used to identify the client. Please see the
1499             L<spec|/"PeerID Specification"> below.
1500              
1501             =head2 C<port( )>
1502              
1503             Returns the port number the client is listening on.
1504              
1505             =head2 C<size( )>
1506              
1507             Returns the total size of all L<files|/"files( )"> described in the torrent's
1508             metadata.
1509              
1510             =head2 C<name( )>
1511              
1512             Returns the UTF-8 encoded string the metadata suggests we save the file (or
1513             directory, in the case of multi-file torrents) under.
1514              
1515             =head2 C<uploaded( )>
1516              
1517             Returns the total amount uploaded to remote peers.
1518              
1519             =head2 C<downloaded( )>
1520              
1521             Returns the total amount downloaded from other peers.
1522              
1523             =head2 C<left( )>
1524              
1525             Returns the approximate amount based on the pieces we still
1526             L<want|/"wanted( )"> multiplied by the L<size of pieces|/"piece_length( )">.
1527              
1528             =head2 C<piece_length( )>
1529              
1530             Returns the number of bytes in each piece the file or files are split into.
1531             For the purposes of transfer, files are split into fixed-size pieces which are
1532             all the same length except for possibly the last one which may be truncated.
1533              
1534             =head2 C<bitfield( )>
1535              
1536             Returns a packed binary string in ascending order (ready for C<vec()>). Each
1537             index that the client has is set to one and the rest are set to zero.
1538              
1539             =head2 C<wanted( )>
1540              
1541             Returns a packed binary string in ascending order (ready for C<vec()>). Each
1542             index that the client has or simply does not want is set to zero and the rest
1543             are set to one.
1544              
1545             This value is calculated every time the method is called. Keep that in mind.
1546              
1547             =head2 C<complete( )>
1548              
1549             Returns true if we have downloaded everything we L<wanted|/"wanted( )">, which
1550             is not to say that we have all data and can L<seed|/"seed( )">.
1551              
1552             =head2 C<seed( )>
1553              
1554             Returns true if we have all data related to the torrent.
1555              
1556             =head2 C<files( )>
1557              
1558             Returns a list of hash references with the following keys:
1559              
1560             =over
1561              
1562             =item C<length>
1563              
1564             Which is the size of file in bytes.
1565              
1566             =item C<path>
1567              
1568             Which is the absolute path of the file.
1569              
1570             =item C<priority>
1571              
1572             Download priority for this file. By default, all files have a priority of
1573             C<1>. There is no built in scale; the higher the priority, the better odds a
1574             piece from it will be downloaded first. Setting a file's priority to C<1000>
1575             while the rest are still at C<1> will likely force the file to complete before
1576             any other file is started.
1577              
1578             We do not download files with a priority of zero.
1579              
1580             =back
1581              
1582             =head2 C<peers( )>
1583              
1584             Returns the list of currently connected peers. The organization of these peers
1585             is not yet final so... don't write anything you don't expect to break before
1586             we hit C<v1.0.0>.
1587              
1588             =head2 C<state( )>
1589              
1590             Returns C<active> if the client is L<started|/"start( )">, C<paused> if the
1591             client is L<paused|/"pause( )">, and C<stopped> if the client is currently
1592             L<stopped|/"stop( )">.
1593              
1594             =head2 C<piece_cache( )>
1595              
1596             Pieces which overlap files with zero priority are stored in a part file which
1597             is indexed internally. To save this index (for resume, etc.) store the values
1598             returned by this method and pass it to L<new( ... )|/"new( ... )">.
1599              
1600             =head2 C<trackers( )>
1601              
1602             Returns a list of hashes, each representing a single tier of trackers as
1603             defined by L<BEP12|Net::BitTorrent::Protocol::BEP12>. The hashes contain the
1604             following keys:
1605              
1606             =over
1607              
1608             =item C<complete>
1609              
1610             This is a count of complete peers (seeds) as returned by the most recent
1611             announce.
1612              
1613             =item C<failures>
1614              
1615             This is a running total of the number of failed announces we've had in a row.
1616             This value is reset when we have a successful announce.
1617              
1618             =item C<incomplete>
1619              
1620             This is a count of incomplete peers (leechers) as returned by the most recent
1621             announce.
1622              
1623             =item C<peers>
1624              
1625             Which is a compact collection of IPv4 peers returned by the tracker. See
1626             L<Net::BitTorrent::Protocol::BEP23>.
1627              
1628             =item C<peers6>
1629              
1630             Which is a compact collection of IPv6 peers returned by the tracker. See
1631             L<Net::BitTorrent::Protocol::BEP07>.
1632              
1633             =item C<urls>
1634              
1635             Which is a list of URLs.
1636              
1637             =back
1638              
1639             =head1 This Module is Lame!
1640              
1641             Yeah, I said it.
1642              
1643             There are a few things a BitTorrent client must implement (to some degree) in
1644             order to interact with other clients in a modern day swarm.
1645             L<AnyEvent::BitTorrent|AnyEvent::BitTorrent> is meant to meet that bare
1646             minimum but it's based on L<Moo|Moo> so you could always subclass it to add more
1647             advanced functionality. Hint, hint!
1648              
1649             =head2 What is currently supported?
1650              
1651             Basic stuff. We can make and handle piece requests. Deal with cancels,
1652             disconnect idle peers, unchoke folks, fast extensions, file download
1653             priorities. Normal... stuff. HTTP trackers.
1654              
1655             =head2 What will probably be supported in the future?
1656              
1657             DHT (which will likely be in a separate dist), IPv6 stuff... I'll get around
1658             to those.
1659              
1660             Long term, UDP trackers may be supported.
1661              
1662             For a detailed list, see the TODO file included with this distribution.
1663              
1664             =head2 What will likely never be supported?
1665              
1666             We can't have nice things. Protocol encryption, uTP, endgame tricks, ...these
1667             will probably never be included in L<AnyEvent::BitTorrent|AnyEvent::BitTorrent>.
1668              
1669             =head2 What should I use instead?
1670              
1671             If you're reading all of this with a scowl, there are many alternatives to
1672             this module, most of which are sure to be better suited for advanced users. I
1673             suggest (in no particular order):
1674              
1675             =over
1676              
1677             =item L<BitFlu|http://bitflu.workaround.ch/>. It's written in Perl but you'll
1678             still need to be on a Linux, *BSD, et al. system to use it.
1679              
1680             =item L<Net::BitTorrent|Net::BitTorrent> ...in the future. I I<do not> suggest using either
1681             the current stable or unstable versions found on CPAN. The next version is
1682             being worked on and will be based on L<Reflex|Reflex>.
1683              
1684             =back
1685              
1686             If you're working on a Perl based client and would like me to link to it, send
1687             a bug report to the tracker L<here|http://github.com/sanko/anyevent-bittorrent/issues>.
1688              
1689             =head1 Subclassing AnyEvent::BitTorrent
1690              
1691             TODO
1692              
1693             If you subclass this module and change the way it functions to that which in
1694             any way proves harmful to individual peers or the swarm at large, rather than
1695             damage L<AnyEvent::BitTorrent|AnyEvent::BitTorrent>'s reputation, override the peerid attribute.
1696             Thanks.
1697              
1698             =head1 PeerID Specification
1699              
1700             L<AnyEvent::BitTorrent|AnyEvent::BitTorrent> may be identified in a swarm by its peer id. As of
1701             this version, our peer id is in 'Azureus style' with a single digit for the
1702             Major version, two digits for the minor version, and a single character to
1703             indicate stability (stable releases marked with C<S>, unstable releases marked
1704             with C<U>). It looks sorta like:
1705              
1706             -AB110S- Stable v1.10.0 release (typically found on CPAN, tagged in repo)
1707             -AB110U- Unstable v1.10.X release (private builds, early testing, etc.)
1708              
1709             =head1 Bug Reports
1710              
1711             If email is better for you, L<my inbox is open|mailto:sanko@cpan.org> but I
1712             would rather have bugs sent through the issue tracker found at
1713             http://github.com/sanko/anyevent-bittorrent/issues.
1714              
1715             Please check the ToDo file included with this distribution in case your bug
1716             is already known (...I probably won't file bug reports to myself).
1717              
1718             =head1 See Also
1719              
1720             L<Net::BitTorrent::Protocol|Net::BitTorrent::Protocol> - The package which does all of the wire protocol
1721             level heavy lifting.
1722              
1723             =head1 Author
1724              
1725             Sanko Robinson - http://sankorobinson.com/
1726              
1727             CPAN ID: SANKO
1728              
1729             =head1 License and Legal
1730              
1731             Copyright (C) 2011-2016 by Sanko Robinson
1732              
1733             This program is free software; you can redistribute it and/or modify it under
1734             the terms of
1735             L.
1736             See the F file included with this distribution or
1737             L
1738             for clarification.
1739              
1740             When separated from the distribution, all original POD documentation is
1741             covered by the
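1742             L<Creative Commons Attribution-Share Alike 3.0 License|http://creativecommons.org/licenses/by-sa/3.0/us/legalcode>.
1743             See the
1744             L<clarification of the CCA-SA3.0|http://creativecommons.org/licenses/by-sa/3.0/us/>.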
1745              
1746             Neither this module nor the L<Author|/Author> is affiliated with BitTorrent,
1747             Inc.
1748              
1749             =cut