File Coverage

blib/lib/AnyEvent/MultiDownload.pm
Criterion Covered Total %
statement 131 144 90.9
branch 27 48 56.2
condition 22 42 52.3
subroutine 20 22 90.9
pod 1 7 14.2
total 201 263 76.4


line stmt bran cond sub pod time code
1             #!/usr/bin/perl
2             package AnyEvent::MultiDownload;
3 4     4   120247 use strict;
  4         8  
  4         142  
4 4     4   2578 use utf8;
  4         35  
  4         17  
5 4     4   1707 use Moo;
  4         36357  
  4         23  
6 4     4   5469 use AE;
  4         21634  
  4         110  
7 4     4   1497 use Asset::File;
  4         128491  
  4         127  
8 4     4   1942 use AnyEvent::Digest;
  4         27528  
  4         125  
9 4     4   26 use List::Util qw/shuffle/;
  4         6  
  4         394  
10 4     4   2412 use AnyEvent::HTTP qw/http_get/;
  4         88767  
  4         7633  
11              
12             our $VERSION = '1.11';
13              
14             has path => (
15             is => 'ro',
16             isa => sub {
17             die "文件存在" if -e $_[0];
18             },
19             required => 1,
20             );
21              
22             has url => (
23             is => 'ro',
24             required => 1,
25             );
26              
27              
28             has mirror => (
29             is => 'rw',
30             predicate => 1,
31             isa => sub {
32             return 1 if ref $_[0] eq 'ARRAY';
33             },
34             );
35              
36             has digest => (
37             is => 'rw',
38             isa => sub {
39             return 1 if $_[0] =~ /Digest::(SHA|MD5)/;
40             }
41             );
42              
43              
44             has on_finish => (
45             is => 'rw',
46             required => 1,
47             isa => sub {
48             return 2 if ref $_[0] eq 'CODE';
49             }
50             );
51              
52             has on_error => (
53             is => 'rw',
54             isa => sub {
55             return 2 if ref $_[0] eq 'CODE';
56             },
57             default => sub { sub { 1 } },
58             );
59              
60             has on_block_finish => (
61             is => 'rw',
62             isa => sub {
63             return 2 if ref $_[0] eq 'CODE';
64             },
65             default => sub { return sub {1} }
66             );
67              
68             has cv => (
69             is => 'rw',
70             lazy => 1,
71             default => sub { AE::cv },
72             );
73              
74             has fh => (
75             is => 'rw',
76             lazy => 1,
77             default => sub {
78             return Asset::File->new;
79             },
80             );
81              
82             has retry_interval => is => 'rw', default => sub { 10 };
83             has max_retries => is => 'rw', default => sub { 3 };
84             has block_size => is => 'rw', default => sub { 1 * 1024 * 1024 };
85             has timeout => is => 'rw', default => sub { 10 };
86             has recurse => is => 'rw', default => sub { 6 };
87             has headers => is => 'rw', default => sub {{}};
88             has tasks => is => 'rw', default => sub { [] };
89             has error => is => 'rw', default => sub {};
90             has task_lists => is => 'rw', default => sub {[]};
91             has max_per_host => is => 'rw', default => sub { 8 };
92             has url_status => (
93             is => 'rw',
94             lazy => 1,
95             default => sub {
96             my $self = shift;
97             my %hash;
98             if ($self->has_mirror) {
99             %hash = map {
100             $_ => 0
101             } @{ $self->mirror }, $self->url;
102             }
103             else {
104             $hash{$self->url} = 0;
105             }
106             return \%hash;
107             }
108             );
109              
110             sub start {
111 3     3 1 49 my $self = shift;
112 3         7 my $cb = shift;
113            
114             $self->cv->cb(sub{
115 3     3   107 my $cv = shift;
116 3         47 my ($info, $hdr) = $cv->recv;
117 3 100       45 if ($info) {
118 2         30 AE::log debug => $info;
119 2         4301 return $self->on_error->($info, $hdr);
120             }
121 1         30 $self->fh->move_to($self->path);
122 1         218 $self->on_finish->($self->fh->size);
123 3         10 });
124              
125 3         327 $self->cv->begin;
126 3         62 $self->first_request(0);
127             }
128              
129              
130             sub first_request {
131 4     4 0 25 my ($self, $retry) = @_;
132 4         30 my $url = $self->shuffle_url;
133              
134 4         8 my $first_task;
135             my $ev; $ev = http_get $url,
136             headers => $self->headers,
137             timeout => $self->timeout,
138             recurse => $self->recurse,
139             on_header => sub {
140 2     2   77614 my ($hdr) = @_;
141 2 100       13 if ( $hdr->{Status} == 200 ) {
142 1         4 my $len = $hdr->{'content-length'};
143 1 50       4 return $self->cv->send(("Cannot find a content-length header.", $hdr)) if !defined($len);
144              
145             # 准备开始下载的信息
146 1         7 my $ranges = $self->split_range($len);
147              
148             # 除了第一个块, 其它块现在开始下载
149             # 事件开始, 但这个回调会在最后才调用.
150 1         1 $first_task = shift @{ $self->tasks };
  1         4  
151 1   50     11 $first_task->{block} = $first_task->{block} || 0;
152 1   50     5 $first_task->{ofs} = $first_task->{ofs} || 0;
153 1 50       5 return 1 if $len <= $self->block_size;
154              
155 1         5 for ( 1 .. $self->max_per_host ) {
156 8         1842 my $block_task = shift @{ $self->tasks };
  8         24  
157 8 100       21 last unless defined $block_task;
158 7         190 $self->cv->begin;
159 7         89 $self->fetch_block($block_task) ;
160              
161             }
162             }
163             1
164 2         6 },
165             on_body => sub {
166 2     2   151 my ($partial_body, $hdr) = @_;
167 2 100       13 if ( $self->on_body($first_task)->($partial_body, $hdr) ) {
168             # 如果是第一个块的话, 下载到指定的大小就需要断开
169 1 50 33     18 if ( ( $hdr->{'content-length'} <= $self->block_size and $first_task->{size} == $hdr->{'content-length'} )
      33        
170             or
171             $first_task->{size} >= $self->block_size
172             ) {
173              
174 1 50       21 $self->cv->send(("The 0 block the compared failure", $hdr))
    50          
175             if !$self->on_block_finish->( $hdr, $first_task, $self->digest ? $first_task->{ctx}->hexdigest : '');
176 1         2815 $self->cv->end;
177 1         16 return 0
178             }
179             }
180 1         9 return 1;
181             },
182             sub {
183 4     4   36369 my (undef, $hdr) = @_;
184 4         12 undef $ev;
185 4         16 my $status = $hdr->{Status};
186             # on_body 正常的下载
187 4 50 66     205 return if ( $hdr->{OrigStatus} and $hdr->{OrigStatus} == 200 ) or $hdr->{Status} == 200 or $hdr->{Status} == 416;
      66        
      66        
188              
189 3 100 66     68 if ( ($status == 500 or $status == 503 or $status =~ /^59/) and $retry < $self->max_retries ) {
      100        
190 1         2 my $w; $w = AE::timer( $self->retry_interval, 0, sub {
191 1         993413 $first_task->{pos} = $first_task->{ofs}; # 重下本块时要 seek 回零
192 1         12 $first_task->{size} = 0;
193 1         60 $first_task->{ctx} = undef;
194 1         10 $self->first_request(++$retry);
195 1         1036 undef $w;
196 1         20 });
197 1         11 AE::log debug => "地址 $url 的块 0 下载出错, 重试";
198 1         6397 return;
199             }
200              
201 2 50       79 return $self->cv->send((
    50          
202             sprintf("Status: %s, Reason: %s.", $status ? $status : '500', $hdr->{Reason} ? $hdr->{Reason} : ' '),
203             $hdr)
204             );
205             }
206 4         229 }
207              
208              
209             sub shuffle_url {
210 11     11 0 28 my $self = shift;
211 11         201 my $urls = $self->url_status;
212 11         113 return (shuffle keys %$urls)[0];
213             }
214              
215             sub on_body {
216 9     9 0 12 my ($self, $task) = @_;
217             return sub {
218 9     9   6565 my ($partial_body, $hdr) = @_;
219 9 100 100     60 return 0 unless ($hdr->{Status} == 206 || $hdr->{Status} == 200);
220              
221 8         9 my $len = length($partial_body);
222             # 主要是用于解决第一个块会超过写的位置
223 8 50       42 if ( $task->{size} + $len > $self->block_size ) {
224 8         22 my $spsize = $len - ( $task->{size} + $len - $self->block_size );
225 8         24 $partial_body = substr($partial_body, 0, $spsize);
226 8         10 $len = $spsize;
227             }
228              
229 8         180 $self->fh->start_range($task->{pos});
230 8         1842 $self->fh->add_chunk($partial_body);
231              
232 8 50       2095 if ( $self->digest ) {
233 8   33     697 $task->{ctx} ||= AnyEvent::Digest->new($self->digest);
234 8         801 $task->{ctx}->add_async($partial_body);
235             }
236              
237 8         508 $task->{pos} += $len;
238 8         13 $task->{size} += $len;
239 8         17 return 1;
240             }
241 9         109 }
242              
243             sub fetch_block {
244 7     7 0 9 my ($self, $task, $retry) = @_;
245 7   50     24 $retry ||= 0;
246 7         18 my $url = $self->shuffle_url;
247              
248 7         12 my $ev; $ev = http_get $url,
  7         27  
249             timeout => $self->timeout,
250             recurse => $self->recurse,
251             persistent => 1,
252             keepalive => 1,
253             headers => {
254             %{ $self->headers },
255             Range => $task->{range}
256             },
257             on_body => $self->on_body($task),
258             sub {
259 7     7   2215 my ($hdl, $hdr) = @_;
260 7         18 my $status = $hdr->{Status};
261 7         10 undef $ev;
262              
263             # 成功下载到的流程
264             # 1. 需要对比大小是否一致, 接着对比块较检
265             # 2. 开始下一个任务的下载
266             # 3. 当前块就退出, 不然下面会重试
267 7 50 33     19 if ( $status == 200 || $status == 206 ) { # 第一个块, 这二个都有可能
268             # not ok 块较检不相等 | 直接失败
269 7 50 33     293 return $self->cv->send(("The $task->{block} block the compared failure", $hdr))
    50          
270             if ($task->{size} != ( $task->{tail} -$task->{ofs} + 1 )
271             or !$self->on_block_finish->($hdl, $task, $self->digest ? $task->{ctx}->hexdigest : ''));
272 7         14318 my $block_task = shift @{ $self->tasks };
  7         31  
273             # 完成, 标记结束本次请求
274             # ok 大小相等, 块较检相等, 当前块下载完成, 开始下载新的
275 7         57 AE::log debug => "地址 $url 的块 $task->{block} 下载完成 $$";
276             # 处理接下来的一个请求
277 7 50       5182 $block_task ? $self->fetch_block($block_task) : $self->cv->end;
278 7         2228 return;
279             }
280              
281             # 是否重试的流程
282 0 0       0 my $error = sprintf(
    0          
283             "Block %s the size is wrong, expect the size: %s actual size: %s, The %s try again, Status: %s, Reason: %s.",
284             $task->{block},
285             $self->block_size,
286             $task->{size},
287             $retry,
288             $status ? $status : '500',
289             $hdr->{Reason} ? $hdr->{Reason} : ' ', );
290 0         0 AE::log warn => $error;
291            
292             # 失败
293             # 如果有可能还连接上的响应, 就需要重试, 直到达到重试, 如果不可能连接的响应, 就直接快速的退出
294 0 0 0     0 return $self->cv->send(($error, $hdr))
295             if $status !~ /^(59.|503|500|502|200|206|)$/ or $retry > $self->max_retries;
296            
297 0         0 $self->retry($task, $retry);
298             }
299 7         23 };
300              
301             sub retry {
302 0     0 0 0 my ($self, $task, $retry) = @_;
303 0         0 my $w;$w = AE::timer( $self->retry_interval, 0, sub {
304 0     0   0 $task->{pos} = $task->{ofs}; # 重下本块时要 seek 回零
305 0         0 $task->{size} = 0;
306 0         0 $task->{ctx} = undef;
307 0         0 $self->fetch_block( $task, ++$retry );
308 0         0 undef $w;
309 0         0 });
310             }
311              
312             sub split_range {
313 1     1 0 2 my $self = shift;
314 1         2 my $length = shift;
315              
316             # 每个请求的段大小的范围,字节
317 1         9 my $block_size = $self->block_size;
318 1         4 my $segments = int($length / $block_size);
319              
320             # 要处理的字节的总数
321 1         2 my $len_remain = $length;
322              
323 1         1 my @ranges;
324 1         2 my $block = 0;
325 1         9 while ( $len_remain > 0 ) {
326             # 每个 segment 的大小
327 8         7 my $seg_len = $block_size;
328              
329             # 偏移长度
330 8         5 my $ofs = $length - $len_remain;
331            
332             # 剩余字节
333 8         6 $len_remain -= $seg_len;
334              
335 8         7 my $tail = $ofs + $seg_len - 1;
336 8 50       13 if ( $length-1 < $tail) {
337 0         0 $tail = $length-1;
338             }
339              
340 8         34 my $task = {
341             block => $block, # 当前块编号
342             ofs => $ofs, # 当前的偏移量
343             pos => $ofs, # 本块的起点
344             tail => $tail, # 本块的结束
345             range => 'bytes=' . $ofs . '-' . $tail,
346             size => 0, # 总共下载的长度
347             };
348              
349 8         16 $self->tasks->[$block] = $task;
350 8         15 $block++;
351             }
352             }
353              
354             1;
355              
356             __END__