File Coverage

blib/lib/AnyEvent/BitTorrent.pm
Criterion Covered Total %
statement 193 617 31.2
branch 28 248 11.2
condition 13 108 12.0
subroutine 45 99 45.4
pod 10 13 76.9
total 289 1085 26.6


line stmt bran cond sub pod time code
1             package AnyEvent::BitTorrent;
2 4     4   19545 use AnyEvent;
  4         3663  
  4         87  
3 4     4   2416 use AnyEvent::Handle;
  4         55178  
  4         116  
4 4     4   2032 use AnyEvent::Socket;
  4         39645  
  4         347  
5 4     4   2225 use AnyEvent::HTTP;
  4         24195  
  4         280  
6 4     4   1704 use Moo;
  4         35905  
  4         20  
7 4     4   6363 use Type::Tiny;
  4         50446  
  4         114  
8 4     4   1830 use Types::Standard qw[ArrayRef CodeRef Enum HashRef Int Ref Str];
  4         132181  
  4         32  
9 4     4   3631 use Fcntl qw[/SEEK_/ /O_/ :flock];
  4         7  
  4         1479  
10 4     4   1745 use Digest::SHA qw[sha1];
  4         9303  
  4         234  
11 4     4   45 use File::Spec;
  4         5  
  4         67  
12 4     4   11 use File::Path;
  4         4  
  4         181  
13 4     4   1565 use Net::BitTorrent::Protocol qw[:all];
  4         45187  
  4         1601  
14 4     4   22 use Scalar::Util qw[/weak/];
  4         4  
  4         28108  
