| line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
|
1
|
|
|
|
|
|
|
package EV::Nats::ObjectStore; |
|
2
|
1
|
|
|
1
|
|
1347
|
use strict; |
|
|
1
|
|
|
|
|
7
|
|
|
|
1
|
|
|
|
|
38
|
|
|
3
|
1
|
|
|
1
|
|
5
|
use warnings; |
|
|
1
|
|
|
|
|
2
|
|
|
|
1
|
|
|
|
|
57
|
|
|
4
|
1
|
|
|
1
|
|
695
|
use Digest::SHA qw(sha256_hex); |
|
|
1
|
|
|
|
|
4124
|
|
|
|
1
|
|
|
|
|
123
|
|
|
5
|
1
|
|
|
1
|
|
8
|
use EV::Nats::JetStream; |
|
|
1
|
|
|
|
|
2
|
|
|
|
1
|
|
|
|
|
3288
|
|
|
6
|
|
|
|
|
|
|
|
|
7
|
|
|
|
|
|
|
my $CHUNK_SIZE = 128 * 1024; # 128KB default |
|
8
|
|
|
|
|
|
|
|
|
9
|
|
|
|
|
|
|
sub new { |
|
10
|
0
|
|
|
0
|
1
|
|
my ($class, %opts) = @_; |
|
11
|
0
|
|
0
|
|
|
|
my $js = delete $opts{js} || die "js (JetStream) required"; |
|
12
|
0
|
|
0
|
|
|
|
my $bucket = delete $opts{bucket} || die "bucket name required"; |
|
13
|
0
|
|
0
|
|
|
|
my $timeout = delete $opts{timeout} || $js->{timeout}; |
|
14
|
|
|
|
|
|
|
bless { |
|
15
|
|
|
|
|
|
|
js => $js, |
|
16
|
|
|
|
|
|
|
bucket => $bucket, |
|
17
|
|
|
|
|
|
|
stream => "OBJ_$bucket", |
|
18
|
0
|
|
0
|
|
|
|
chunk_size => $opts{chunk_size} || $CHUNK_SIZE, |
|
19
|
|
|
|
|
|
|
timeout => $timeout, |
|
20
|
|
|
|
|
|
|
}, $class; |
|
21
|
|
|
|
|
|
|
} |
|
22
|
|
|
|
|
|
|
|
|
23
|
|
|
|
|
|
|
sub create_bucket { |
|
24
|
0
|
|
|
0
|
1
|
|
my ($self, $opts, $cb) = @_; |
|
25
|
0
|
|
0
|
|
|
|
$opts //= {}; |
|
26
|
0
|
|
|
|
|
|
require JSON::PP; |
|
27
|
|
|
|
|
|
|
my $config = { |
|
28
|
|
|
|
|
|
|
name => $self->{stream}, |
|
29
|
|
|
|
|
|
|
subjects => [ |
|
30
|
|
|
|
|
|
|
'$O.' . $self->{bucket} . '.C.>', # chunks |
|
31
|
|
|
|
|
|
|
'$O.' . $self->{bucket} . '.M.>', # metadata |
|
32
|
|
|
|
|
|
|
], |
|
33
|
|
|
|
|
|
|
retention => 'limits', |
|
34
|
|
|
|
|
|
|
($opts->{max_bytes} ? (max_bytes => $opts->{max_bytes}) : ()), |
|
35
|
|
|
|
|
|
|
($opts->{max_age} ? (max_age => $opts->{max_age}) : ()), |
|
36
|
0
|
0
|
|
|
|
|
($opts->{replicas} ? (num_replicas => $opts->{replicas}) : ()), |
|
|
|
0
|
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
37
|
|
|
|
|
|
|
discard => 'new', |
|
38
|
|
|
|
|
|
|
allow_rollup_hdrs => JSON::PP::true(), |
|
39
|
|
|
|
|
|
|
}; |
|
40
|
0
|
|
|
|
|
|
$self->{js}->stream_create($config, $cb); |
|
41
|
|
|
|
|
|
|
} |
|
42
|
|
|
|
|
|
|
|
|
43
|
|
|
|
|
|
|
sub delete_bucket { |
|
44
|
0
|
|
|
0
|
1
|
|
my ($self, $cb) = @_; |
|
45
|
0
|
|
|
|
|
|
$self->{js}->stream_delete($self->{stream}, $cb); |
|
46
|
|
|
|
|
|
|
} |
|
47
|
|
|
|
|
|
|
|
|
48
|
|
|
|
|
|
|
sub put { |
|
49
|
0
|
|
|
0
|
1
|
|
my ($self, $name, $data, $cb) = @_; |
|
50
|
0
|
|
|
|
|
|
my $nuid = _nuid(); |
|
51
|
0
|
|
|
|
|
|
my $sha = sha256_hex($data); |
|
52
|
0
|
|
|
|
|
|
my $size = length $data; |
|
53
|
0
|
|
|
|
|
|
my $chunks = 0; |
|
54
|
0
|
|
|
|
|
|
my $offset = 0; |
|
55
|
0
|
|
|
|
|
|
my $nats = $self->{js}{nats}; |
|
56
|
0
|
|
|
|
|
|
my $chunk_subj = '$O.' . $self->{bucket} . '.C.' . $nuid; |
|
57
|
|
|
|
|
|
|
|
|
58
|
|
|
|
|
|
|
# Publish chunks via JetStream (with ack for durability) |
|
59
|
0
|
|
|
|
|
|
my $chunk_errors = 0; |
|
60
|
0
|
|
|
|
|
|
my $chunks_acked = 0; |
|
61
|
|
|
|
|
|
|
|
|
62
|
|
|
|
|
|
|
# Define completion handler before starting any publishes |
|
63
|
|
|
|
|
|
|
my $on_all_chunks = sub { |
|
64
|
0
|
0
|
|
0
|
|
|
if ($chunk_errors) { |
|
65
|
0
|
|
|
|
|
|
return $cb->(undef, "$chunk_errors chunk(s) failed to publish"); |
|
66
|
|
|
|
|
|
|
} |
|
67
|
0
|
|
|
|
|
|
require JSON::PP; |
|
68
|
|
|
|
|
|
|
my $meta = JSON::PP::encode_json({ |
|
69
|
|
|
|
|
|
|
name => $name, |
|
70
|
|
|
|
|
|
|
size => $size, |
|
71
|
|
|
|
|
|
|
chunks => $chunks, |
|
72
|
|
|
|
|
|
|
nuid => $nuid, |
|
73
|
|
|
|
|
|
|
digest => "SHA-256=$sha", |
|
74
|
|
|
|
|
|
|
bucket => $self->{bucket}, |
|
75
|
0
|
|
|
|
|
|
}); |
|
76
|
0
|
|
|
|
|
|
my $meta_subj = '$O.' . $self->{bucket} . '.M.' . _encode_name($name); |
|
77
|
|
|
|
|
|
|
$self->{js}->js_publish($meta_subj, $meta, sub { |
|
78
|
0
|
|
|
|
|
|
my ($ack, $err) = @_; |
|
79
|
0
|
0
|
|
|
|
|
return $cb->(undef, $err) if $err; |
|
80
|
0
|
|
|
|
|
|
$cb->({ name => $name, size => $size, chunks => $chunks, seq => $ack->{seq} }, undef); |
|
81
|
0
|
|
|
|
|
|
}); |
|
82
|
0
|
|
|
|
|
|
}; |
|
83
|
|
|
|
|
|
|
|
|
84
|
|
|
|
|
|
|
my $publish_chunk = sub { |
|
85
|
0
|
|
|
0
|
|
|
my $chunk = shift; |
|
86
|
|
|
|
|
|
|
$self->{js}->js_publish($chunk_subj, $chunk, sub { |
|
87
|
0
|
|
|
|
|
|
my ($ack, $err) = @_; |
|
88
|
0
|
0
|
|
|
|
|
$chunk_errors++ if $err; |
|
89
|
0
|
|
|
|
|
|
$chunks_acked++; |
|
90
|
0
|
0
|
|
|
|
|
$on_all_chunks->() if $chunks_acked >= $chunks; |
|
91
|
0
|
|
|
|
|
|
}); |
|
92
|
0
|
|
|
|
|
|
}; |
|
93
|
|
|
|
|
|
|
|
|
94
|
0
|
0
|
|
|
|
|
if ($size == 0) { |
|
95
|
0
|
|
|
|
|
|
$chunks = 1; |
|
96
|
0
|
|
|
|
|
|
$publish_chunk->(''); |
|
97
|
|
|
|
|
|
|
} else { |
|
98
|
0
|
|
|
|
|
|
while ($offset < $size) { |
|
99
|
0
|
|
|
|
|
|
my $end = $offset + $self->{chunk_size}; |
|
100
|
0
|
0
|
|
|
|
|
$end = $size if $end > $size; |
|
101
|
0
|
|
|
|
|
|
$publish_chunk->(substr($data, $offset, $end - $offset)); |
|
102
|
0
|
|
|
|
|
|
$chunks++; |
|
103
|
0
|
|
|
|
|
|
$offset = $end; |
|
104
|
|
|
|
|
|
|
} |
|
105
|
|
|
|
|
|
|
} |
|
106
|
|
|
|
|
|
|
} |
|
107
|
|
|
|
|
|
|
|
|
108
|
|
|
|
|
|
|
sub get { |
|
109
|
0
|
|
|
0
|
1
|
|
my ($self, $name, $cb) = @_; |
|
110
|
|
|
|
|
|
|
|
|
111
|
|
|
|
|
|
|
# Get metadata via JetStream STREAM.MSG.GET |
|
112
|
0
|
|
|
|
|
|
my $meta_subj = '$O.' . $self->{bucket} . '.M.' . _encode_name($name); |
|
113
|
|
|
|
|
|
|
$self->{js}->_json_api( |
|
114
|
|
|
|
|
|
|
'STREAM.MSG.GET.' . $self->{stream}, |
|
115
|
|
|
|
|
|
|
{ last_by_subj => $meta_subj }, |
|
116
|
|
|
|
|
|
|
sub { |
|
117
|
0
|
|
|
0
|
|
|
my ($resp, $err) = @_; |
|
118
|
0
|
0
|
|
|
|
|
return $cb->(undef, $err) if $err; |
|
119
|
|
|
|
|
|
|
|
|
120
|
0
|
0
|
|
|
|
|
my $msg = $resp->{message} or return $cb->(undef, undef); |
|
121
|
|
|
|
|
|
|
# Object was deleted: tombstone is on the metadata subject. |
|
122
|
0
|
0
|
|
|
|
|
return $cb->(undef, undef) |
|
123
|
|
|
|
|
|
|
if EV::Nats::JetStream::msg_is_tombstone($msg); |
|
124
|
|
|
|
|
|
|
|
|
125
|
0
|
|
|
|
|
|
require MIME::Base64; |
|
126
|
0
|
|
0
|
|
|
|
my $raw = $msg->{data} || ''; |
|
127
|
0
|
|
|
|
|
|
my $meta_json = MIME::Base64::decode_base64($raw); |
|
128
|
0
|
|
|
|
|
|
my ($meta, $derr) = EV::Nats::JetStream::decode_json_or_error($meta_json); |
|
129
|
0
|
0
|
|
|
|
|
return $cb->(undef, "invalid metadata: $derr") if $derr; |
|
130
|
|
|
|
|
|
|
|
|
131
|
0
|
|
|
|
|
|
my $nuid = $meta->{nuid}; |
|
132
|
0
|
|
0
|
|
|
|
my $expected = $meta->{chunks} || 0; |
|
133
|
0
|
|
|
|
|
|
my $chunk_subj = '$O.' . $self->{bucket} . '.C.' . $nuid; |
|
134
|
|
|
|
|
|
|
|
|
135
|
0
|
0
|
|
|
|
|
if ($expected == 0) { |
|
136
|
0
|
|
|
|
|
|
return $cb->('', undef, $meta); |
|
137
|
|
|
|
|
|
|
} |
|
138
|
|
|
|
|
|
|
|
|
139
|
|
|
|
|
|
|
# Fetch chunks sequentially via STREAM.MSG.GET. |
|
140
|
|
|
|
|
|
|
# next_by_subj returns first matching message with seq >= start_sequence, |
|
141
|
|
|
|
|
|
|
# so after each hit we must advance past the returned seq. |
|
142
|
0
|
|
|
|
|
|
my @chunks; |
|
143
|
0
|
|
|
|
|
|
my $seq = 1; |
|
144
|
|
|
|
|
|
|
|
|
145
|
0
|
|
|
|
|
|
my $fetch_next; |
|
146
|
|
|
|
|
|
|
$fetch_next = sub { |
|
147
|
|
|
|
|
|
|
$self->{js}->_json_api( |
|
148
|
|
|
|
|
|
|
'STREAM.MSG.GET.' . $self->{stream}, |
|
149
|
|
|
|
|
|
|
{ next_by_subj => $chunk_subj, seq => $seq }, |
|
150
|
|
|
|
|
|
|
sub { |
|
151
|
0
|
|
|
|
|
|
my ($resp, $err) = @_; |
|
152
|
0
|
0
|
|
|
|
|
if ($err) { |
|
153
|
0
|
|
|
|
|
|
return $cb->(undef, "chunk fetch error: $err", $meta); |
|
154
|
|
|
|
|
|
|
} |
|
155
|
|
|
|
|
|
|
my $msg = $resp->{message} or |
|
156
|
0
|
0
|
|
|
|
|
return $cb->(undef, "missing chunk message", $meta); |
|
157
|
0
|
|
0
|
|
|
|
my $data = MIME::Base64::decode_base64($msg->{data} || ''); |
|
158
|
0
|
|
|
|
|
|
push @chunks, $data; |
|
159
|
0
|
|
0
|
|
|
|
$seq = ($msg->{seq} || $seq) + 1; |
|
160
|
|
|
|
|
|
|
|
|
161
|
0
|
0
|
|
|
|
|
if (scalar @chunks >= $expected) { |
|
162
|
0
|
|
|
|
|
|
my $assembled = join('', @chunks); |
|
163
|
0
|
0
|
0
|
|
|
|
if ($meta->{digest} && $meta->{digest} =~ /^SHA-256=(.+)/) { |
|
164
|
0
|
0
|
|
|
|
|
if (sha256_hex($assembled) ne $1) { |
|
165
|
0
|
|
|
|
|
|
return $cb->(undef, "digest mismatch", $meta); |
|
166
|
|
|
|
|
|
|
} |
|
167
|
|
|
|
|
|
|
} |
|
168
|
0
|
|
|
|
|
|
$cb->($assembled, undef, $meta); |
|
169
|
|
|
|
|
|
|
} else { |
|
170
|
0
|
|
|
|
|
|
$fetch_next->(); |
|
171
|
|
|
|
|
|
|
} |
|
172
|
|
|
|
|
|
|
} |
|
173
|
0
|
|
|
|
|
|
); |
|
174
|
0
|
|
|
|
|
|
}; |
|
175
|
0
|
|
|
|
|
|
$fetch_next->(); |
|
176
|
|
|
|
|
|
|
} |
|
177
|
0
|
|
|
|
|
|
); |
|
178
|
|
|
|
|
|
|
} |
|
179
|
|
|
|
|
|
|
|
|
180
|
|
|
|
|
|
|
sub delete { |
|
181
|
0
|
|
|
0
|
1
|
|
my ($self, $name, $cb) = @_; |
|
182
|
0
|
|
|
|
|
|
my $nats = $self->{js}{nats}; |
|
183
|
|
|
|
|
|
|
|
|
184
|
|
|
|
|
|
|
my $tombstone = sub { |
|
185
|
0
|
|
|
0
|
|
|
my $purge_err = shift; |
|
186
|
0
|
|
|
|
|
|
my $headers = "NATS/1.0\r\nKV-Operation: PURGE\r\nNats-Rollup: sub\r\n\r\n"; |
|
187
|
0
|
|
|
|
|
|
my $meta_subj = '$O.' . $self->{bucket} . '.M.' . _encode_name($name); |
|
188
|
0
|
|
|
|
|
|
$nats->hpublish($meta_subj, $headers, ''); |
|
189
|
|
|
|
|
|
|
# Flush so the tombstone reaches the server before $cb fires; this |
|
190
|
|
|
|
|
|
|
# avoids a race with a subsequent info()/get() that would otherwise |
|
191
|
|
|
|
|
|
|
# see the pre-tombstone metadata. |
|
192
|
|
|
|
|
|
|
$nats->flush(sub { |
|
193
|
0
|
|
|
|
|
|
my ($flush_err) = @_; |
|
194
|
0
|
0
|
|
|
|
|
return unless $cb; |
|
195
|
0
|
|
0
|
|
|
|
my $err = $purge_err // $flush_err; |
|
196
|
0
|
0
|
|
|
|
|
$cb->($err ? undef : 1, $err); |
|
197
|
0
|
|
|
|
|
|
}); |
|
198
|
0
|
|
|
|
|
|
}; |
|
199
|
|
|
|
|
|
|
|
|
200
|
|
|
|
|
|
|
# Look up the object's nuid so we can purge its chunks. |
|
201
|
|
|
|
|
|
|
$self->info($name, sub { |
|
202
|
0
|
|
|
0
|
|
|
my ($meta, $err) = @_; |
|
203
|
0
|
0
|
0
|
|
|
|
if ($err || !$meta || !$meta->{nuid}) { |
|
|
|
|
0
|
|
|
|
|
|
204
|
0
|
|
|
|
|
|
return $tombstone->($err); |
|
205
|
|
|
|
|
|
|
} |
|
206
|
0
|
|
|
|
|
|
my $chunk_subj = '$O.' . $self->{bucket} . '.C.' . $meta->{nuid}; |
|
207
|
|
|
|
|
|
|
$self->{js}->_json_api( |
|
208
|
|
|
|
|
|
|
'STREAM.PURGE.' . $self->{stream}, |
|
209
|
|
|
|
|
|
|
{ filter => $chunk_subj }, |
|
210
|
|
|
|
|
|
|
sub { |
|
211
|
0
|
|
|
|
|
|
my ($resp, $purge_err) = @_; |
|
212
|
0
|
|
|
|
|
|
$tombstone->($purge_err); |
|
213
|
|
|
|
|
|
|
}, |
|
214
|
0
|
|
|
|
|
|
); |
|
215
|
0
|
|
|
|
|
|
}); |
|
216
|
|
|
|
|
|
|
} |
|
217
|
|
|
|
|
|
|
|
|
218
|
|
|
|
|
|
|
sub info { |
|
219
|
0
|
|
|
0
|
1
|
|
my ($self, $name, $cb) = @_; |
|
220
|
0
|
|
|
|
|
|
my $meta_subj = '$O.' . $self->{bucket} . '.M.' . _encode_name($name); |
|
221
|
|
|
|
|
|
|
$self->{js}->_json_api( |
|
222
|
|
|
|
|
|
|
'STREAM.MSG.GET.' . $self->{stream}, |
|
223
|
|
|
|
|
|
|
{ last_by_subj => $meta_subj }, |
|
224
|
|
|
|
|
|
|
sub { |
|
225
|
0
|
|
|
0
|
|
|
my ($resp, $err) = @_; |
|
226
|
0
|
0
|
|
|
|
|
if ($err) { |
|
227
|
0
|
0
|
|
|
|
|
return $cb->(undef, undef) if $err =~ /no message found|10037/; |
|
228
|
0
|
|
|
|
|
|
return $cb->(undef, $err); |
|
229
|
|
|
|
|
|
|
} |
|
230
|
0
|
0
|
|
|
|
|
my $msg = $resp->{message} or return $cb->(undef, undef); |
|
231
|
|
|
|
|
|
|
# Tombstone: PURGE/DEL on the metadata subject -> object is gone. |
|
232
|
0
|
0
|
|
|
|
|
return $cb->(undef, undef) |
|
233
|
|
|
|
|
|
|
if EV::Nats::JetStream::msg_is_tombstone($msg); |
|
234
|
|
|
|
|
|
|
|
|
235
|
0
|
|
|
|
|
|
require MIME::Base64; |
|
236
|
0
|
|
0
|
|
|
|
my $raw = $msg->{data} || ''; |
|
237
|
0
|
|
|
|
|
|
my $meta_json = MIME::Base64::decode_base64($raw); |
|
238
|
0
|
|
|
|
|
|
my ($meta, $derr) = EV::Nats::JetStream::decode_json_or_error($meta_json); |
|
239
|
0
|
0
|
|
|
|
|
$cb->($meta, $derr ? "invalid metadata: $derr" : undef); |
|
240
|
|
|
|
|
|
|
}, |
|
241
|
0
|
|
|
|
|
|
); |
|
242
|
|
|
|
|
|
|
} |
|
243
|
|
|
|
|
|
|
|
|
244
|
|
|
|
|
|
|
sub list { |
|
245
|
0
|
|
|
0
|
1
|
|
my ($self, $cb) = @_; |
|
246
|
0
|
|
|
|
|
|
my $prefix = '$O.' . $self->{bucket} . '.M.'; |
|
247
|
|
|
|
|
|
|
$self->{js}->stream_info( |
|
248
|
|
|
|
|
|
|
$self->{stream}, |
|
249
|
|
|
|
|
|
|
{ subjects_filter => $prefix . '>' }, |
|
250
|
|
|
|
|
|
|
sub { |
|
251
|
0
|
|
|
0
|
|
|
my ($info, $err) = @_; |
|
252
|
0
|
0
|
|
|
|
|
return $cb->(undef, $err) if $err; |
|
253
|
0
|
|
0
|
|
|
|
my $subjects = $info->{state}{subjects} || {}; |
|
254
|
0
|
|
|
|
|
|
my $plen = length $prefix; |
|
255
|
0
|
|
|
|
|
|
my @names = map { _decode_name(substr($_, $plen)) } |
|
256
|
0
|
|
|
|
|
|
grep { substr($_, 0, $plen) eq $prefix } |
|
|
0
|
|
|
|
|
|
|
|
257
|
|
|
|
|
|
|
keys %$subjects; |
|
258
|
0
|
|
|
|
|
|
$cb->(\@names, undef); |
|
259
|
|
|
|
|
|
|
}, |
|
260
|
0
|
|
|
|
|
|
); |
|
261
|
|
|
|
|
|
|
} |
|
262
|
|
|
|
|
|
|
|
|
263
|
|
|
|
|
|
|
sub status { |
|
264
|
0
|
|
|
0
|
1
|
|
my ($self, $cb) = @_; |
|
265
|
|
|
|
|
|
|
$self->{js}->stream_info($self->{stream}, sub { |
|
266
|
0
|
|
|
0
|
|
|
my ($info, $err) = @_; |
|
267
|
0
|
0
|
|
|
|
|
return $cb->(undef, $err) if $err; |
|
268
|
|
|
|
|
|
|
$cb->({ |
|
269
|
|
|
|
|
|
|
bucket => $self->{bucket}, |
|
270
|
|
|
|
|
|
|
bytes => $info->{state}{bytes}, |
|
271
|
0
|
0
|
|
|
|
|
sealed => $info->{config}{sealed} ? 1 : 0, |
|
272
|
|
|
|
|
|
|
}, undef); |
|
273
|
0
|
|
|
|
|
|
}); |
|
274
|
|
|
|
|
|
|
} |
|
275
|
|
|
|
|
|
|
|
|
276
|
|
|
|
|
|
|
sub _nuid { |
|
277
|
0
|
|
|
0
|
|
|
my @chars = ('A'..'Z', 'a'..'z', '0'..'9'); |
|
278
|
0
|
|
|
|
|
|
join '', map { $chars[rand @chars] } 1..22; |
|
|
0
|
|
|
|
|
|
|
|
279
|
|
|
|
|
|
|
} |
|
280
|
|
|
|
|
|
|
|
|
281
|
0
|
|
|
0
|
|
|
sub _encode_name { my $n = $_[0]; $n =~ s/([^A-Za-z0-9._-])/sprintf("%%%02X", ord($1))/ge; $n } |
|
|
0
|
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
|
282
|
0
|
|
|
0
|
|
|
sub _decode_name { my $n = $_[0]; $n =~ s/%([0-9A-Fa-f]{2})/chr(hex($1))/ge; $n } |
|
|
0
|
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
|
283
|
|
|
|
|
|
|
|
|
284
|
|
|
|
|
|
|
1; |
|
285
|
|
|
|
|
|
|
|
|
286
|
|
|
|
|
|
|
=head1 NAME |
|
287
|
|
|
|
|
|
|
|
|
288
|
|
|
|
|
|
|
EV::Nats::ObjectStore - Chunked object store on top of NATS JetStream |
|
289
|
|
|
|
|
|
|
|
|
290
|
|
|
|
|
|
|
=head1 SYNOPSIS |
|
291
|
|
|
|
|
|
|
|
|
292
|
|
|
|
|
|
|
use EV; |
|
293
|
|
|
|
|
|
|
use EV::Nats; |
|
294
|
|
|
|
|
|
|
use EV::Nats::JetStream; |
|
295
|
|
|
|
|
|
|
use EV::Nats::ObjectStore; |
|
296
|
|
|
|
|
|
|
|
|
297
|
|
|
|
|
|
|
my $nats = EV::Nats->new(host => '127.0.0.1'); |
|
298
|
|
|
|
|
|
|
my $js = EV::Nats::JetStream->new(nats => $nats); |
|
299
|
|
|
|
|
|
|
my $os = EV::Nats::ObjectStore->new(js => $js, bucket => 'files'); |
|
300
|
|
|
|
|
|
|
|
|
301
|
|
|
|
|
|
|
$os->create_bucket({}, sub { |
|
302
|
|
|
|
|
|
|
$os->put('report.pdf', $pdf_data, sub { |
|
303
|
|
|
|
|
|
|
my ($info, $err) = @_; |
|
304
|
|
|
|
|
|
|
print "stored: $info->{size} bytes in $info->{chunks} chunks\n"; |
|
305
|
|
|
|
|
|
|
$os->get('report.pdf', sub { |
|
306
|
|
|
|
|
|
|
my ($data, $err, $meta) = @_; |
|
307
|
|
|
|
|
|
|
print "got $meta->{size} bytes back\n"; |
|
308
|
|
|
|
|
|
|
}); |
|
309
|
|
|
|
|
|
|
}); |
|
310
|
|
|
|
|
|
|
}); |
|
311
|
|
|
|
|
|
|
|
|
312
|
|
|
|
|
|
|
EV::run; |
|
313
|
|
|
|
|
|
|
|
|
314
|
|
|
|
|
|
|
=head1 DESCRIPTION |
|
315
|
|
|
|
|
|
|
|
|
316
|
|
|
|
|
|
|
An object-store bucket is a JetStream stream named CbucketE> |
|
317
|
|
|
|
|
|
|
with two subject groups: |
|
318
|
|
|
|
|
|
|
|
|
319
|
|
|
|
|
|
|
=over |
|
320
|
|
|
|
|
|
|
|
|
321
|
|
|
|
|
|
|
=item * C<$O.EbucketE.C.EnuidE> - opaque chunks for one |
|
322
|
|
|
|
|
|
|
object (one chunk per stream message; the nuid is generated per object). |
|
323
|
|
|
|
|
|
|
|
|
324
|
|
|
|
|
|
|
=item * C<$O.EbucketE.M.Eencoded-nameE> - last-write-wins |
|
325
|
|
|
|
|
|
|
metadata describing an object (name, size, chunk count, SHA-256 digest). |
|
326
|
|
|
|
|
|
|
|
|
327
|
|
|
|
|
|
|
=back |
|
328
|
|
|
|
|
|
|
|
|
329
|
|
|
|
|
|
|
C chunks the input, publishes each chunk via C for |
|
330
|
|
|
|
|
|
|
durability, then writes a metadata entry. C fetches the metadata, |
|
331
|
|
|
|
|
|
|
walks the chunks back via C, and verifies the digest. |
|
332
|
|
|
|
|
|
|
|
|
333
|
|
|
|
|
|
|
=head1 METHODS |
|
334
|
|
|
|
|
|
|
|
|
335
|
|
|
|
|
|
|
All callbacks fire on the L loop, not synchronously. |
|
336
|
|
|
|
|
|
|
|
|
337
|
|
|
|
|
|
|
=head2 new(js => $js, bucket => $name, [chunk_size => $bytes]) |
|
338
|
|
|
|
|
|
|
|
|
339
|
|
|
|
|
|
|
Default C is 128 KiB. C defaults to the timeout |
|
340
|
|
|
|
|
|
|
of C<$js>. |
|
341
|
|
|
|
|
|
|
|
|
342
|
|
|
|
|
|
|
=head2 create_bucket(\%opts, $cb) |
|
343
|
|
|
|
|
|
|
|
|
344
|
|
|
|
|
|
|
Provision the underlying stream. Recognised C<\%opts>: |
|
345
|
|
|
|
|
|
|
|
|
346
|
|
|
|
|
|
|
=over |
|
347
|
|
|
|
|
|
|
|
|
348
|
|
|
|
|
|
|
=item * C - bucket-wide storage cap. |
|
349
|
|
|
|
|
|
|
|
|
350
|
|
|
|
|
|
|
=item * C - per-message TTL in nanoseconds. |
|
351
|
|
|
|
|
|
|
|
|
352
|
|
|
|
|
|
|
=item * C - cluster replication factor. |
|
353
|
|
|
|
|
|
|
|
|
354
|
|
|
|
|
|
|
=back |
|
355
|
|
|
|
|
|
|
|
|
356
|
|
|
|
|
|
|
Callback: C<($info, $err)>. |
|
357
|
|
|
|
|
|
|
|
|
358
|
|
|
|
|
|
|
=head2 delete_bucket($cb) |
|
359
|
|
|
|
|
|
|
|
|
360
|
|
|
|
|
|
|
Tear down the underlying stream. Callback: C<($info, $err)>. |
|
361
|
|
|
|
|
|
|
|
|
362
|
|
|
|
|
|
|
=head2 put($name, $data, $cb) |
|
363
|
|
|
|
|
|
|
|
|
364
|
|
|
|
|
|
|
Store C<$data> under C<$name>, automatically chunked. Each chunk is |
|
365
|
|
|
|
|
|
|
published with JetStream ack; the metadata entry is written last so |
|
366
|
|
|
|
|
|
|
a partial upload doesn't surface a half-stored object. Callback: |
|
367
|
|
|
|
|
|
|
C<($info, $err)> where C<$info> is C<{ name, size, chunks, seq }>. |
|
368
|
|
|
|
|
|
|
|
|
369
|
|
|
|
|
|
|
=head2 get($name, $cb) |
|
370
|
|
|
|
|
|
|
|
|
371
|
|
|
|
|
|
|
Retrieve a previously-stored object. Callback: C<($data, $err, $meta)>. |
|
372
|
|
|
|
|
|
|
C<$data> is C if the object does not exist or has been deleted |
|
373
|
|
|
|
|
|
|
(the tombstone is recognised). On digest mismatch, C<$data> is C |
|
374
|
|
|
|
|
|
|
and C<$err> is C<"digest mismatch">. |
|
375
|
|
|
|
|
|
|
|
|
376
|
|
|
|
|
|
|
=head2 delete($name, [$cb]) |
|
377
|
|
|
|
|
|
|
|
|
378
|
|
|
|
|
|
|
Looks up the object's nuid via metadata, purges all chunks under |
|
379
|
|
|
|
|
|
|
C<$O.EbucketE.C.EnuidE> via C, then |
|
380
|
|
|
|
|
|
|
publishes a C tombstone on the metadata subject |
|
381
|
|
|
|
|
|
|
followed by a C fence. Callback: C<($ok, $err)>; C<$err> is |
|
382
|
|
|
|
|
|
|
set if the chunk purge or flush failed. |
|
383
|
|
|
|
|
|
|
|
|
384
|
|
|
|
|
|
|
=head2 info($name, $cb) |
|
385
|
|
|
|
|
|
|
|
|
386
|
|
|
|
|
|
|
Fetch only the metadata entry for an object, without downloading |
|
387
|
|
|
|
|
|
|
chunks. Callback: C<(\%meta, $err)>; C<\%meta> is C if the |
|
388
|
|
|
|
|
|
|
object does not exist or was deleted (the tombstone is recognised). |
|
389
|
|
|
|
|
|
|
This is the recommended way to filter live objects out of a |
|
390
|
|
|
|
|
|
|
L result. |
|
391
|
|
|
|
|
|
|
|
|
392
|
|
|
|
|
|
|
=head2 list($cb) |
|
393
|
|
|
|
|
|
|
|
|
394
|
|
|
|
|
|
|
List names of all live objects in the bucket. Callback: |
|
395
|
|
|
|
|
|
|
C<(\@names, $err)>. Tombstoned entries appear in the listing -- call |
|
396
|
|
|
|
|
|
|
C to filter. |
|
397
|
|
|
|
|
|
|
|
|
398
|
|
|
|
|
|
|
=head2 status($cb) |
|
399
|
|
|
|
|
|
|
|
|
400
|
|
|
|
|
|
|
Returns a snapshot hashref: |
|
401
|
|
|
|
|
|
|
|
|
402
|
|
|
|
|
|
|
{ bucket => $name, bytes => $n, sealed => 0|1 } |
|
403
|
|
|
|
|
|
|
|
|
404
|
|
|
|
|
|
|
C reflects the underlying stream's C flag; |
|
405
|
|
|
|
|
|
|
this client never seals on its own, so unless someone manually |
|
406
|
|
|
|
|
|
|
sealed the stream out-of-band the value is always 0. |
|
407
|
|
|
|
|
|
|
|
|
408
|
|
|
|
|
|
|
Callback: C<(\%status, $err)>. |
|
409
|
|
|
|
|
|
|
|
|
410
|
|
|
|
|
|
|
=head1 SEE ALSO |
|
411
|
|
|
|
|
|
|
|
|
412
|
|
|
|
|
|
|
L, L, L, |
|
413
|
|
|
|
|
|
|
L. |
|
414
|
|
|
|
|
|
|
|
|
415
|
|
|
|
|
|
|
=cut |