line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
1
|
|
|
|
|
|
|
package MogileFS::Client::CallbackFile; |
2
|
2
|
|
|
2
|
|
71406
|
use strict; |
|
2
|
|
|
|
|
13
|
|
|
2
|
|
|
|
|
59
|
|
3
|
2
|
|
|
2
|
|
30
|
use warnings; |
|
2
|
|
|
|
|
4
|
|
|
2
|
|
|
|
|
61
|
|
4
|
2
|
|
|
2
|
|
511
|
use URI; |
|
2
|
|
|
|
|
7076
|
|
|
2
|
|
|
|
|
51
|
|
5
|
2
|
|
|
2
|
|
12
|
use Carp; |
|
2
|
|
|
|
|
3
|
|
|
2
|
|
|
|
|
155
|
|
6
|
2
|
|
|
2
|
|
470
|
use IO::Socket::INET; |
|
2
|
|
|
|
|
20769
|
|
|
2
|
|
|
|
|
24
|
|
7
|
2
|
|
|
2
|
|
2163
|
use File::Slurp qw/ slurp /; |
|
2
|
|
|
|
|
37128
|
|
|
2
|
|
|
|
|
122
|
|
8
|
2
|
|
|
2
|
|
489
|
use Try::Tiny; |
|
2
|
|
|
|
|
2012
|
|
|
2
|
|
|
|
|
113
|
|
9
|
2
|
|
|
2
|
|
15
|
use Socket qw/ SO_SNDBUF SOL_SOCKET IPPROTO_TCP /; |
|
2
|
|
|
|
|
5
|
|
|
2
|
|
|
|
|
308
|
|
10
|
2
|
|
|
2
|
|
507
|
use Time::HiRes qw/ gettimeofday tv_interval /; |
|
2
|
|
|
|
|
1327
|
|
|
2
|
|
|
|
|
14
|
|
11
|
2
|
|
|
2
|
|
1254
|
use Linux::PipeMagic qw/ syssendfile /; |
|
2
|
|
|
|
|
6134
|
|
|
2
|
|
|
|
|
124
|
|
12
|
2
|
|
|
2
|
|
739
|
use IO::AIO qw/ fadvise /; |
|
2
|
|
|
|
|
6051
|
|
|
2
|
|
|
|
|
139
|
|
13
|
2
|
|
|
2
|
|
889
|
use LWP::Simple qw/ head /; |
|
2
|
|
|
|
|
75968
|
|
|
2
|
|
|
|
|
14
|
|
14
|
|
|
|
|
|
|
|
15
|
2
|
|
|
2
|
|
338
|
use base qw/ MogileFS::Client::Async /; |
|
2
|
|
|
|
|
4
|
|
|
2
|
|
|
|
|
664
|
|
16
|
|
|
|
|
|
|
|
17
|
2
|
50
|
|
2
|
|
16
|
use constant TCP_CORK => ($^O eq "linux" ? 3 : 0); # XXX |
|
2
|
|
|
|
|
4
|
|
|
2
|
|
|
|
|
116
|
|
18
|
|
|
|
|
|
|
|
19
|
2
|
|
|
2
|
|
12
|
use namespace::clean; |
|
2
|
|
|
|
|
4
|
|
|
2
|
|
|
|
|
13
|
|
20
|
|
|
|
|
|
|
|
21
|
|
|
|
|
|
|
=head1 NAME |
22
|
|
|
|
|
|
|
|
23
|
|
|
|
|
|
|
MogileFS::Client::CallbackFile |
24
|
|
|
|
|
|
|
|
25
|
|
|
|
|
|
|
=head1 SYNOPSIS |
26
|
|
|
|
|
|
|
|
27
|
|
|
|
|
|
|
my $mogfs = MogileFS::Client::CallbackFile->new( ... ) |
28
|
|
|
|
|
|
|
|
29
|
|
|
|
|
|
|
open(my $read_fh, "<", "...") or die ... |
30
|
|
|
|
|
|
|
my $eventual_length = -s $read_fh; |
31
|
|
|
|
|
|
|
my $f = $mogfs->store_file_from_fh($key, $class, $read_fh, $eventual_length, \%opts); |
32
|
|
|
|
|
|
|
|
33
|
|
|
|
|
|
|
$f->($eventual_length, 0); # upload entire file |
34
|
|
|
|
|
|
|
|
35
|
|
|
|
|
|
|
$f->($eventual_length, 1); # indicate EOF |
36
|
|
|
|
|
|
|
|
37
|
|
|
|
|
|
|
=head1 DESCRIPTION |
38
|
|
|
|
|
|
|
|
39
|
|
|
|
|
|
|
This package inherits from L<MogileFS::Client::Async> and provides an additional |
40
|
|
|
|
|
|
|
blocking API in which the data you wish to upload is read from a file when |
41
|
|
|
|
|
|
|
commanded by a callback function. This allows other processing to take place on |
42
|
|
|
|
|
|
|
data as you read it from disc or elsewhere. |
43
|
|
|
|
|
|
|
|
44
|
|
|
|
|
|
|
The trackers, and storage backends, are tried repeatedly until the file is |
45
|
|
|
|
|
|
|
successfully stored, or an error is thrown. |
46
|
|
|
|
|
|
|
|
47
|
|
|
|
|
|
|
The C<$key> parameter may be a closure. In this case, it is called every time |
48
|
|
|
|
|
|
|
before C<create_open> is called, allowing a different key to be used if an |
49
|
|
|
|
|
|
|
upload fails, allowing for additional paranoia. |
50
|
|
|
|
|
|
|
|
51
|
|
|
|
|
|
|
=head1 SEE ALSO |
52
|
|
|
|
|
|
|
|
53
|
|
|
|
|
|
|
=over |
54
|
|
|
|
|
|
|
|
55
|
|
|
|
|
|
|
=item L<MogileFS::Client::Async> |
56
|
|
|
|
|
|
|
|
57
|
|
|
|
|
|
|
=back |
58
|
|
|
|
|
|
|
|
59
|
|
|
|
|
|
|
=cut |
60
|
|
|
|
|
|
|
|
61
|
|
|
|
|
|
|
# store_file_from_fh($key, $class, $read_fh, $eventual_length, \%opts)
#
# Prepares an upload of the data readable from $read_fh and returns a
# closure that performs the upload incrementally.
#
# Arguments:
#   $key              - key to store under, or a CODE ref which is called
#                       before every create_open to produce the key.
#   $class            - class passed to create_open.
#   $read_fh          - file handle the data is sent from (via sendfile).
#   $eventual_length  - total number of bytes that will be uploaded; sent
#                       as the Content-Length of the PUT.
#   \%opts            - optional. Keys read here: create_open_args,
#                       create_close_args, fid, on_failure, on_new_attempt,
#                       on_connect, on_http_done.
#
# Returns undef if the client is read-only, otherwise a closure:
#
#   $f->($available_to_read, $eof, $checksum)
#
#   $available_to_read - number of bytes of $read_fh that may be sent so far.
#   $eof               - true once the whole file is available; this
#                        finishes the PUT and issues create_close.
#   $checksum          - optional "MD5:<hex>" string passed to create_close.
#
# The closure returns nothing when the available data has been sent and
# $eof is false, returns $eventual_length when the file has been stored, and
# dies if the lengths are inconsistent or after five failed attempts.
sub store_file_from_fh {
    my $self = shift;

    # Kept as 'return undef' (not a bare return) so existing callers that
    # use the result in list context keep seeing exactly one element.
    return undef if $self->{readonly};

    my ($_key, $class, $read_fh, $eventual_length, $opts) = @_;
    $opts ||= {};

    # Hint to Linux that doubling readahead will probably pay off.
    fadvise($read_fh, 0, 0, IO::AIO::FADV_SEQUENTIAL());

    # Extra args to be passed along with the create_open and create_close
    # commands. Any internally generated args of the same name will overwrite
    # supplied ones in these hashes.
    my $create_open_args  = $opts->{create_open_args}  || {};
    my $create_close_args = $opts->{create_close_args} || {};

    my @dests;  # ( {devid,path,fid}, {devid,path,fid}, ... )

    my $key;

    # Returns the next destination hashref to try. Uses up the destinations
    # from the previous create_open first, then asks the tracker for more
    # (up to five times). Dies if no destination could be obtained.
    my $get_new_dest = sub {
        return pop @dests if @dests;

        for my $tracker_attempt (1..5) {
            $key = ref($_key) eq 'CODE' ? $_key->() : $_key;

            $self->run_hook('store_file_start', $self, $key, $class, $opts);
            $self->run_hook('new_file_start', $self, $key, $class, $opts);

            # Calls to the backend may be explodey.
            my $res;
            try {
                $res = $self->{backend}->do_request(
                    create_open => {
                        %$create_open_args,
                        domain     => $self->{domain},
                        class      => $class,
                        key        => $key,
                        fid        => $opts->{fid} || 0, # fid should be specified, or pass 0 meaning to auto-generate one
                        multi_dest => 1,
                        size       => $eventual_length, # not supported by current version
                    }
                );
            }
            catch {
                warn "Mogile backend failed: $_";
                $self->{backend}->force_disconnect() if $self->{backend}->can('force_disconnect');
            };

            unless ($res) {
                # Attempting to connect to the Mogile backend completely
                # failed; let's sleep for a second to see if the problem
                # clears. We don't sleep for other errors as we'll arrive
                # back here if the network fails eventually.
                sleep 1;
                next;
            }

            for my $i (1..$res->{dev_count}) {
                push @dests, {
                    devid => $res->{"devid_$i"},
                    path  => $res->{"path_$i"},
                    fid   => $res->{fid},
                };
            }

            return pop @dests if @dests;
        }

        die "Fail to get a destination to write to.";
    };

    # When we have a hiccough in the connection, we mark $socket as undef to
    # indicate that we should reconnect.
    my $socket;

    # We keep track of where we last wrote to.
    my $last_written_point;

    # The destination hashref we're currently writing to.
    my $current_dest;
    my $create_close_timed_out;

    return sub {
        my ($available_to_read, $eof, $checksum) = @_;

        my $last_error;

        # Records a failed attempt: notifies on_failure, warns, and resets
        # the connection state so the next loop iteration starts over.
        my $fail_write_attempt = sub {
            my ($msg) = @_;
            $last_error = $msg || "unknown error";

            if ($opts->{on_failure}) {
                $opts->{on_failure}->({
                    url         => $current_dest ? $current_dest->{path} : undef,
                    bytes_sent  => $last_written_point,
                    total_bytes => $eventual_length,
                    client      => 'callbackfile',
                    error       => $msg,
                });
            }

            warn $msg;
            $socket = undef;
            $last_written_point = 0;
        };

        # The 'next' statements below are all outside try/catch blocks, so
        # they act on this loop.
        for my $write_attempt (1..5) {
            $last_error = undef;

            # Create a connection to the storage backend
            unless ($socket) {
                # A fresh connection always re-sends the file from the start.
                sysseek($read_fh, 0, 0) or die "seek failed: $!";
                try {
                    $last_written_point = 0;
                    $current_dest = $get_new_dest->();

                    $opts->{on_new_attempt}->($current_dest) if $opts->{on_new_attempt};

                    my $uri = URI->new($current_dest->{path});
                    $socket = IO::Socket::INET->new(
                        Timeout  => 10,
                        Proto    => "tcp",
                        PeerPort => $uri->port,
                        PeerHost => $uri->host,
                    ) or die "connect to ".$current_dest->{path}." failed: $!";

                    $opts->{on_connect}->() if $opts->{on_connect};

                    my $buf = 'PUT ' . $uri->path . " HTTP/1.0\r\nConnection: close\r\nContent-Length: $eventual_length\r\n\r\n";
                    setsockopt($socket, SOL_SOCKET, SO_SNDBUF, 65536) or warn "could not enlarge socket buffer: $!" if (unpack("I", getsockopt($socket, SOL_SOCKET, SO_SNDBUF)) < 65536);
                    setsockopt($socket, IPPROTO_TCP, TCP_CORK, 1) or warn "could not set TCP_CORK: $!" if TCP_CORK;
                    syswrite($socket, $buf)==length($buf) or die "Could not write all: $!";
                }
                catch {
                    $fail_write_attempt->($_);
                };
            }

            # Write as much data as we have
            if ($socket) {
                my $bytes_to_write = $available_to_read - $last_written_point;
                my $block_size = $bytes_to_write;

                SENDFILE: while ($bytes_to_write > 0) {
                    # Never ask for more than is left: after a partial send
                    # the original block size could otherwise push bytes
                    # beyond $available_to_read if the file has grown.
                    my $chunk = $block_size < $bytes_to_write ? $block_size : $bytes_to_write;
                    my $c = syssendfile($socket, $read_fh, $chunk);
                    if ($c > 0) {
                        $last_written_point += $c;
                        $bytes_to_write -= $c;
                    }
                    elsif ($c == -1 && $block_size > 1024*1024) {
                        # 32 bit kernels won't even allow you to send more than 2Gb, it seems.
                        # Retry with a smaller block size.
                        $block_size = 1024*1024;
                    }
                    else {
                        # Build the message first: $! may be clobbered by
                        # the callbacks run from $fail_write_attempt.
                        my $err = "syssendfile failed, only $c out of $bytes_to_write written: $!";
                        $fail_write_attempt->($err);
                        last SENDFILE;
                    }
                }

                if ($bytes_to_write < 0) {
                    die "unpossible!";
                }
            }

            if ($socket && $eof) {
                setsockopt($socket, IPPROTO_TCP, TCP_CORK, 0) or warn "could not unset TCP_CORK: $!" if TCP_CORK;
                shutdown($socket, 1) or warn "could not shutdown socket: $!";
                die "File is longer than initially declared, is it still being written to? We are at $last_written_point, $eventual_length initially declared" if ($last_written_point > $eventual_length);
                die "Cannot be at eof, only $last_written_point out of $eventual_length written!" unless ($last_written_point == $eventual_length);

                $self->run_hook('new_file_end', $self, $key, $class, $opts);

                # Read the storage node's HTTP response.
                my $buf;
                try {
                    $buf = slurp($socket);
                }
                catch {
                    warn $_;
                };

                if (!defined($buf)) {
                    $fail_write_attempt->("slurp failed");
                    next;
                }

                unless (close($socket)) {
                    my $close_error = "could not close socket: $!";
                    $fail_write_attempt->($close_error);
                    next;
                }

                # Only the status line is inspected; an empty response is
                # treated as a non-2xx answer.
                my ($top) = split /\r?\n/, $buf;
                $top = '' unless defined $top;

                if ($top =~ m{HTTP/1\.[01]\s+2\d\d}) {
                    # Woo, 200!

                    $opts->{on_http_done}->() if $opts->{on_http_done};

                    my @cs;

                    if (!$checksum) {
                        try {
                            # XXX - What's the timeout here.
                            my $probe_length = (head($current_dest->{path}))[1];
                            # head() yields no length on failure; without
                            # the defined() test that would compare equal
                            # to a declared length of 0.
                            die "probe failed: no length returned vs $eventual_length"
                                unless defined $probe_length;
                            die "probe failed: $probe_length vs $eventual_length" if $probe_length != $eventual_length;
                        }
                        catch {
                            $fail_write_attempt->("HEAD check on newly written file failed: $_");
                        };
                        # No checksum to supply, but we have at least checked the length.
                    }
                    elsif ($create_close_timed_out) {
                        # A previous create_close did not answer within the
                        # backend timeout: verify the checksum ourselves by
                        # reading the file back, and tell the tracker not
                        # to verify it.
                        try {
                            # These classes are not loaded by this file's
                            # use lines, so load them on demand.
                            require Digest::MD5;
                            require HTTP::Request;
                            require LWP::UserAgent;

                            my $md5 = Digest::MD5->new();
                            my $req = HTTP::Request->new(GET => $current_dest->{path});
                            LWP::UserAgent->new->request($req, sub { $md5->add($_[0]) });

                            my $hex_checked = $md5->hexdigest();
                            die "Got $hex_checked, expected $checksum" if "MD5:$hex_checked" ne $checksum;
                        }
                        catch {
                            $fail_write_attempt->("Cross network checksum failed: $_");
                        };
                        @cs = ( checksum => $checksum, checksumverify => 0 );
                    }
                    else {
                        @cs = ( checksum => $checksum, checksumverify => 1 );
                    }

                    # A verification step above failed; start a new attempt.
                    if (defined $last_error) {
                        next;
                    }

                    my $rv;
                    my $ts_sent_create_close = [gettimeofday];
                    try {
                        $rv = $self->{backend}->do_request
                            ("create_close", {
                                # Caller-supplied extras go first so the
                                # internally generated args below win.
                                %$create_close_args,
                                fid    => $current_dest->{fid},
                                devid  => $current_dest->{devid},
                                domain => $self->{domain},
                                size   => $eventual_length,
                                key    => $key,
                                path   => $current_dest->{path},
                                @cs,
                            });
                    }
                    catch {
                        warn "create_close exploded: $_";
                        $self->{backend}->force_disconnect() if $self->{backend}->can('force_disconnect');
                    };

                    # TODO we used to have a file check to query the size of the
                    # file which we just uploaded to MogileFS.

                    if ($rv) {
                        $self->run_hook('store_file_end', $self, $key, $class, $opts);
                        return $eventual_length;
                    }
                    elsif (!$create_close_timed_out && $checksum && tv_interval($ts_sent_create_close) >= $self->{backend}->{timeout}) {
                        @dests = ();
                        $create_close_timed_out = 1;
                        $fail_write_attempt->("create_close failed, possibly timed out checksumming");
                    }
                    else {
                        # create_close may explode due to a bad checksum,
                        # or a network error sending the acknowledgement of
                        # a successful upload. To handle this, if
                        # create_close fails we always retry with a new
                        # create_open to get a new FID.
                        @dests = ();
                        $fail_write_attempt->("create_close failed");
                    }
                }
                else {
                    $fail_write_attempt->("Got non-200 from remote server $top");
                    next;
                }
            }
            elsif ($last_written_point == $available_to_read) {
                # Everything available so far has been sent; wait for the
                # caller to invoke us again with more data or with $eof.
                return;
            }
        }

        $last_error = "unknown error" unless defined $last_error;
        die "Mogile write failed: $last_error";
    };
}
354
|
|
|
|
|
|
|
|
355
|
|
|
|
|
|
|
# store_file($key, $class, $fn, \%opts)
#
# Uploads the file named $fn under $key in class $class, using
# store_file_from_fh for the transfer and an md5sum child process (run
# while the upload is in progress) to compute the checksum handed to the
# final, end-of-file callback invocation.
#
# If $fn is a reference rather than a file name, the call is delegated to
# the parent class's store_file.
#
# Returns the length of the file in bytes, or undef if store_file_from_fh
# declined to provide an upload callback (it does so for a read-only
# client). Dies if the file cannot be opened, if md5sum cannot be run or
# produces no checksum, or if the upload itself dies.
sub store_file {
    my ($self, $key, $class, $fn, $opts) = @_;

    if (ref($fn)) {
        warn "not scalar!";
        return $self->SUPER::store_file($key, $class, $fn, $opts);
    }

    open(my $fh, "<", $fn) or die "could not open '$fn': $!";

    my $file_length = -s $fh;

    my $cb = $self->store_file_from_fh(
        $key, $class, $fh, $file_length, $opts
    );

    # No callback means nothing can be uploaded; without this check the
    # call below would die with an unhelpful "undefined value as a CODE
    # reference" error.
    return undef unless $cb;

    # List-form pipe open: no shell is involved, and "--" stops a file name
    # starting with "-" from being read as an option.
    open(my $checksum, "-|", "md5sum", "-b", "--", $fn) or die "could not fork off md5sum: $!";
    $cb->($file_length, 0);
    my $line = <$checksum>;

    # When close on a pipe fails because the child exited non-zero, $! is 0
    # and the exit status is in $?.
    unless (close($checksum)) {
        die $! ? "could not finish checksum: $!"
               : "could not finish checksum: md5sum exited with status " . ($? >> 8);
    }

    # GNU md5sum prefixes the line with a backslash when the file name
    # contains a backslash or newline, so allow for one.
    my ($hex_digest) = defined $line ? $line =~ /^\\?([0-9a-f]{32})/ : ();
    defined $hex_digest or die "could not find checksum";

    $cb->($file_length, 1, "MD5:$hex_digest");

    return $file_length;
}
382
|
|
|
|
|
|
|
|
383
|
|
|
|
|
|
|
1; |
384
|
|
|
|
|
|
|
|