File Coverage

blib/lib/AnyEvent/MultiDownload.pm
Criterion Covered Total %
statement 133 148 89.8
branch 28 50 56.0
condition 23 45 51.1
subroutine 20 22 90.9
pod 1 7 14.2
total 205 272 75.3


line stmt bran cond sub pod time code
1             #!/usr/bin/perl
2             package AnyEvent::MultiDownload;
3 4     4   190114 use strict;
  4         11  
  4         209  
4 4     4   3574 use utf8;
  4         48  
  4         24  
5 4     4   6154 use Moo;
  4         70125  
  4         69  
6 4     4   10268 use AE;
  4         35318  
  4         204  
7 4     4   2557 use Asset::File;
  4         221545  
  4         183  
8 4     4   3145 use AnyEvent::Digest;
  4         40847  
  4         179  
9 4     4   40 use List::Util qw/shuffle/;
  4         6  
  4         525  
10 4     4   3567 use AnyEvent::HTTP qw/http_get/;
  4         126891  
  4         10906  
11              
12             our $VERSION = '1.13';
13              
14             has path => (
15             is => 'ro',
16             isa => sub {
17             die "文件存在" if -e $_[0];
18             },
19             required => 1,
20             );
21              
22             has url => (
23             is => 'ro',
24             required => 1,
25             );
26              
27              
28             has mirror => (
29             is => 'rw',
30             predicate => 1,
31             isa => sub {
32             return 1 if ref $_[0] eq 'ARRAY';
33             },
34             );
35              
36             has digest => (
37             is => 'rw',
38             isa => sub {
39             return 1 if $_[0] =~ /Digest::(SHA|MD5)/;
40             }
41             );
42              
43              
44             has on_finish => (
45             is => 'rw',
46             required => 1,
47             isa => sub {
48             return 2 if ref $_[0] eq 'CODE';
49             }
50             );
51              
52             has on_error => (
53             is => 'rw',
54             isa => sub {
55             return 2 if ref $_[0] eq 'CODE';
56             },
57             default => sub { sub { 1 } },
58             );
59              
60             has on_block_finish => (
61             is => 'rw',
62             isa => sub {
63             return 2 if ref $_[0] eq 'CODE';
64             },
65             default => sub { return sub {1} }
66             );
67              
68             has cv => (
69             is => 'rw',
70             lazy => 1,
71             default => sub { AE::cv },
72             );
73              
74             has fh => (
75             is => 'rw',
76             lazy => 1,
77             default => sub {
78             return Asset::File->new;
79             },
80             );
81              
82             has retry_interval => is => 'rw', default => sub { 10 };
83             has max_retries => is => 'rw', default => sub { 3 };
84             has block_size => is => 'rw', default => sub { 1 * 1024 * 1024 };
85             has timeout => is => 'rw', default => sub { 10 };
86             has recurse => is => 'rw', default => sub { 6 };
87             has headers => is => 'rw', default => sub {{}};
88             has tasks => is => 'rw', default => sub { [] };
89             has error => is => 'rw', default => sub {};
90             has task_lists => is => 'rw', default => sub {[]};
91             has max_per_host => is => 'rw', default => sub { 8 };
92             has url_status => (
93             is => 'rw',
94             lazy => 1,
95             default => sub {
96             my $self = shift;
97             my %hash;
98             if ($self->has_mirror) {
99             %hash = map {
100             $_ => 0
101             } @{ $self->mirror }, $self->url;
102             }
103             else {
104             $hash{$self->url} = 0;
105             }
106             return \%hash;
107             }
108             );
109              
110             sub start {
111 3     3 1 166 my $self = shift;
112 3         8 my $cb = shift;
113            
114             $self->cv->cb(sub{
115 3     3   129 my $cv = shift;
116 3         49 my ($info, $hdr) = $cv->recv;
117             # 出错的回调
118 3 100       47 if ($info) {
119 2         19 AE::log debug => $info;
120 2         4867 return $self->on_error->($info, $hdr);
121             }
122             # 移文件
123 1         3 eval {
124 1         33 $self->fh->move_to($self->path);
125             };
126 1 50       476 if ($@) {
127 0         0 AE::log debug => $@;
128 0         0 return $self->on_error->("$@", []);
129             }
130 1         39 $self->on_finish->($self->fh->size);
131 3         18 });
132              
133 3         361 $self->cv->begin;
134 3         68 $self->first_request(0);
135             }
136              
137              
138             sub first_request {
139 4     4 0 14 my ($self, $retry) = @_;
140 4         18 my $url = $self->shuffle_url;
141              
142 4         31 my $first_task;
143             my $ev; $ev = http_get $url,
144             headers => $self->headers,
145             timeout => $self->timeout,
146             recurse => $self->recurse,
147             on_header => sub {
148 2     2   95374 my ($hdr) = @_;
149 2 100       15 if ( $hdr->{Status} == 200 ) {
150 1         4 my $len = $hdr->{'content-length'};
151 1 50 33     26 return $self->cv->send(("Cannot find a content-length header.", $hdr))
152             if !defined($len) or $len <= 0;
153              
154             # 准备开始下载的信息
155 1         7 my $ranges = $self->split_range($len);
156              
157             # 除了第一个块, 其它块现在开始下载
158             # 事件开始, 但这个回调会在最后才调用.
159 1         3 $first_task = shift @{ $self->tasks };
  1         7  
160 1   50     11 $first_task->{block} = $first_task->{block} || 0;
161 1   50     9 $first_task->{ofs} = $first_task->{ofs} || 0;
162 1 50       9 return 1 if $len <= $self->block_size;
163              
164 1         6 for ( 1 .. $self->max_per_host ) {
165 8         2722 my $block_task = shift @{ $self->tasks };
  8         31  
166 8 100       26 last unless defined $block_task;
167 7         266 $self->cv->begin;
168 7         129 $self->fetch_block($block_task) ;
169              
170             }
171             }
172             1
173 2         7 },
174             on_body => sub {
175 2     2   193 my ($partial_body, $hdr) = @_;
176 2 100       12 if ( $self->on_body($first_task)->($partial_body, $hdr) ) {
177             # 如果是第一个块的话, 下载到指定的大小就需要断开
178 1 50 33     30 if ( ( $hdr->{'content-length'} <= $self->block_size and $first_task->{size} == $hdr->{'content-length'} )
      33        
179             or
180             $first_task->{size} >= $self->block_size
181             ) {
182              
183 1 50       38 $self->cv->send(("The 0 block the compared failure", $hdr))
    50          
184             if !$self->on_block_finish->( $hdr, $first_task, $self->digest ? $first_task->{ctx}->hexdigest : '');
185 1         4112 $self->cv->end;
186 1         27 return 0
187             }
188             }
189 1         8 return 1;
190             },
191             sub {
192 4     4   60520 my (undef, $hdr) = @_;
193 4         14 undef $ev;
194 4         14 my $status = $hdr->{Status};
195             # on_body 正常的下载
196 4 50 66     213 return if ( $hdr->{OrigStatus} and $hdr->{OrigStatus} == 200 ) or $hdr->{Status} == 200 or $hdr->{Status} == 416;
      66        
      66        
197              
198 3 100 66     61 if ( ($status == 500 or $status == 503 or $status =~ /^59/) and $retry < $self->max_retries ) {
      100        
199 1         2 my $w; $w = AE::timer( $self->retry_interval, 0, sub {
200 1         994448 $first_task->{pos} = $first_task->{ofs}; # 重下本块时要 seek 回零
201 1         13 $first_task->{size} = 0;
202 1         5 $first_task->{ctx} = undef;
203 1         12 $self->first_request(++$retry);
204 1         1460 undef $w;
205 1         18 });
206 1         10 AE::log debug => "地址 $url 的块 0 下载出错, 重试";
207 1         5702 return;
208             }
209              
210 2 50       80 return $self->cv->send((
    50          
211             sprintf("Status: %s, Reason: %s.", $status ? $status : '500', $hdr->{Reason} ? $hdr->{Reason} : ' '),
212             $hdr)
213             );
214             }
215 4         271 }
216              
217              
218             sub shuffle_url {
219 11     11 0 33 my $self = shift;
220 11         350 my $urls = $self->url_status;
221 11         252 return (shuffle keys %$urls)[0];
222             }
223              
224             sub on_body {
225 9     9 0 16 my ($self, $task) = @_;
226             return sub {
227 9     9   11721 my ($partial_body, $hdr) = @_;
228 9 100 100     87 return 0 unless ($hdr->{Status} == 206 || $hdr->{Status} == 200);
229              
230 8         15 my $len = length($partial_body);
231             # 主要是用于解决第一个块会超过写的位置
232 8 50       59 if ( $task->{size} + $len > $self->block_size ) {
233 8         39 my $spsize = $len - ( $task->{size} + $len - $self->block_size );
234 8         48 $partial_body = substr($partial_body, 0, $spsize);
235 8         17 $len = $spsize;
236             }
237              
238 8         295 $self->fh->start_range($task->{pos});
239 8         2348 $self->fh->add_chunk($partial_body);
240              
241 8 50       3416 if ( $self->digest ) {
242 8   33     1181 $task->{ctx} ||= AnyEvent::Digest->new($self->digest);
243 8         1325 $task->{ctx}->add_async($partial_body);
244             }
245              
246 8         855 $task->{pos} += $len;
247 8         14 $task->{size} += $len;
248 8         31 return 1;
249             }
250 9         181 }
251              
252             sub fetch_block {
253 7     7 0 13 my ($self, $task, $retry) = @_;
254 7   50     42 $retry ||= 0;
255 7         17 my $url = $self->shuffle_url;
256              
257 7         11 my $ev; $ev = http_get $url,
  7         49  
258             timeout => $self->timeout,
259             recurse => $self->recurse,
260             persistent => 1,
261             keepalive => 1,
262             headers => {
263             %{ $self->headers },
264             Range => $task->{range}
265             },
266             on_body => $self->on_body($task),
267             sub {
268 7     7   3664 my ($hdl, $hdr) = @_;
269 7         23 my $status = $hdr->{Status};
270 7         17 undef $ev;
271              
272             # 成功下载到的流程
273             # 1. 需要对比大小是否一致, 接着对比块较检
274             # 2. 开始下一个任务的下载
275             # 3. 当前块就退出, 不然下面会重试
276 7 50 33     38 if ( $status == 200 || $status == 206 ) { # 第一个块, 这二个都有可能
277             # not ok 块较检不相等 | 直接失败
278 7 50 33     492 return $self->cv->send(("The $task->{block} block the compared failure", $hdr))
    50          
279             if ($task->{size} != ( $task->{tail} -$task->{ofs} + 1 )
280             or !$self->on_block_finish->($hdl, $task, $self->digest ? $task->{ctx}->hexdigest : ''));
281 7         23670 my $block_task = shift @{ $self->tasks };
  7         62  
282             # 完成, 标记结束本次请求
283             # ok 大小相等, 块较检相等, 当前块下载完成, 开始下载新的
284 7         97 AE::log debug => "地址 $url 的块 $task->{block} 下载完成 $$";
285             # 处理接下来的一个请求
286 7 50       8058 $block_task ? $self->fetch_block($block_task) : $self->cv->end;
287 7         4163 return;
288             }
289              
290             # 是否重试的流程
291 0 0       0 my $error = sprintf(
    0          
292             "Block %s the size is wrong, expect the size: %s actual size: %s, The %s try again, Status: %s, Reason: %s.",
293             $task->{block},
294             $self->block_size,
295             $task->{size},
296             $retry,
297             $status ? $status : '500',
298             $hdr->{Reason} ? $hdr->{Reason} : ' ', );
299 0         0 AE::log warn => $error;
300            
301             # 失败
302             # 如果有可能还连接上的响应, 就需要重试, 直到达到重试, 如果不可能连接的响应, 就直接快速的退出
303 0 0 0     0 return $self->cv->send(($error, $hdr))
304             if $status !~ /^(59.|503|500|502|200|206|)$/ or $retry > $self->max_retries;
305            
306 0         0 $self->retry($task, $retry);
307             }
308 7         27 };
309              
310             sub retry {
311 0     0 0 0 my ($self, $task, $retry) = @_;
312 0         0 my $w;$w = AE::timer( $self->retry_interval, 0, sub {
313 0     0   0 $task->{pos} = $task->{ofs}; # 重下本块时要 seek 回零
314 0         0 $task->{size} = 0;
315 0         0 $task->{ctx} = undef;
316 0         0 $self->fetch_block( $task, ++$retry );
317 0         0 undef $w;
318 0         0 });
319             }
320              
321             sub split_range {
322 1     1 0 3 my $self = shift;
323 1         4 my $length = shift;
324              
325             # 每个请求的段大小的范围,字节
326 1         8 my $block_size = $self->block_size;
327 1         5 my $segments = int($length / $block_size);
328              
329             # 要处理的字节的总数
330 1         3 my $len_remain = $length;
331              
332 1         2 my @ranges;
333 1         3 my $block = 0;
334 1         8 while ( $len_remain > 0 ) {
335             # 每个 segment 的大小
336 8         10 my $seg_len = $block_size;
337              
338             # 偏移长度
339 8         11 my $ofs = $length - $len_remain;
340            
341             # 剩余字节
342 8         11 $len_remain -= $seg_len;
343              
344 8         12 my $tail = $ofs + $seg_len - 1;
345 8 50       60 if ( $length-1 < $tail) {
346 0         0 $tail = $length-1;
347             }
348              
349 8         67 my $task = {
350             block => $block, # 当前块编号
351             ofs => $ofs, # 当前的偏移量
352             pos => $ofs, # 本块的起点
353             tail => $tail, # 本块的结束
354             range => 'bytes=' . $ofs . '-' . $tail,
355             size => 0, # 总共下载的长度
356             };
357              
358 8         22 $self->tasks->[$block] = $task;
359 8         29 $block++;
360             }
361             }
362              
363             1;
364              
365             __END__