15             #
16             our $VERSION = "1.0.0";
17             #
18             # XXX - These should be ro attributes w/o init args:
19             my $block_size = 2**14;
20              
21             # Custom types
22             my $FILE = Type::Tiny->new(name => 'File',
23             parent => Str,
24             constraint => sub { -f $_ },
25             message => sub {"$_ isn't an existing file"},
26             );
27             my $RESERVED = Type::Tiny->new(name => 'Reserved',
28             parent => Str,
29             constraint => sub { length $_ == 8 },
30             message => sub {'reserved data is malformed'}
31             );
32             my $PEERID = Type::Tiny->new(
33             name => 'PeerID',
34             parent => Str,
35             constraint => sub { length $_ == 20 },
36             message => sub {
37             'Peer ID must be 20 chars in length';
38             }
39             );
40             my $INFOHASH = Type::Tiny->new(
41             name => 'Infohash',
42             parent => Str,
43             constraint => sub { length $_ == 20 },
44             message => sub {
45             'Infohashes are 20 bytes in length';
46             }
47             );
48             #
49             has port => (is => 'ro',
50             isa => Int,
51             default => sub {0},
52             writer => '_set_port'
53             );
54             has socket => (is => 'ro',
55             isa => Ref,
56             init_arg => undef,
57             predicate => '_has_socket',
58             builder => '_build_socket'
59             );
60              
61             sub _build_socket {
62 4     4   99 my $s = shift;
63             tcp_server undef, $s->port, sub {
64 0     0   0 my ($fh, $host, $port) = @_;
65 0         0 AE::log info => 'Accepted connection from %s:%d', $host, $port;
66 0 0       0 return $fh->destroy if $s->state eq 'stopped';
67             my $handle = AnyEvent::Handle->new(
68             fh => $fh,
69             on_error => sub {
70 0         0 my ($hdl, $fatal, $msg) = @_;
71              
72             # XXX - callback
73 0         0 AE::log error => 'Socket error: ' . $msg;
74 0         0 $s->_del_peer($hdl);
75             },
76             on_eof => sub {
77 0         0 my $h = shift;
78 0         0 AE::log info => 'Socket EOF';
79 0         0 $s->_del_peer($h);
80             },
81             on_read => sub {
82 0         0 AE::log debug => 'Read Socket';
83 0         0 $s->_on_read_incoming(@_);
84             }
85 0         0 );
86 0         0 $s->_add_peer($handle);
87             }, sub {
88 4     4   578 my ($fh, $thishost, $thisport) = @_;
89 4         31 $s->_set_port($thisport);
90 4         1520 AE::log info => "bound to $thishost, port $thisport";
91 4         61 };
92             }
93             has path => (is => 'ro',
94             isa => $FILE,
95             required => 1
96             );
97             has reserved => (is => 'ro',
98             builder => '_build_reserved',
99             isa => $RESERVED
100             );
101              
102             sub _build_reserved {
103 4     4   65 my $reserved = "\0" x 8;
104              
105             #vec($reserved, 5, 8) = 0x10; # Ext Protocol
106 4         10 vec($reserved, 7, 8) = 0x04; # Fast Ext
107 4         16 AE::log debug => '_build_reserved() => ' . $reserved;
108 4         29444 $reserved;
109             }
110             has peerid => (is => 'ro',
111             isa => $PEERID,
112             init_arg => undef,
113             required => 1,
114             builder => '_build_peerid'
115             );
116              
117             sub _build_peerid {
118             return pack(
119             'a20',
120             (sprintf(
121             '-AB%01d%01d%01d%1s-%7s%-5s',
122             ($VERSION =~ m[^v?(\d+)\.(\d+)\.(\d+)]),
123             ($VERSION =~ m[[^\d\.^v]] ? 'U' : 'S'),
124             (join '',
125             map {
126 4 50   4   135 ['A' .. 'Z', 'a' .. 'z', 0 .. 9, qw[- . _ ~]]->[rand(66)]
  28         334  
127             } 1 .. 7
128             ),
129             [qw[KaiLi April Aaron]]->[rand 3]
130             )
131             )
132             );
133             }
134             has bitfield => (is => 'ro',
135             lazy => 1,
136             builder => '_build_bitfield',
137             isa => Str,
138             init_arg => undef,
139             );
140 4     4   1396 sub _build_bitfield { pack 'b*', "\0" x shift->piece_count }
141              
142             sub wanted {
143 4     4 1 30 my $s = shift;
144 4         5 my $wanted = '';
145 4         5 for my $findex (0 .. $#{$s->files}) {
  4         43  
146 4         126 my $prio = !!$s->files->[$findex]{priority};
147 4         29 for my $index ($s->_file_to_range($findex)) {
148 8416   66     141268 vec($wanted, $index, 1) = $prio && !vec($s->bitfield, $index, 1);
149             }
150             }
151 4         203 AE::log debug => '->wanted() => ' . unpack 'b*', $wanted;
152 4         201 $wanted;
153             }
154              
155             sub complete {
156 0     0 1 0 my $s = shift;
157 0         0 -1 == index substr(unpack('b*', $s->wanted), 0, $s->piece_count + 1), 1;
158             }
159              
160             sub seed {
161 0     0 1 0 my $s = shift;
162 0         0 -1 == index substr(unpack('b*', $s->bitfield), 0, $s->piece_count + 1), 0;
163             }
164              
165             sub _left {
166 4     4   6 my $s = shift;
167 4         50 $s->piece_length * scalar grep {$_} split '',
  8416         5726  
168             substr unpack('b*', $s->wanted), 0, $s->piece_count + 1;
169             }
170             has $_ => (is => 'ro',
171             isa => Int,
172             default => sub {0},
173             writer => '_set_' . $_
174             ) for qw[uploaded downloaded];
175             has infohash => (is => 'ro',
176             lazy => 1,
177             builder => '_build_infohash',
178             isa => $INFOHASH,
179             init_arg => undef
180             );
181 3     3   1340 sub _build_infohash { sha1(bencode(shift->metadata->{info})) }
182             has metadata => (is => 'ro',
183             lazy_build => 1,
184             builder => '_build_metadata',
185             lazy => 1,
186             isa => HashRef,
187             init_arg => undef
188             );
189              
190             sub _build_metadata {
191 4     4   1366 my $s = shift;
192              
193             #return if ref $s ne __PACKAGE__; # Applying roles makes deep rec
194 4         178 open my $fh, '<', $s->path;
195 4         156 sysread $fh, my $raw, -s $fh;
196 4         21 my $metadata = bdecode $raw;
197             AE::log debug => sub {
198 0     0   0 require Data::Dump;
199 0         0 '_build_metadata() => ' . Data::Dump::dump($metadata);
200 4         2499 };
201 4         344 $metadata;
202             }
203 5     5 1 2810 sub name { shift->metadata->{info}{name} }
204 6312     6312 1 89373 sub pieces { shift->metadata->{info}{pieces} }
205 25271     25271 1 349794 sub piece_length { shift->metadata->{info}{'piece length'} }
206              
207             sub piece_count {
208 12635     12635 0 9567 my $s = shift;
209 12635         158668 my $count = $s->size / $s->piece_length;
210 12635 50       88855 int($count) + (($count == int $count) ? 1 : 0);
211             }
212             has basedir => (
213             is => 'ro',
214             lazy => 1,
215             isa => Str,
216             required => 1,
217             default => sub { File::Spec->rel2abs(File::Spec->curdir) },
218             trigger => sub {
219             my ($s, $n, $o) = @_;
220             $o // return;
221             $s->_clear_files; # So they can be rebuilt with the new basedir
222             }
223             );
224             has files => (is => 'ro',
225             lazy => 1,
226             builder => '_build_files',
227             isa => ArrayRef [HashRef],
228             init_arg => undef,
229             clearer => '_clear_files'
230             );
231              
232             sub _build_files {
233 4     4   1402 my $s = shift;
234             defined $s->metadata->{info}{files} ?
235             [
236             map {
237             {priority => 1,
238             fh => undef,
239             mode => 'c',
240             length => $_->{length},
241             timeout => undef,
242             path =>
243             File::Spec->rel2abs(
244 0         0 File::Spec->catfile($s->basedir, $s->name, @{$_->{path}})
  0         0  
245             )
246             }
247 0         0 } @{$s->metadata->{info}{files}}
248             ]
249             : [
250             {priority => 1,
251             fh => undef,
252             mode => 'c',
253             length => $s->metadata->{info}{length},
254 4 50       40 timeout => undef,
255             path =>
256             File::Spec->rel2abs(File::Spec->catfile($s->basedir, $s->name))
257             }
258             ];
259             }
260             has size => (is => 'ro',
261             lazy => 1,
262             builder => '_build_size',
263             isa => Int,
264             init_arg => undef
265             );
266              
267             sub _build_size {
268 4     4   1472 my $s = shift;
269 4         6 my $ret = 0;
270 4         5 $ret += $_->{length} for @{$s->files};
  4         37  
271 4         227 AE::log debug => '_build_size() => ' . $ret;
272 4         169 $ret;
273             }
274              
275             sub _open {
276 3     3   35 my ($s, $i, $m) = @_;
277             AE::log
278             trace => 'Opening file #%d (%s) for %s',
279 3         41 $i, $s->files->[$i]->{path}, $m;
280 3 50       127 return 1 if $s->files->[$i]->{mode} eq $m;
281 0 0       0 if (defined $s->files->[$i]->{fh}) {
282 0         0 AE::log trace => 'Closing %s', $s->files->[$i]->{fh};
283 0         0 flock $s->files->[$i]->{fh}, LOCK_UN;
284 0         0 close $s->files->[$i]->{fh};
285 0         0 $s->files->[$i]->{fh} = ();
286             }
287 0 0       0 if ($m eq 'r') {
    0          
    0          
288 0         0 AE::log trace => 'Opening %s to read', $s->files->[$i]->{path};
289 0 0       0 sysopen($s->files->[$i]->{fh}, $s->files->[$i]->{path}, O_RDONLY)
290             || return;
291 0 0       0 flock($s->files->[$i]->{fh}, LOCK_SH) || return;
292 0 0       0 weaken $s unless isweak $s;
293 0         0 my $x = $i;
294             $s->files->[$x]->{timeout}
295 0   0 0   0 = AE::timer(500, 0, sub { $s // return; $s->_open($x, 'c') });
  0         0  
  0         0  
296             }
297             elsif ($m eq 'w') {
298 0         0 AE::log trace => 'Opening %s to write', $s->files->[$i]->{path};
299 0         0 my @split = File::Spec->splitdir($s->files->[$i]->{path});
300 0         0 pop @split; # File name itself
301 0         0 my $dir = File::Spec->catdir(@split);
302 0 0       0 File::Path::mkpath($dir) if !-d $dir;
303             sysopen($s->files->[$i]->{fh},
304             $s->files->[$i]->{path},
305 0 0       0 O_WRONLY | O_CREAT)
306             || return;
307 0         0 flock $s->files->[$i]->{fh}, LOCK_EX;
308             truncate $s->files->[$i]->{fh}, $s->files->[$i]->{length}
309             if -s $s->files->[$i]->{fh}
310 0 0       0 != $s->files->[$i]->{length}; # XXX - pre-allocate files
311 0 0       0 weaken $s unless isweak $s;
312 0         0 my $x = $i;
313             $s->files->[$x]->{timeout}
314 0   0 0   0 = AE::timer(60, 0, sub { $s // return; $s->_open($x, 'c') });
  0         0  
  0         0  
315             }
316 0         0 elsif ($m eq 'c') { $s->files->[$i]->{timeout} = () }
317 0         0 else {return}
318 0         0 return $s->files->[$i]->{mode} = $m;
319             }
320             has piece_cache => (is => 'ro', isa => HashRef, default => sub { {} });
321              
322             sub _cache_path {
323 0     0   0 my $s = shift;
324             File::Spec->catfile($s->basedir,
325 0 0       0 (scalar @{$s->files} == 1 ? () : $s->name),
  0         0  
326             '~ABPartFile_-'
327             . uc(substr(unpack('H*', $s->infohash), 0, 10))
328             . '.dat'
329             );
330             }
331              
332             sub _write_cache {
333 0     0   0 my ($s, $f, $o, $d) = @_;
334 0         0 my $path = $s->_cache_path;
335 0         0 AE::log
336             debug =>
337             'Attempting to store %d bytes to cache file (%s) [$f=%s, $o=%s]',
338             length($d), $path, $f, $o;
339 0         0 my @split = File::Spec->splitdir($path);
340 0         0 pop @split; # File name itself
341 0         0 my $dir = File::Spec->catdir(@split);
342 0 0       0 File::Path::mkpath($dir) if !-d $dir;
343 0 0       0 sysopen(my ($fh), $path, O_WRONLY | O_CREAT)
344             || return;
345 0         0 flock $fh, LOCK_EX;
346 0         0 my $pos = sysseek $fh, 0, SEEK_CUR;
347 0         0 my $w = syswrite $fh, $d;
348 0         0 flock $fh, LOCK_UN;
349 0         0 close $fh;
350 0         0 $s->piece_cache->{$f}{$o} = $pos;
351 0         0 AE::log debug => 'Wrote %d bytes to cache file', $w;
352 0         0 return $w;
353             }
354              
355             sub _read_cache {
356 6312     6312   6646 my ($s, $f, $o, $l) = @_;
357 6312   50     633288 $s->piece_cache->{$f} // return;
358 0   0     0 $s->piece_cache->{$f}{$o} // return;
359 0         0 my $path = $s->_cache_path;
360 0         0 AE::log
361             debug =>
362             'Attempting to read %d bytes from cache file (%s) [$f=%s, $o=%s]',
363             $l, $path, $f, $o;
364 0 0       0 sysopen(my ($fh), $path, O_RDONLY)
365             || return;
366 0         0 flock $fh, LOCK_SH;
367 0         0 sysseek $fh, $s->piece_cache->{$f}{$o}, SEEK_SET;
368 0         0 my $w = sysread $fh, my ($d), $l;
369 0         0 flock $fh, LOCK_UN;
370 0         0 close $fh;
371 0         0 return $d;
372             }
373              
374             sub _read {
375 6312     6312   28120 my ($s, $index, $offset, $length) = @_;
376 6312         10622 AE::log
377             debug =>
378             'Attempting to read %d bytes from piece %d starting at %d bytes',
379             $length, $index, $offset;
380 6312         152711 my $data = '';
381 6312         4399 my $file_index = 0;
382 6312         7060 my $total_offset = ($index * $s->piece_length) + $offset;
383             SEARCH:
384 6312         100840 while ($total_offset > $s->files->[$file_index]->{length}) {
385 0         0 $total_offset -= $s->files->[$file_index]->{length};
386 0         0 $file_index++;
387 0         0 AE::log
388             trace =>
389             'Searching for location... $total_offset = %d, $file_index = %d',
390             $total_offset, $file_index;
391             last SEARCH # XXX - return?
392 0 0       0 if not defined $s->files->[$file_index]->{length};
393             }
394 6312   33     47814 READ: while ((defined $length) && ($length > 0)) {
395             my $this_read
396             = (
397             ($total_offset + $length) >= $s->files->[$file_index]->{length})
398             ?
399 6312 100       78208 ($s->files->[$file_index]->{length} - $total_offset)
400             : $length;
401             AE::log
402             trace =>
403             'Attempting to read %d bytes from file #%d (%s), starting at %d',
404             $this_read,
405 6312         95410 $file_index, $s->files->[$file_index]->{path}, $total_offset;
406 6312 50 33     244489 if ( (!-f $s->files->[$file_index]->{path})
407             || (!$s->_open($file_index, 'r')))
408 6312   33     95361 { $data .= $s->_read_cache($file_index, $total_offset, $this_read)
409             // ("\0" x $this_read);
410 6312         14653 AE::log note => 'Failed to open file. Using null chars instead.';
411             }
412             else {
413 0         0 sysseek $s->files->[$file_index]->{fh}, $total_offset, SEEK_SET;
414 0         0 sysread $s->files->[$file_index]->{fh}, my ($_data), $this_read;
415 0 0       0 $data .= $_data if $_data;
416 0         0 AE::log
417             trace =>
418             'Read %d bytes of data from file (%d bytes collected so far)',
419             length $_data, length $data;
420 0 0       0 weaken $s unless isweak $s;
421 0         0 my $x = $file_index;
422             $s->files->[$x]->{timeout}
423 0   0 0   0 = AE::timer(500, 0, sub { $s // return; $s->_open($x, 'c') });
  0         0  
  0         0  
424             }
425 6312         175837 $file_index++;
426 6312         6013 $length -= $this_read;
427 6312         9267 AE::log
428             trace => 'Still need to read %d bytes',
429             $length;
430 6312 50       239754 last READ if not defined $s->files->[$file_index];
431 0         0 $total_offset = 0;
432             }
433 6312         39322 AE::log trace => 'Returning %d bytes of data', length $data;
434 6312         148313 return $data;
435             }
436              
437             sub _write {
438 0     0   0 my ($s, $index, $offset, $data) = @_;
439 0         0 AE::log
440             debug =>
441             'Attempting to write %d bytes from piece %d starting at %d bytes',
442             length($data), $index, $offset;
443 0         0 my $file_index = 0;
444 0   0     0 my $total_offset = int(($index * $s->piece_length) + ($offset || 0));
445 0         0 AE::log
446             debug => '...calculated offset == %d',
447             $total_offset;
448             SEARCH:
449 0         0 while ($total_offset > $s->files->[$file_index]->{length}) {
450 0         0 $total_offset -= $s->files->[$file_index]->{length};
451 0         0 $file_index++;
452 0         0 AE::log
453             trace =>
454             'Searching for location... $total_offset = %d, $file_index = %d',
455             $total_offset, $file_index;
456             last SEARCH # XXX - return?
457 0 0       0 if not defined $s->files->[$file_index]->{length};
458             }
459 0   0     0 WRITE: while ((defined $data) && (length $data > 0)) {
460             my $this_write
461             = (($total_offset + length $data)
462             >= $s->files->[$file_index]->{length})
463             ?
464 0 0       0 ($s->files->[$file_index]->{length} - $total_offset)
465             : length $data;
466             AE::log
467             trace =>
468             'Attempting to write %d bytes from file #%d (%s), starting at %d',
469             $this_write,
470 0         0 $file_index, $s->files->[$file_index]->{path}, $total_offset;
471 0 0       0 if ($s->files->[$file_index]->{priority} == 0) {
472 0         0 $s->_write_cache($file_index, $total_offset, substr $data, 0,
473             $this_write, '');
474 0         0 AE::log trace => 'Wrote data to cache...';
475             }
476             else {
477 0         0 $s->_open($file_index, 'w');
478 0         0 sysseek $s->files->[$file_index]->{fh}, $total_offset, SEEK_SET;
479 0         0 my $w = syswrite $s->files->[$file_index]->{fh}, substr $data, 0,
480             $this_write, '';
481 0         0 AE::log
482             trace => 'Wrote %d bytes of data to file (%d bytes left)',
483             $w, length $data;
484 0 0       0 weaken $s unless isweak $s;
485 0         0 my $x = $file_index;
486             $s->files->[$x]->{timeout}
487 0   0 0   0 = AE::timer(120, 0, sub { $s // return; $s->_open($x, 'c') });
  0         0  
  0         0  
488             }
489 0         0 $file_index++;
490 0 0       0 last WRITE if not defined $s->files->[$file_index];
491 0         0 $total_offset = 0;
492             }
493 0         0 return 1;
494             }
495              
496             sub hashcheck (;@) {
497 3     3 1 491 my $s = shift;
498 3 50       17 my @indexes = @_ ? @_ : (0 .. $s->piece_count);
499             AE::log trace => sub {
500 0     0   0 require Data::Dump;
501 0         0 'Hashcheck of : ' . Data::Dump::dump(\@indexes);
502 3         101 };
503 3         111 $s->bitfield; # Makes sure it's built
504 3         84 my $total_size = $s->size;
505 3         16 for my $index (@indexes) {
506 6312 50 33     17617 next if $index < 0 || $index > $s->piece_count;
507 6312 100       8591 my $piece = $s->_read($index,
508             0,
509             $index == $s->piece_count
510             ?
511             $total_size % $s->piece_length
512             : $s->piece_length
513             );
514 6312         10139 my $expected = substr($s->pieces, $index * 20, 20);
515 6312         14923886 my $reality = sha1($piece);
516 6312   66     26610 my $ok = defined($piece)
517             && ($expected eq $reality);
518 6312         18761 vec($s->{bitfield}, $index, 1) = $ok;
519             AE::log trace => sub {
520 0 0   0   0 "Validate piece #%06d %s, Expected: %s\n"
521             . " Reality: %s",
522             $index, ($ok ? 'PASS' : 'FAIL'), unpack('H*', $expected),
523             unpack('H*', $reality);
524 6312         35171 };
525 6312 100       191572 $ok ?
526             $s->_trigger_hash_pass($index)
527             : $s->_trigger_hash_fail($index);
528             }
529             }
530             has peers => (is => 'ro',
531             lazy => 1,
532             isa => HashRef,
533             clearer => '_clear_peers',
534             builder => '_build_peers'
535             );
536 4     4   1474 sub _build_peers { {} }
537              
538             sub _add_peer {
539 0     0   0 my ($s, $h) = @_;
540             $s->{peers}{+$h} = {
541             handle => $h,
542             peerid => '',
543             bitfield => (pack 'b*', "\0" x $s->piece_count),
544             remote_choked => 1,
545             remote_interested => 0,
546             remote_requests => [],
547             local_choked => 1,
548             local_interested => 0,
549             local_requests => [],
550 0     0   0 timeout => AE::timer(20, 0, sub { $s->_del_peer($h) }),
551             keepalive => AE::timer(
552             30, 120,
553             sub {
554 0     0   0 $s->_send_encrypted($h, build_keepalive());
555             }
556 0         0 ),
557              
558             # BEP06
559             local_allowed => [],
560             remote_allowed => [],
561             local_suggest => [],
562             remote_suggest => [],
563             #
564             encryption => '?'
565             };
566             }
567              
568             sub _del_peer {
569 0     0   0 my ($s, $h) = @_;
570 0   0     0 $s->peers->{$h} // return;
571 0         0 for my $req (@{$s->peers->{$h}{local_requests}}) {
  0         0  
572 0         0 my ($i, $o, $l) = @$req;
573 0         0 $s->working_pieces->{$i}{$o}[3] = ();
574             }
575 0         0 delete $s->peers->{$h};
576 0         0 $h->destroy;
577             }
578             my $shuffle;
579             has trackers => (is => 'ro',
580             lazy => 1,
581             builder => '_build_trackers',
582             isa => ArrayRef [HashRef],
583             init_arg => undef
584             );
585              
586             sub _build_trackers {
587 4     4   2060 my $s = shift;
588             $shuffle //= sub {
589 8     8   9 my $deck = shift; # $deck is a reference to an array
590 8 50       21 return unless @$deck; # must not be empty!
591 8         9 my $i = @$deck;
592 8         21 while (--$i) {
593 0         0 my $j = int rand($i + 1);
594 0         0 @$deck[$i, $j] = @$deck[$j, $i];
595             }
596 4   100     31 };
597             my $trackers = [
598             map {
599             {urls => $_,
600             complete => 0,
601             incomplete => 0,
602             peers => '',
603             peers6 => '',
604             announcer => undef,
605             ticker => AE::timer(
606             1,
607             15 * 60,
608             sub {
609 0 0   0   0 return if $s->state eq 'stopped';
610 0         0 $s->announce('started');
611             }
612 4         173 ),
613             failures => 0
614             }
615             } defined $s->metadata->{announce} ? [$s->metadata->{announce}]
616             : (),
617             defined $s->metadata->{'announce-list'}
618 4 50       49 ? @{$s->metadata->{'announce-list'}}
  0 50       0  
619             : ()
620             ];
621             AE::log trace => sub {
622 0     0   0 require Data::Dump;
623 0         0 '$trackers before shuffle => ' . Data::Dump::dump($trackers);
624 4         18 };
625 4         120 $shuffle->($trackers);
626 4         14 $shuffle->($_->{urls}) for @$trackers;
627             AE::log trace => sub {
628 0     0   0 require Data::Dump;
629 0         0 '$trackers after shuffle => ' . Data::Dump::dump($trackers);
630 4         42 };
631 4         154 $trackers;
632             }
633              
634             sub announce {
635 3     3 0 5 my ($s, $e) = @_;
636 3 50       10 return if $a++ > 10; # Retry attempts
637 3         34 for my $tier (@{$s->trackers}) {
  3         28  
638 4   33     3409 $tier->{announcer} //= $s->_announce_tier($e, $tier);
639             }
640             }
641              
642             sub _announce_tier {
643 4     4   7 my ($s, $e, $tier) = @_;
644 4         6 my @urls = grep {m[^https?://]} @{$tier->{urls}};
  4         18  
  4         9  
645 4 50       14 return if $tier->{failures} > 5;
646 4 50       4 return if $#{$tier->{urls}} < 0; # Empty tier?
  4         10  
647 4 50       17 return if $tier->{urls}[0] !~ m[^https?://.+];
648 4         17 local $AnyEvent::HTTP::USERAGENT
649             = 'AnyEvent::BitTorrent/' . $AnyEvent::BitTorrent::VERSION;
650             my $_url = $tier->{urls}[0] . '?info_hash=' . sub {
651 4     4   87 local $_ = shift;
652 4         17 s/([\W])/"%" . uc(sprintf("%2.2x",ord($1)))/eg;
  56         107  
653 4         57 $_;
654             }
655 4 50       33 ->($s->infohash)
656             . ('&peer_id=' . $s->peerid)
657             . ('&uploaded=' . $s->uploaded)
658             . ('&downloaded=' . $s->downloaded)
659             . ('&left=' . $s->_left)
660             . ('&port=' . $s->port)
661             . '&compact=1'
662             . ($e ? '&event=' . $e : '');
663 4         302 AE::log debug => 'Announce URL: ' . $_url;
664             http_get $_url, sub {
665 0     0   0 my ($body, $hdr) = @_;
666             AE::log trace => sub {
667 0         0 require Data::Dump;
668 0         0 'Announce response: ' . Data::Dump::dump($body, $hdr);
669 0         0 };
670 0         0 $tier->{announcer} = ();
671 0 0       0 if ($hdr->{Status} =~ /^2/) {
672 0         0 my $reply = bdecode($body);
673 0 0       0 if (defined $reply->{'failure reason'}) { # XXX - Callback?
674 0         0 push @{$tier->{urls}}, shift @{$tier->{urls}};
  0         0  
  0         0  
675 0         0 $s->_announce_tier($e, $tier);
676 0         0 $tier->{'failure reason'} = $reply->{'failure reason'};
677 0         0 $tier->{failures}++;
678             }
679             else { # XXX - Callback?
680 0         0 $tier->{failures} = $tier->{'failure reason'} = 0;
681             $tier->{peers}
682             = compact_ipv4(
683             uncompact_ipv4($tier->{peers} . $reply->{peers}))
684 0 0       0 if $reply->{peers};
685             $tier->{peers6}
686             = compact_ipv6(
687             uncompact_ipv6($tier->{peers6} . $reply->{peers6}))
688 0 0       0 if $reply->{peers6};
689 0         0 $tier->{complete} = $reply->{complete};
690 0         0 $tier->{incomplete} = $reply->{incomplete};
691             $tier->{ticker} = AE::timer(
692             $reply->{interval} // (15 * 60),
693             $reply->{interval} // (15 * 60),
694             sub {
695 0 0       0 return if $s->state eq 'stopped';
696 0         0 $s->_announce_tier($e, $tier);
697             }
698 0   0     0 );
      0        
699             }
700             }
701             else { # XXX - Callback?
702 0         0 $tier->{'failure reason'}
703             = "HTTP Error: $hdr->{Status} $hdr->{Reason}\n";
704 0         0 $tier->{failures}++;
705 0         0 push @{$tier->{urls}}, shift @{$tier->{urls}};
  0         0  
  0         0  
706 0         0 $s->_announce_tier($e, $tier);
707             }
708             }
709 4         197 }
710             has _choke_timer => (
711             is => 'bare',
712             isa => Ref,
713             init_arg => undef,
714             required => 1,
715             default => sub {
716             my $s = shift;
717             AE::timer(
718             15, 45,
719             sub {
720             return if $s->state ne 'active';
721             AE::log trace => 'Choke timer...';
722             my @interested
723             = grep { $_->{remote_interested} && $_->{remote_choked} }
724             values %{$s->peers};
725              
726             # XXX - Limit the number of upload slots
727             for my $p (@interested) {
728             $p->{remote_choked} = 0;
729             $s->_send_encrypted($p->{handle}, build_unchoke());
730             AE::log trace => 'Choked %s', $p->{peerid};
731             }
732              
733             # XXX - Send choke to random peer
734             }
735             );
736             }
737             );
738             has _fill_requests_timer => (
739             is => 'bare',
740             isa => Ref,
741             init_arg => undef,
742             required => 1,
743             default => sub {
744             my $s = shift;
745             AE::timer(
746             15, 10,
747             sub { # XXX - Limit by time/bandwidth
748             return if $s->state ne 'active';
749             AE::log trace => 'Request fill timer...';
750             my @waiting
751             = grep { defined && scalar @{$_->{remote_requests}} }
752             values %{$s->peers};
753             return if !@waiting;
754             my $total_sent = 0;
755             while (@waiting && $total_sent < 2**20) {
756             my $p = splice(@waiting, rand @waiting, 1, ());
757             AE::log trace => 'Chosen peer: %s...', $p->{peerid};
758             while ($total_sent < 2**20 && @{$p->{remote_requests}}) {
759             my $req = shift @{$p->{remote_requests}};
760             AE::log
761             trace =>
762             'Filling request i:%d, o:%d, l:%d for %s',
763             @$req;
764              
765             # XXX - If piece is bad locally
766             # if remote supports fast ext
767             # send reject
768             # else
769             # simply return
770             # else...
771             $s->_send_encrypted(
772             $p->{handle},
773             build_piece(
774             $req->[0], $req->[1],
775             $s->_read($req->[0], $req->[1], $req->[2])
776             )
777             );
778             $total_sent += $req->[2];
779             }
780             }
781             $s->_set_uploaded($s->uploaded + $total_sent);
782             }
783             );
784             }
785             );
786             has _peer_timer => (is => 'ro',
787             lazy => 1,
788             isa => Ref,
789             init_arg => undef,
790             clearer => '_clear_peer_timer',
791             builder => '_build_peer_timer'
792             );
793              
794             sub _build_peer_timer {
795 4     4   1356 my $s = shift;
796             AE::timer(
797             1, 15,
798             sub {
799 0 0   0   0 return if !$s->_left;
800 0         0 AE::log trace => 'Attempting to connect to new peer...';
801              
802             # XXX - Initiate connections when we are in Super seed mode?
803             my @cache = map {
804             $_->{peers} ? uncompact_ipv4($_->{peers}) : (),
805             $_->{peers6} ?
806             uncompact_ipv6($_->{peers6})
807 0 0       0 : ()
    0          
808 0         0 } @{$s->trackers};
  0         0  
809 0 0       0 return if !@cache;
810 0         0 for my $i (1 .. @cache) {
811 0 0       0 last if $i > 10; # XXX - Max half open
812             last
813 0 0       0 if scalar(keys %{$s->peers}) > 100; # XXX - Max peers
  0         0  
814 0         0 my $addr = splice @cache, rand $#cache, 1;
815 0         0 $s->_new_peer($addr);
816             }
817             }
818 4         77 );
819             }
820              
821             sub _new_peer {
822 0     0   0 my ($s, $addr) = @_;
823 0         0 AE::log trace => 'Connecting to %s:%d', @$addr;
824 0         0 my $handle;
825             $handle = AnyEvent::Handle->new(
826             connect => $addr,
827 0     0   0 on_prepare => sub {60},
828             on_error => sub {
829 0     0   0 my ($hdl, $fatal, $msg) = @_;
830              
831             # XXX - callback
832 0         0 AE::log
833             error => 'Socket error: %s (Removing peer)',
834             $msg;
835 0         0 $s->_del_peer($hdl);
836             },
837             on_connect_error => sub {
838 0     0   0 my ($hdl, $fatal, $msg) = @_;
839 0         0 $s->_del_peer($hdl);
840              
841             # XXX - callback
842 0 0 0     0 AE::log
843             error => sprintf "%sfatal error (%s)\n",
844             $fatal ? '' : 'non-',
845             $msg // 'Connection timed out';
846 0 0       0 return if !$fatal;
847             },
848             on_connect => sub {
849 0     0   0 my ($h, $host, $port, $retry) = @_;
850 0         0 AE::log
851             trace => 'Connection established with %s:%d',
852             $host, $port;
853 0         0 $s->_add_peer($handle);
854 0         0 $s->_send_handshake($handle);
855             },
856             on_eof => sub {
857 0     0   0 my $h = shift;
858 0         0 AE::log trace => 'EOF from peer';
859 0         0 $s->_del_peer($h);
860             },
861             on_read => sub {
862 0     0   0 $s->_on_read(@_);
863             }
864 0         0 );
865 0         0 return $handle;
866             }
867              
868             sub _on_read_incoming {
869 0     0   0 my ($s, $h) = @_;
870 0   0     0 $h->rbuf // return;
871              
872             # XXX - Handle things if the stream is encrypted
873 0         0 my $packet = parse_packet(\$h->rbuf);
874 0 0       0 return if !$packet;
875             AE::log trace => sub {
876 0     0   0 require Data::Dump;
877 0         0 'Incoming packet: ' . Data::Dump::dump($packet);
878 0         0 };
879 0 0       0 if (defined $packet->{error}) {
    0          
880 0         0 return $s->_del_peer($h);
881             }
882             elsif ($packet->{type} == $HANDSHAKE) {
883 0   0     0 ref $packet->{payload} // return;
884 0 0       0 return if ref $packet->{payload} ne 'ARRAY';
885 0         0 $s->peers->{$h}{reserved} = $packet->{payload}[0];
886             return $s->_del_peer($h)
887 0 0       0 if $packet->{payload}[1] ne $s->infohash;
888 0         0 $s->peers->{$h}{peerid} = $packet->{payload}[2];
889 0         0 $s->_send_handshake($h);
890 0         0 $s->_send_bitfield($h);
891             $s->peers->{$h}{timeout}
892 0     0   0 = AE::timer(60, 0, sub { $s->_del_peer($h) });
  0         0  
893 0         0 $s->peers->{$h}{bitfield} = pack 'b*', (0 x $s->piece_count);
894 0     0   0 $h->on_read(sub { $s->_on_read(@_) });
  0         0  
895             }
896             else { # This should never happen
897             }
898 0         0 1;
899             }
900              
901             sub _on_read {
902 0     0   0 my ($s, $h) = @_;
903 0         0 while (my $packet = parse_packet(\$h->rbuf)) {
904 0 0       0 last if !$packet;
905             AE::log debug => sub {
906 0     0   0 require Data::Dump;
907 0         0 'Incoming packet: ' . Data::Dump::dump($packet->{error});
908 0         0 };
909 0 0       0 if (defined $packet->{error}) {
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
910 0         0 $s->_del_peer($h);
911 0         0 return;
912             }
913             elsif ($packet->{type} eq $KEEPALIVE) {
914              
915             # Do nothing!
916             }
917             elsif ($packet->{type} == $HANDSHAKE) {
918 0   0     0 ref $packet->{payload} // return;
919 0         0 $s->peers->{$h}{reserved} = $packet->{payload}[0];
920             return $s->_del_peer($h)
921 0 0       0 if $packet->{payload}[1] ne $s->infohash;
922 0         0 $s->peers->{$h}{peerid} = $packet->{payload}[2];
923 0         0 $s->_send_bitfield($h);
924             $s->peers->{$h}{timeout}
925 0     0   0 = AE::timer(60, 0, sub { $s->_del_peer($h) });
  0         0  
926 0         0 $s->peers->{$h}{bitfield} = pack 'b*', (0 x $s->piece_count);
927             }
928             elsif ($packet->{type} == $INTERESTED) {
929 0         0 $s->peers->{$h}{remote_interested} = 1;
930             }
931             elsif ($packet->{type} == $NOT_INTERESTED) {
932 0         0 $s->peers->{$h}{remote_interested} = 0;
933              
934             # XXX - Clear any requests in queue
935             # XXX - Send choke just to be sure
936             }
937             elsif ($packet->{type} == $CHOKE) {
938 0         0 $s->peers->{$h}{local_choked} = 1;
939 0 0       0 if (!(vec($s->peers->{$h}{reserved}, 7, 1) & 0x04)) {
940 0         0 for my $req (@{$s->peers->{$h}{local_requests}}) {
  0         0  
941             $s->working_pieces->{$req->[0]}{$req->[1]}[3] = ()
942             unless
943 0 0       0 defined $s->working_pieces->{$req->[0]}{$req->[1]}[4];
944             }
945             }
946 0         0 $s->_consider_peer($s->peers->{$h});
947             }
948             elsif ($packet->{type} == $UNCHOKE) {
949 0         0 $s->peers->{$h}{local_choked} = 0;
950             $s->peers->{$h}{timeout}
951 0     0   0 = AE::timer(120, 0, sub { $s->_del_peer($h) });
  0         0  
952 0         0 $s->_request_pieces($s->peers->{$h});
953             }
954             elsif ($packet->{type} == $HAVE) {
955 0         0 vec($s->peers->{$h}{bitfield}, $packet->{payload}, 1) = 1;
956 0         0 $s->_consider_peer($s->peers->{$h});
957             $s->peers->{$h}{timeout}
958 0     0   0 = AE::timer(60, 0, sub { $s->_del_peer($h) });
  0         0  
959             }
960             elsif ($packet->{type} == $BITFIELD) {
961 0         0 $s->peers->{$h}{bitfield} = $packet->{payload};
962 0         0 $s->_consider_peer($s->peers->{$h});
963             }
964             elsif ($packet->{type} == $REQUEST) {
965             $s->peers->{$h}{timeout}
966 0     0   0 = AE::timer(120, 0, sub { $s->_del_peer($h) });
  0         0  
967              
968             # XXX - Make sure (index + offset + length) < $s->size
969             # if not, send reject if remote supports fast ext
970             # either way, ignore the request
971 0         0 push @{$s->peers->{$h}{remote_requests}}, $packet->{payload};
  0         0  
972             }
973             elsif ($packet->{type} == $PIECE) {
974             $s->peers->{$h}{timeout}
975 0     0   0 = AE::timer(120, 0, sub { $s->_del_peer($h) });
  0         0  
976 0         0 my ($index, $offset, $data) = @{$packet->{payload}};
  0         0  
977              
978             # Make sure $index is a working piece
979 0   0     0 $s->working_pieces->{$index} // return;
980              
981             # Make sure we req from this peer
982             return
983             if !grep {
984 0 0 0     0 $_->[0] == $index
985             && $_->[1] == $offset
986             && $_->[2] == length $data
987 0 0       0 } @{$s->peers->{$h}{local_requests}};
  0         0  
988             $s->peers->{$h}{local_requests} = [
989             grep {
990 0 0 0     0 ($_->[0] != $index)
991             || ($_->[1] != $offset)
992             || ($_->[2] != length($data))
993 0         0 } @{$s->peers->{$h}{local_requests}}
  0         0  
994             ];
995 0         0 $s->working_pieces->{$index}{$offset}[4] = $data;
996 0         0 $s->working_pieces->{$index}{$offset}[5] = ();
997 0         0 $s->_set_downloaded($s->downloaded + length $data);
998 0 0       0 if (0 == scalar grep { !defined $_->[4] }
  0         0  
999 0         0 values %{$s->working_pieces->{$index}})
1000             { my $piece = join '',
1001 0         0 map { $s->working_pieces->{$index}{$_}[4] }
1002 0         0 sort { $a <=> $b }
1003 0         0 keys %{$s->working_pieces->{$index}};
  0         0  
1004 0 0       0 if ((substr($s->pieces, $index * 20, 20) eq sha1($piece))) {
1005 0         0 for my $attempt (1 .. 5) { # XXX = 5 == failure callback
1006             last
1007 0 0       0 if $s->_write($index, 0, $piece) == length $piece;
1008             }
1009 0         0 vec($s->{bitfield}, $index, 1) = 1;
1010             $s->_broadcast(
1011             build_have($index),
1012             sub {
1013 0     0   0 !!!index substr(unpack('b*', $_->{bitfield}),
1014             0, $s->piece_count + 1),
1015             0, 0;
1016             }
1017 0         0 );
1018             $s->announce('complete')
1019 0 0       0 if !scalar grep {$_} split '',
  0         0  
1020             substr unpack('b*', ~$s->bitfield), 0,
1021             $s->piece_count + 1;
1022             $s->_consider_peer($_)
1023 0         0 for grep { $_->{local_interested} }
  0         0  
1024 0         0 values %{$s->peers};
1025 0         0 $s->_trigger_hash_pass($index);
1026             }
1027             else {
1028 0         0 $s->_trigger_hash_fail($index);
1029              
1030             # XXX - Not sure what to do... I'd
1031             # ban the peers involved and
1032             # try the same piece again.
1033             }
1034 0         0 delete $s->working_pieces->{$index};
1035             }
1036 0         0 $s->_request_pieces($s->peers->{$h});
1037             }
1038             elsif ($packet->{type} == $CANCEL) {
1039 0         0 my ($index, $offset, $length) = @{$packet->{payload}};
  0         0  
1040             return # XXX - error callback if this block is not in the queue
1041             if !grep {
1042 0 0 0     0 $_->[0] == $index
1043             && $_->[1] == $offset
1044             && $_->[2] == $length
1045 0 0       0 } @{$s->peers->{$h}{remote_requests}};
  0         0  
1046             $s->peers->{$h}{remote_requests} = [
1047             grep {
1048 0 0 0     0 ($_->[0] != $index)
1049             || ($_->[1] != $offset)
1050             || ($_->[2] != $length)
1051 0         0 } @{$s->peers->{$h}{remote_requests}}
  0         0  
1052             ];
1053             }
1054             elsif ($packet->{type} == $PORT) {
1055              
1056             # Do nothing... as we don't have a DHT node. Yet?
1057             }
1058             elsif ($packet->{type} == $SUGGEST) {
1059 0         0 push @{$s->peers->{$h}{local_suggest}}, $packet->{payload};
  0         0  
1060             }
1061             elsif ($packet->{type} == $HAVE_ALL) {
1062 0         0 $s->peers->{$h}{bitfield} = pack 'b*', (1 x $s->piece_count);
1063 0         0 $s->_consider_peer($s->peers->{$h});
1064             $s->peers->{$h}{timeout}
1065 0     0   0 = AE::timer(120, 0, sub { $s->_del_peer($h) });
  0         0  
1066             }
1067             elsif ($packet->{type} == $HAVE_NONE) {
1068 0         0 $s->peers->{$h}{bitfield} = pack 'b*', (0 x $s->piece_count);
1069             $s->peers->{$h}{timeout}
1070 0     0   0 = AE::timer(30, 0, sub { $s->_del_peer($h) });
  0         0  
1071             }
1072             elsif ($packet->{type} == $REJECT) {
1073 0         0 my ($index, $offset, $length) = @{$packet->{payload}};
  0         0  
1074             return # XXX - error callback if this block is not in the queue
1075             if !grep {
1076 0 0 0     0 $_->[0] == $index
1077             && $_->[1] == $offset
1078             && $_->[2] == $length
1079 0 0       0 } @{$s->peers->{$h}{local_requests}};
  0         0  
1080 0         0 $s->working_pieces->{$index}{$offset}->[3] = ();
1081             $s->peers->{$h}{local_requests} = [
1082             grep {
1083 0 0 0     0 ($_->[0] != $index)
1084             || ($_->[1] != $offset)
1085             || ($_->[2] != $length)
1086 0         0 } @{$s->peers->{$h}{local_requests}}
  0         0  
1087             ];
1088             $s->peers->{$h}{timeout}
1089 0     0   0 = AE::timer(30, 0, sub { $s->_del_peer($h) });
  0         0  
1090             }
1091             elsif ($packet->{type} == $ALLOWED_FAST) {
1092 0         0 push @{$s->peers->{$h}{local_allowed}}, $packet->{payload};
  0         0  
1093             }
1094             else {
1095             # use Data::Dump qw[pp];
1096             # die 'Unhandled packet: ' . pp $packet;
1097             }
1098             last
1099 0 0 0     0 if 5 > length($h->rbuf // ''); # Min size for protocol
1100             }
1101             }
1102              
1103             sub _send_bitfield {
1104 0     0   0 my ($s, $h) = @_;
1105 0 0       0 if (vec($s->peers->{$h}{reserved}, 7, 1) & 0x04) {
1106 0 0       0 if ($s->seed) {
    0          
1107 0         0 return $s->_send_encrypted($h, build_haveall());
1108             }
1109             elsif ($s->bitfield() !~ m[[^\0]]) {
1110 0         0 return $s->_send_encrypted($h, build_havenone());
1111             }
1112             }
1113              
1114             # XXX - If it's cheaper to send HAVE packets than a full BITFIELD, do it
1115 0         0 $s->_send_encrypted($h, build_bitfield($s->bitfield));
1116             }
1117              
1118             sub _broadcast {
1119 0     0   0 my ($s, $data, $qualifier) = @_;
1120 0   0 0   0 $qualifier //= sub {1};
  0         0  
1121             $s->_send_encrypted($_->{handle}, $data)
1122 0         0 for grep { $qualifier->() } values %{$s->peers};
  0         0  
  0         0  
1123             }
1124              
1125             sub _consider_peer { # Figure out whether or not we find a peer interesting
1126 0     0   0 my ($s, $p) = @_;
1127 0 0       0 return if $s->state ne 'active';
1128 0 0       0 return if $s->complete;
1129 0         0 my $relevence = $p->{bitfield} & $s->wanted;
1130 0 0       0 my $interesting
1131             = (
1132             index(substr(unpack('b*', $relevence), 0, $s->piece_count + 1), 1, 0)
1133             != -1) ? 1 : 0;
1134 0 0       0 if ($interesting) {
1135 0 0       0 if (!$p->{local_interested}) {
1136 0         0 $p->{local_interested} = 1;
1137 0         0 $s->_send_encrypted($p->{handle}, build_interested());
1138             }
1139             }
1140             else {
1141 0 0       0 if ($p->{local_interested}) {
1142 0         0 $p->{local_interested} = 0;
1143 0         0 $s->_send_encrypted($p->{handle}, build_not_interested());
1144             }
1145             }
1146             }
1147             has working_pieces => (is => 'ro',
1148             lazy => 1,
1149             isa => HashRef,
1150             init_arg => undef,
1151             default => sub { {} }
1152             );
1153              
1154             sub _file_to_range {
1155 4     4   6 my ($s, $file) = @_;
1156 4         5 my $start = 0;
1157 4         11 for (0 .. $file - 1) {
1158 0         0 $start += $s->files->[$_]->{length};
1159             }
1160 4         53 my $end = $start + $s->files->[$file]->{length};
1161 4         22 $start = $start / $s->piece_length;
1162 4         26 $end = $end / $s->piece_length;
1163 4 50       196 (int($start) .. int $end + ($end != int($end) ? 0 : +1));
1164             }
1165              
1166             sub _request_pieces {
1167 0     0   0 my ($s, $p) = @_;
1168 0 0       0 return if $s->state ne 'active';
1169 0 0       0 weaken $p unless isweak $p;
1170 0   0     0 $p // return;
1171 0   0     0 $p->{handle} // return;
1172 0         0 my @indexes;
1173 0 0       0 if (scalar keys %{$s->working_pieces} < 10) { # XXX - Max working pieces
  0         0  
1174 0         0 for my $findex (0 .. $#{$s->files}) {
  0         0  
1175 0         0 for my $index ($s->_file_to_range($findex)) {
1176             push @indexes, map {
1177 0 0 0     0 vec($p->{bitfield}, $index, 1)
1178             && !vec($s->bitfield, $index, 1) ?
1179             $index
1180             : ()
1181 0         0 } 1 .. $s->{files}[$findex]{priority};
1182             }
1183             }
1184             }
1185             else {
1186 0         0 @indexes = keys %{$s->working_pieces};
  0         0  
1187             }
1188 0 0       0 return if !@indexes;
1189 0         0 my $index = $indexes[rand @indexes]; # XXX - Weighted random/Rarest first
1190 0 0       0 my $piece_size
1191             = $index == $s->piece_count ?
1192             $s->size % $s->piece_length
1193             : $s->piece_length;
1194 0         0 my $block_count = $piece_size / $block_size;
1195 0 0       0 my @offsets = map { $_ * $block_size }
  0         0  
1196             0 .. $block_count - ((int($block_count) == $block_count) ? 1 : 0);
1197 0   0     0 $s->working_pieces->{$index} //= {map { $_ => undef } @offsets};
  0         0  
1198 0         0 my @unrequested = sort { $a <=> $b }
1199             grep { # XXX - If there are no unrequested blocks, pick a new index
1200 0         0 (!ref $s->working_pieces->{$index}{$_})
1201             || ( (!defined $s->working_pieces->{$index}{$_}[4])
1202 0   0     0 && (!defined $s->working_pieces->{$index}{$_}[3]))
1203             } @offsets;
1204 0         0 for (scalar @{$p->{local_requests}} .. 12) {
  0         0  
1205 0         0 my $offset = shift @unrequested;
1206 0   0     0 $offset // return; # XXX - Start working on another piece
1207 0 0 0     0 my $_block_size
1208             = ($index == $s->piece_count && ($offset == $offsets[-1]))
1209             ?
1210             $piece_size % $block_size
1211             : $block_size;
1212              
1213             # XXX - Limit to x req per peer (future: based on bandwidth)
1214 0         0 AE::log
1215             trace => 'Requesting %d, %d, %d',
1216             $index, $offset, $_block_size;
1217             $s->_send_encrypted($p->{handle},
1218 0         0 build_request($index, $offset, $_block_size))
1219             ; # XXX - len for last piece
1220             $s->working_pieces->{$index}{$offset} = [
1221             $index, $offset,
1222             $_block_size,
1223             $p, undef,
1224             AE::timer(
1225             60, 0,
1226             sub {
1227 0   0 0   0 $p // return;
1228 0   0     0 $p->{handle} // return;
1229             $s->_send_encrypted($p->{handle},
1230 0         0 build_cancel($index, $offset, $_block_size));
1231 0         0 $s->working_pieces->{$index}{$offset}[3] = ();
1232             $p->{local_requests} = [
1233             grep {
1234 0 0 0     0 $_->[0] != $index
1235             || $_->[1] != $offset
1236             || $_->[2] != $_block_size
1237 0         0 } @{$p->{local_requests}}
  0         0  
1238             ];
1239             $p->{timeout} = AE::timer(45, 0,
1240 0         0 sub { $s->_del_peer($p->{handle}) });
  0         0  
1241              
1242             #$s->_request_pieces( $p) # XXX - Ask a different peer
1243             }
1244             )
1245 0         0 ];
1246             weaken($s->working_pieces->{$index}{$offset}[3])
1247 0 0       0 unless isweak($s->working_pieces->{$index}{$offset}[3]);
1248 0         0 push @{$p->{local_requests}}, [$index, $offset, $_block_size];
  0         0  
1249             }
1250             }
1251              
1252             # Cheap callback system
1253             has on_hash_pass => (
1254             is => 'rw',
1255             isa => CodeRef,
1256             default => sub {
1257             sub { !!1 }
1258             },
1259             clearer => '_no_hash_pass'
1260             );
1261 9     9   154 sub _trigger_hash_pass { shift->on_hash_pass()->(@_) }
1262             has on_hash_fail => (
1263             is => 'rw',
1264             isa => CodeRef,
1265             default => sub {
1266             sub { !!1 }
1267             },
1268             clearer => '_no_hash_fail'
1269             );
1270 6303     6303   114973 sub _trigger_hash_fail { shift->on_hash_fail()->(@_) }
1271             #
1272             has state => (is => 'ro',
1273             isa => Enum [qw[active stopped paused]],
1274             writer => '_set_state',
1275             default => sub {'active'}
1276             );
1277              
1278             sub stop {
1279 7     7 1 2773 my $s = shift;
1280 7         22 AE::log debug => 'Stopping...';
1281 7 100       218 return if $s->state eq 'stopped';
1282 3         8 AE::log trace => 'Announcing "stopped" event to trackers...';
1283 3         71 $s->announce('stopped');
1284 3         7332 AE::log trace => 'Disconnecting peers...';
1285 3         98 $s->_clear_peers;
1286 3         1093 AE::log trace => 'Stopping new peers ticker...';
1287 3         95 $s->_clear_peer_timer;
1288 3         879 AE::log trace => 'Closing files...';
1289 3         88 $s->_open($_, 'c') for 0 .. $#{$s->files};
  3         55  
1290 3         28 AE::log trace => 'Setting internal status...';
1291 3         116 $s->_set_state('stopped');
1292             }
1293              
1294             sub start {
1295 4     4 1 6 my $s = shift;
1296 4         10 AE::log debug => 'Starting...';
1297 4 50       112 $s->announce('started') unless $s->state eq 'active';
1298 4         24 $s->peers;
1299 4         62 AE::log trace => 'Starting new peers ticker...';
1300 4         140 $s->_peer_timer;
1301 4         66 AE::log trace => 'Setting internal status...';
1302 4         144 $s->_set_state('active');
1303             }
1304              
1305             sub pause {
1306 0     0 1 0 my $s = shift;
1307 0         0 AE::log debug => 'Pausing...';
1308 0         0 $s->peers;
1309 0         0 AE::log trace => 'Starting new peers ticker...';
1310 0         0 $s->_peer_timer;
1311 0         0 AE::log trace => 'Setting internal status...';
1312 0         0 $s->_set_state('paused');
1313             }
1314             #
1315             sub BUILD {
1316 4     4 0 75 my ($s, $a) = @_;
1317 4         10 AE::log debug => 'BUILD()';
1318 4 50 33     128 $s->start && AE::log debug => 'Calling ->start()'
1319             if $s->state eq 'active';
1320 4 50 0     1481 $s->paused && AE::log debug => 'Calling ->paused() '
1321             if $s->state eq 'paused';
1322             }
1323              
1324             # Testing stuff goes here
1325             sub _send_encrypted {
1326 0     0     my ($s, $h, $packet) = @_;
1327 0 0         return if !$h; # XXX - $s->_del_peer($p->{handle})
1328             # XXX - Currently doesn't do anything and may never do anything
1329             AE::log trace => sub {
1330 0     0     require Data::Dump;
1331 0           'Outgoing packet: ' . Data::Dump::dump($packet);
1332 0           };
1333 0           return $h->push_write($packet);
1334             }
1335              
1336             sub _send_handshake {
1337 0     0     my ($s, $h) = @_;
1338 0           AE::log trace => 'Outgoing handshake';
1339              
1340             # XXX - Send encrypted handshake if encryption status is unknown or true
1341 0           $h->push_write(build_handshake($s->reserved, $s->infohash, $s->peerid));
1342             }
1343             1;
1344              
1345             =pod
1346              
1347             =head1 NAME
1348              
1349             AnyEvent::BitTorrent - Yet Another BitTorrent Client Module
1350              
1351             =head1 Synopsis
1352              
1353             use AnyEvent::BitTorrent;
1354             my $client = AnyEvent::BitTorrent->new( path => 'some.torrent' );
1355             AE::cv->recv;
1356              
1357             =head1 Description
1358              
1359             This is a painfully simple BitTorrent client written on a whim that implements
1360             the absolute basics. For a full list of what's currently supported, what you
1361             will likely find in a future version, and what you'll never get from this, see
1362             the section entitled "L"
1363              
1364             =head1 Methods
1365              
1366             The API, much like the module itself, is simple.
1367              
1368             Anything you find by skimming the source is likely not ready for public use
1369             and will be subject to change before C. Here's the public interface as
1370             of this version:
1371              
1372             =head2 C
1373              
1374             my $c = AnyEvent::BitTorrent->new(
1375             path => 'some/legal.torrent',
1376             basedir => './storage/',
1377             port => 6881,
1378             on_hash_pass => sub { ... },
1379             on_hash_fail => sub { ... },
1380             state => 'stopped',
1381             piece_cache => $quick_restore
1382             );
1383              
1384             This constructor understands the following arguments:
1385              
1386             =over
1387              
1388             =item C
1389              
1390             This is the only required parameter. It's the path to a valid .torrent file.
1391              
1392             =item C
1393              
1394             This is the base directory all data will be stored in and/or read from.
1395             Multifile torrents will create another directory below this to store all
1396             files.
1397              
1398             By default, this is the current working directory when
1399             L|/"new( ... )"> is called.
1400              
1401             =item C
1402              
1403             This is the preferred port local host binds and expects incoming peers to
1404             connect to.
1405              
1406             By default, this is a zero; the system will pick a port number randomly.
1407              
1408             =item C
1409              
1410             This is a subroutine called whenever a piece fails to pass
1411             L. The callback is handed the piece's index.
1412              
1413             =item C
1414              
1415             This is a subroutine called whenever a piece passes its
1416             L. The callback is handed the piece's index.
1417              
1418             =item C
1419              
1420             This must be one of the following:
1421              
1422             =over
1423              
1424             =item C
1425              
1426             This is the default. The client will attempt to create new connections, make
1427             and fill requests, etc. This is normal client behavior.
1428              
1429             =item C
1430              
1431             In this state, connections will be made and accepted but no piece requests
1432             will be made or filled. To resume full, normal behavior, you must call
1433             L|/"start( )">.
1434              
1435             =item C
1436              
1437             Everything is put on hold. No new outgoing connections are attempted and
1438             incoming connections are rejected. To resume full, normal behavior, you must
1439             call L|/"start( )">.
1440              
1441             =back
1442              
1443             =item C
1444              
1445             This is the index list returned by L|/"piece_cache( )"> in a
1446             previous instance. Using this should make a complete resume system a trivial
1447             task.
1448              
1449             =back
1450              
1451             =head2 C
1452              
1453             This method expects...
1454              
1455             =over
1456              
1457             =item ...a list of integers. You could use this to check a range of pieces (a
1458             single file, for example).
1459              
1460             $client->hashcheck( 1 .. 5, 34 .. 56 );
1461              
1462             =item ...a single integer. Only that specific piece is checked.
1463              
1464             $client->hashcheck( 17 );
1465              
1466             =item ...nothing. All data related to this torrent will be checked.
1467              
1468             $client->hashcheck( );
1469              
1470             =back
1471              
1472             As pieces pass or fail, your C and C callbacks are
1473             triggered.
1474              
1475             =head2 C
1476              
1477             Sends a 'started' event to trackers and starts performing as a client is
1478             expected. New connections are made and accepted, requests are made and filled,
1479             etc.
1480              
1481             =head2 C
1482              
1483             Sends a stopped event to trackers, closes all connections, stops attempting
1484             new outgoing connections, rejects incoming connections and closes all open
1485             files.
1486              
1487             =head2 C
1488              
1489             The client remains mostly active; new connections will be made and accepted,
1490             etc. but no requests will be made or filled while the client is paused.
1491              
1492             =head2 C
1493              
1494             Returns the 20-byte SHA1 hash of the value of the info key from the metadata
1495             file.
1496              
1497             =head2 C
1498              
1499             Returns the 20 byte string used to identify the client. Please see the
1500             L below.
1501              
1502             =head2 C
1503              
1504             Returns the port number the client is listening on.
1505              
1506             =head2 C
1507              
1508             Returns the total size of all L described in the torrent's
1509             metadata.
1510              
1511             =head2 C
1512              
1513             Returns the UTF-8 encoded string the metadata suggests we save the file (or
1514             directory, in the case of multi-file torrents) under.
1515              
1516             =head2 C
1517              
1518             Returns the total amount uploaded to remote peers.
1519              
1520             =head2 C
1521              
1522             Returns the total amount downloaded from other peers.
1523              
1524             =head2 C
1525              
1526             Returns the approximate amount based on the pieces we still
1527             L multiplied by the L.
1528              
1529             =head2 C
1530              
1531             Returns the number of bytes in each piece the file or files are split into.
1532             For the purposes of transfer, files are split into fixed-size pieces which are
1533             all the same length except for possibly the last one which may be truncated.
1534              
1535             =head2 C
1536              
1537             Returns a packed binary string in ascending order (ready for C). Each
1538             index that the client has is set to one and the rest are set to zero.
1539              
1540             =head2 C
1541              
1542             Returns a packed binary string in ascending order (ready for C). Each
1543             index that the client has or simply does not want is set to zero and the rest
1544             are set to one.
1545              
1546             This value is calculated every time the method is called. Keep that in mind.
1547              
1548             =head2 C
1549              
1550             Returns true if we have downloaded everything we L which
1551             is not to say that we have all data and can L.
1552              
1553             =head2 C
1554              
1555             Returns true if we have all data related to the torrent.
1556              
1557             =head2 C
1558              
1559             Returns a list of hash references with the following keys:
1560              
1561             =over
1562              
1563             =item C
1564              
1565             Which is the size of file in bytes.
1566              
1567             =item C
1568              
1569             Which is the absolute path of the file.
1570              
1571             =item C
1572              
1573             Download priority for this file. By default, all files have a priority of
1574             C<1>. There is no built in scale; the higher the priority, the better odds a
1575             piece from it will be downloaded first. Setting a file's priority to C<1000>
1576             while the rest are still at C<1> will likely force the file to complete before
1577             any other file is started.
1578              
1579             We do not download files with a priority of zero.
1580              
1581             =back
1582              
1583             =head2 C
1584              
1585             Returns the list of currently connected peers. The organization of these peers
1586             is not yet final so... don't write anything you don't expect to break before
1587             we hit C.
1588              
1589             =head2 C
1590              
1591             Returns C if the client is L, C if client
1592             is L, and C if the client is currently
1593             L.
1594              
1595             =head2 C
1596              
1597             Pieces which overlap files with zero priority are stored in a part file which
1598             is indexed internally. To save this index (for resume, etc.) store the values
1599             returned by this method and pass it to L.
1600              
1601             =head2 C
1602              
1603             Returns a list of hashes, each representing a single tier of trackers as
1604             defined by L. The hashes contain the
1605             following keys:
1606              
1607             =over
1608              
1609             =item C
1610              
1611             The is a count of complete peers (seeds) as returned by the most recent
1612             announce.
1613              
1614             =item C
1615              
1616             This is a running total of the number of failed announces we've had in a row.
1617             This value is reset when we have a successful announce.
1618              
1619             =item C
1620              
1621             The is a count of incomplete peers (leechers) as returned by the most recent
1622             announce.
1623              
1624             =item C
1625              
1626             Which is a compact collection of IPv4 peers returned by the tracker. See
1627             L.
1628              
1629             =item C
1630              
1631             Which is a compact collection of IPv6 peers returned by the tracker. See
1632             L.
1633              
1634             =item C
1635              
1636             Which is a list of URLs.
1637              
1638             =back
1639              
1640             =head1 This Module is Lame!
1641              
1642             Yeah, I said it.
1643              
1644             There are a few things a BitTorrent client must implement (to some degree) in
1645             order to interact with other clients in a modern day swarm.
1646             L is meant to meet that bare
1647             minimum but it's based on L so you could always subclass it to add more
1648             advanced functionality. Hint, hint!
1649              
1650             =head2 What is currently supported?
1651              
1652             Basic stuff. We can make and handle piece requests. Deal with cancels,
1653             disconnect idle peers, unchoke folks, fast extensions, file download
1654             priorities. Normal... stuff. HTTP trackers.
1655              
1656             =head2 What will probably be supported in the future?
1657              
1658             DHT (which will likely be in a separate dist), IPv6 stuff... I'll get around
1659             to those.
1660              
1661             Long term, UDP trackers may be supported.
1662              
1663             For a detailed list, see the TODO file included with this distribution.
1664              
1665             =head2 What will likely never be supported?
1666              
1667             We can't have nice things. Protocol encryption, uTP, endgame tricks, ...these
1668             will probably never be included in L.
1669              
1670             =head2 What should I use instead?
1671              
1672             If you're reading all of this with a scowl, there are many alternatives to
1673             this module, most of which are sure to be better suited for advanced users. I
1674             suggest (in no particular order):
1675              
1676             =over
1677              
1678             =item L. It's written in Perl but you'll
1679             still need to be on a Linux, *BSD, et al. system to use it.
1680              
1681             =item L ...in the future. I I suggest using either
1682             the current stable or unstable versions found on CPAN. The next version is
1683             being worked on and will be based on L.
1684              
1685             =back
1686              
1687             If you're working on a Perl based client and would like me to link to it, send
1688             a bug report to the tracker L.
1689              
1690             =head1 Subclassing AnyEvent::BitTorrent
1691              
1692             TODO
1693              
1694             If you subclass this module and change the way it functions to that which in
1695             any way proves harmful to individual peers or the swarm at large, rather than
1696             damage L's reputation, override the peerid attribute.
1697             Thanks.
1698              
1699             =head1 PeerID Specification
1700              
1701             L may be identified in a swarm by its peer id. As of
1702             this version, our peer id is in 'Azureus style' with a single digit for the
1703             Major version, two digits for the minor version, and a single character to
1704             indicate stability (stable releases marked with C, unstable releases marked
1705             with C). It looks sorta like:
1706              
1707             -AB110S- Stable v1.10.0 relese (typically found on CPAN, tagged in repo)
1708             -AB110U- Unstable v1.10.X release (private builds, early testing, etc.)
1709              
1710             =head1 Bug Reports
1711              
1712             If email is better for you, L but I
1713             would rather have bugs sent through the issue tracker found at
1714             http://github.com/sanko/anyevent-bittorrent/issues.
1715              
1716             Please check the ToDo file included with this distribution in case your bug
1717             is already known (...I probably won't file bug reports to myself).
1718              
1719             =head1 See Also
1720              
1721             L - The package which does all of the wire protocol
1722             level heavy lifting.
1723              
1724             =head1 Author
1725              
1726             Sanko Robinson - http://sankorobinson.com/
1727              
1728             CPAN ID: SANKO
1729              
1730             =head1 License and Legal
1731              
1732             Copyright (C) 2011-2013 by Sanko Robinson
1733              
1734             This program is free software; you can redistribute it and/or modify it under
1735             the terms of
1736             L.
1737             See the F file included with this distribution or
1738             L
1739             for clarification.
1740              
1741             When separated from the distribution, all original POD documentation is
1742             covered by the
1743             L.
1744             See the
1745             L.
1746              
1747             Neither this module nor the L is affiliated with BitTorrent,
1748             Inc.
1749              
1750             =cut