File Coverage

blib/lib/AnyEvent/MultiDownload.pm
Criterion Covered Total %
statement 131 144 90.9
branch 27 48 56.2
condition 23 45 51.1
subroutine 20 22 90.9
pod 1 7 14.2
total 202 266 75.9


line stmt bran cond sub pod time code
1             #!/usr/bin/perl
2             package AnyEvent::MultiDownload;
3 4     4   130369 use strict;
  4         7  
  4         142  
4 4     4   2067 use utf8;
  4         31  
  4         17  
5 4     4   1788 use Moo;
  4         34790  
  4         21  
6 4     4   5789 use AE;
  4         22138  
  4         155  
7 4     4   1599 use Asset::File;
  4         129478  
  4         122  
8 4     4   2151 use AnyEvent::Digest;
  4         32035  
  4         144  
9 4     4   31 use List::Util qw/shuffle/;
  4         6  
  4         377  
10 4     4   2807 use AnyEvent::HTTP qw/http_get/;
  4         95840  
  4         7561  
11              
12             our $VERSION = '1.12';
13              
14             has path => (
15             is => 'ro',
16             isa => sub {
17             die "文件存在" if -e $_[0];
18             },
19             required => 1,
20             );
21              
22             has url => (
23             is => 'ro',
24             required => 1,
25             );
26              
27              
28             has mirror => (
29             is => 'rw',
30             predicate => 1,
31             isa => sub {
32             return 1 if ref $_[0] eq 'ARRAY';
33             },
34             );
35              
36             has digest => (
37             is => 'rw',
38             isa => sub {
39             return 1 if $_[0] =~ /Digest::(SHA|MD5)/;
40             }
41             );
42              
43              
44             has on_finish => (
45             is => 'rw',
46             required => 1,
47             isa => sub {
48             return 2 if ref $_[0] eq 'CODE';
49             }
50             );
51              
52             has on_error => (
53             is => 'rw',
54             isa => sub {
55             return 2 if ref $_[0] eq 'CODE';
56             },
57             default => sub { sub { 1 } },
58             );
59              
60             has on_block_finish => (
61             is => 'rw',
62             isa => sub {
63             return 2 if ref $_[0] eq 'CODE';
64             },
65             default => sub { return sub {1} }
66             );
67              
68             has cv => (
69             is => 'rw',
70             lazy => 1,
71             default => sub { AE::cv },
72             );
73              
74             has fh => (
75             is => 'rw',
76             lazy => 1,
77             default => sub {
78             return Asset::File->new;
79             },
80             );
81              
82             has retry_interval => is => 'rw', default => sub { 10 };
83             has max_retries => is => 'rw', default => sub { 3 };
84             has block_size => is => 'rw', default => sub { 1 * 1024 * 1024 };
85             has timeout => is => 'rw', default => sub { 10 };
86             has recurse => is => 'rw', default => sub { 6 };
87             has headers => is => 'rw', default => sub {{}};
88             has tasks => is => 'rw', default => sub { [] };
89             has error => is => 'rw', default => sub {};
90             has task_lists => is => 'rw', default => sub {[]};
91             has max_per_host => is => 'rw', default => sub { 8 };
92             has url_status => (
93             is => 'rw',
94             lazy => 1,
95             default => sub {
96             my $self = shift;
97             my %hash;
98             if ($self->has_mirror) {
99             %hash = map {
100             $_ => 0
101             } @{ $self->mirror }, $self->url;
102             }
103             else {
104             $hash{$self->url} = 0;
105             }
106             return \%hash;
107             }
108             );
109              
110             sub start {
111 3     3 1 48 my $self = shift;
112 3         5 my $cb = shift;
113            
114             $self->cv->cb(sub{
115 3     3   102 my $cv = shift;
116 3         41 my ($info, $hdr) = $cv->recv;
117 3 100       42 if ($info) {
118 2         14 AE::log debug => $info;
119 2         4628 return $self->on_error->($info, $hdr);
120             }
121 1         30 $self->fh->move_to($self->path);
122 1         467 $self->on_finish->($self->fh->size);
123 3         14 });
124              
125 3         352 $self->cv->begin;
126 3         51 $self->first_request(0);
127             }
128              
129              
130             sub first_request {
131 4     4 0 15 my ($self, $retry) = @_;
132 4         41 my $url = $self->shuffle_url;
133              
134 4         7 my $first_task;
135             my $ev; $ev = http_get $url,
136             headers => $self->headers,
137             timeout => $self->timeout,
138             recurse => $self->recurse,
139             on_header => sub {
140 2     2   70511 my ($hdr) = @_;
141 2 100       15 if ( $hdr->{Status} == 200 ) {
142 1         4 my $len = $hdr->{'content-length'};
143 1 50 33     21 return $self->cv->send(("Cannot find a content-length header.", $hdr))
144             if !defined($len) or $len <= 0;
145              
146             # 准备开始下载的信息
147 1         7 my $ranges = $self->split_range($len);
148              
149             # 除了第一个块, 其它块现在开始下载
150             # 事件开始, 但这个回调会在最后才调用.
151 1         4 $first_task = shift @{ $self->tasks };
  1         4  
152 1   50     10 $first_task->{block} = $first_task->{block} || 0;
153 1   50     8 $first_task->{ofs} = $first_task->{ofs} || 0;
154 1 50       6 return 1 if $len <= $self->block_size;
155              
156 1         6 for ( 1 .. $self->max_per_host ) {
157 8         3451 my $block_task = shift @{ $self->tasks };
  8         26  
158 8 100       22 last unless defined $block_task;
159 7         231 $self->cv->begin;
160 7         292 $self->fetch_block($block_task) ;
161              
162             }
163             }
164             1
165 2         7 },
166             on_body => sub {
167 2     2   187 my ($partial_body, $hdr) = @_;
168 2 100       13 if ( $self->on_body($first_task)->($partial_body, $hdr) ) {
169             # 如果是第一个块的话, 下载到指定的大小就需要断开
170 1 50 33     24 if ( ( $hdr->{'content-length'} <= $self->block_size and $first_task->{size} == $hdr->{'content-length'} )
      33        
171             or
172             $first_task->{size} >= $self->block_size
173             ) {
174              
175 1 50       30 $self->cv->send(("The 0 block the compared failure", $hdr))
    50          
176             if !$self->on_block_finish->( $hdr, $first_task, $self->digest ? $first_task->{ctx}->hexdigest : '');
177 1         2816 $self->cv->end;
178 1         15 return 0
179             }
180             }
181 1         8 return 1;
182             },
183             sub {
184 4     4   38476 my (undef, $hdr) = @_;
185 4         10 undef $ev;
186 4         9 my $status = $hdr->{Status};
187             # on_body 正常的下载
188 4 50 66     152 return if ( $hdr->{OrigStatus} and $hdr->{OrigStatus} == 200 ) or $hdr->{Status} == 200 or $hdr->{Status} == 416;
      66        
      66        
189              
190 3 100 66     69 if ( ($status == 500 or $status == 503 or $status =~ /^59/) and $retry < $self->max_retries ) {
      100        
191 1         2 my $w; $w = AE::timer( $self->retry_interval, 0, sub {
192 1         995452 $first_task->{pos} = $first_task->{ofs}; # 重下本块时要 seek 回零
193 1         8 $first_task->{size} = 0;
194 1         4 $first_task->{ctx} = undef;
195 1         9 $self->first_request(++$retry);
196 1         801 undef $w;
197 1         15 });
198 1         9 AE::log debug => "地址 $url 的块 0 下载出错, 重试";
199 1         4708 return;
200             }
201              
202 2 50       70 return $self->cv->send((
    50          
203             sprintf("Status: %s, Reason: %s.", $status ? $status : '500', $hdr->{Reason} ? $hdr->{Reason} : ' '),
204             $hdr)
205             );
206             }
207 4         198 }
208              
209              
210             sub shuffle_url {
211 11     11 0 29 my $self = shift;
212 11         246 my $urls = $self->url_status;
213 11         127 return (shuffle keys %$urls)[0];
214             }
215              
216             sub on_body {
217 9     9 0 12 my ($self, $task) = @_;
218             return sub {
219 9     9   12605 my ($partial_body, $hdr) = @_;
220 9 100 100     70 return 0 unless ($hdr->{Status} == 206 || $hdr->{Status} == 200);
221              
222 8         20 my $len = length($partial_body);
223             # 主要是用于解决第一个块会超过写的位置
224 8 50       51 if ( $task->{size} + $len > $self->block_size ) {
225 8         34 my $spsize = $len - ( $task->{size} + $len - $self->block_size );
226 8         37 $partial_body = substr($partial_body, 0, $spsize);
227 8         13 $len = $spsize;
228             }
229              
230 8         254 $self->fh->start_range($task->{pos});
231 8         1520 $self->fh->add_chunk($partial_body);
232              
233 8 50       2758 if ( $self->digest ) {
234 8   33     837 $task->{ctx} ||= AnyEvent::Digest->new($self->digest);
235 8         1020 $task->{ctx}->add_async($partial_body);
236             }
237              
238 8         690 $task->{pos} += $len;
239 8         15 $task->{size} += $len;
240 8         29 return 1;
241             }
242 9         123 }
243              
244             sub fetch_block {
245 7     7 0 10 my ($self, $task, $retry) = @_;
246 7   50     29 $retry ||= 0;
247 7         17 my $url = $self->shuffle_url;
248              
249 7         9 my $ev; $ev = http_get $url,
  7         40  
250             timeout => $self->timeout,
251             recurse => $self->recurse,
252             persistent => 1,
253             keepalive => 1,
254             headers => {
255             %{ $self->headers },
256             Range => $task->{range}
257             },
258             on_body => $self->on_body($task),
259             sub {
260 7     7   3080 my ($hdl, $hdr) = @_;
261 7         25 my $status = $hdr->{Status};
262 7         11 undef $ev;
263              
264             # 成功下载到的流程
265             # 1. 需要对比大小是否一致, 接着对比块较检
266             # 2. 开始下一个任务的下载
267             # 3. 当前块就退出, 不然下面会重试
268 7 50 33     27 if ( $status == 200 || $status == 206 ) { # 第一个块, 这二个都有可能
269             # not ok 块较检不相等 | 直接失败
270 7 50 33     390 return $self->cv->send(("The $task->{block} block the compared failure", $hdr))
    50          
271             if ($task->{size} != ( $task->{tail} -$task->{ofs} + 1 )
272             or !$self->on_block_finish->($hdl, $task, $self->digest ? $task->{ctx}->hexdigest : ''));
273 7         21000 my $block_task = shift @{ $self->tasks };
  7         42  
274             # 完成, 标记结束本次请求
275             # ok 大小相等, 块较检相等, 当前块下载完成, 开始下载新的
276 7         75 AE::log debug => "地址 $url 的块 $task->{block} 下载完成 $$";
277             # 处理接下来的一个请求
278 7 50       7934 $block_task ? $self->fetch_block($block_task) : $self->cv->end;
279 7         3606 return;
280             }
281              
282             # 是否重试的流程
283 0 0       0 my $error = sprintf(
    0          
284             "Block %s the size is wrong, expect the size: %s actual size: %s, The %s try again, Status: %s, Reason: %s.",
285             $task->{block},
286             $self->block_size,
287             $task->{size},
288             $retry,
289             $status ? $status : '500',
290             $hdr->{Reason} ? $hdr->{Reason} : ' ', );
291 0         0 AE::log warn => $error;
292            
293             # 失败
294             # 如果有可能还连接上的响应, 就需要重试, 直到达到重试, 如果不可能连接的响应, 就直接快速的退出
295 0 0 0     0 return $self->cv->send(($error, $hdr))
296             if $status !~ /^(59.|503|500|502|200|206|)$/ or $retry > $self->max_retries;
297            
298 0         0 $self->retry($task, $retry);
299             }
300 7         27 };
301              
302             sub retry {
303 0     0 0 0 my ($self, $task, $retry) = @_;
304 0         0 my $w;$w = AE::timer( $self->retry_interval, 0, sub {
305 0     0   0 $task->{pos} = $task->{ofs}; # 重下本块时要 seek 回零
306 0         0 $task->{size} = 0;
307 0         0 $task->{ctx} = undef;
308 0         0 $self->fetch_block( $task, ++$retry );
309 0         0 undef $w;
310 0         0 });
311             }
312              
313             sub split_range {
314 1     1 0 2 my $self = shift;
315 1         4 my $length = shift;
316              
317             # 每个请求的段大小的范围,字节
318 1         11 my $block_size = $self->block_size;
319 1         5 my $segments = int($length / $block_size);
320              
321             # 要处理的字节的总数
322 1         3 my $len_remain = $length;
323              
324 1         2 my @ranges;
325 1         2 my $block = 0;
326 1         7 while ( $len_remain > 0 ) {
327             # 每个 segment 的大小
328 8         8 my $seg_len = $block_size;
329              
330             # 偏移长度
331 8         10 my $ofs = $length - $len_remain;
332            
333             # 剩余字节
334 8         9 $len_remain -= $seg_len;
335              
336 8         11 my $tail = $ofs + $seg_len - 1;
337 8 50       16 if ( $length-1 < $tail) {
338 0         0 $tail = $length-1;
339             }
340              
341 8         55 my $task = {
342             block => $block, # 当前块编号
343             ofs => $ofs, # 当前的偏移量
344             pos => $ofs, # 本块的起点
345             tail => $tail, # 本块的结束
346             range => 'bytes=' . $ofs . '-' . $tail,
347             size => 0, # 总共下载的长度
348             };
349              
350 8         24 $self->tasks->[$block] = $task;
351 8         22 $block++;
352             }
353             }
354              
355             1;
356              
357             __END__