File Coverage

blib/lib/EV/Nats/ObjectStore.pm
Criterion Covered Total %
statement 12 158 7.5
branch 0 60 0.0
condition 0 37 0.0
subroutine 4 24 16.6
pod 9 9 100.0
total 25 288 8.6


line stmt bran cond sub pod time code
1             package EV::Nats::ObjectStore;
2 1     1   1347 use strict;
  1         7  
  1         38  
3 1     1   5 use warnings;
  1         2  
  1         57  
4 1     1   695 use Digest::SHA qw(sha256_hex);
  1         4124  
  1         123  
5 1     1   8 use EV::Nats::JetStream;
  1         2  
  1         3288  
6              
my $CHUNK_SIZE = 128 * 1024;    # 128 KiB default chunk size

# Constructor.
#   js         - EV::Nats::JetStream instance (required).
#   bucket     - bucket name; the backing stream is "OBJ_<bucket>" (required).
#   chunk_size - bytes per chunk message; a false value (undef/0/'') selects
#                $CHUNK_SIZE.
#   timeout    - stored on the object; defaults to $js->{timeout}.
# Dies if js or bucket is missing, or if chunk_size is not a positive integer.
sub new {
    my ($class, %opts) = @_;
    my $js         = delete $opts{js}     || die "js (JetStream) required";
    my $bucket     = delete $opts{bucket} || die "bucket name required";
    my $timeout    = delete $opts{timeout} || $js->{timeout};
    my $chunk_size = $opts{chunk_size} || $CHUNK_SIZE;

    # put() slices the payload in chunk_size steps: a negative step never
    # terminates and a fractional one yields zero-length chunks, so reject
    # anything that is not a positive integer up front.
    die "chunk_size must be a positive integer"
        unless $chunk_size =~ /\A[1-9][0-9]*\z/;

    return bless {
        js         => $js,
        bucket     => $bucket,
        stream     => "OBJ_$bucket",
        chunk_size => $chunk_size,
        timeout    => $timeout,
    }, $class;
}
22              
# Provision the JetStream stream that backs this bucket.
#   $opts - optional hashref. max_bytes, max_age and replicas are forwarded
#           (replicas as num_replicas) only when they are true.
#   $cb   - handed unchanged to $js->stream_create.
# The stream captures both the chunk and the metadata subject trees, uses
# limits retention with discard=new, and allows rollup headers.
sub create_bucket {
    my ($self, $opts, $cb) = @_;
    $opts = {} unless defined $opts;
    require JSON::PP;

    my $root = '$O.' . $self->{bucket};

    my %optional;
    $optional{max_bytes}    = $opts->{max_bytes} if $opts->{max_bytes};
    $optional{max_age}      = $opts->{max_age}   if $opts->{max_age};
    $optional{num_replicas} = $opts->{replicas}  if $opts->{replicas};

    my %config = (
        name              => $self->{stream},
        subjects          => [ $root . '.C.>', $root . '.M.>' ],
        retention         => 'limits',
        discard           => 'new',
        allow_rollup_hdrs => JSON::PP::true(),
        %optional,
    );
    return $self->{js}->stream_create(\%config, $cb);
}
42              
# Delete the bucket's backing JetStream stream.
# $cb is passed through unchanged to $js->stream_delete.
sub delete_bucket {
    my ($self, $cb) = @_;
    my ($js, $stream) = @{$self}{qw(js stream)};
    return $js->stream_delete($stream, $cb);
}
47              
# Store $data under $name.
#
# The payload is split into ceil(size / chunk_size) messages (a single empty
# chunk for an empty payload) on "$O.<bucket>.C.<nuid>", each published via
# js_publish. Once every chunk callback has arrived, a JSON metadata record
# (name, size, chunks, nuid, digest, bucket) is published on
# "$O.<bucket>.M.<encoded name>". The metadata goes last so that a failed
# upload never exposes a half-stored object.
#
# Callback: ($info, $err). On success $info is { name, size, chunks, seq }
# where seq is taken from the metadata publish ack. On failure $info is undef.
#
# $data is hashed with Digest::SHA's sha256_hex, which croaks on wide
# characters, so it is expected to be a byte string (encode text first).
# An undefined $data is stored as an empty object.
# NOTE(review): re-putting an existing name leaves the previous object's
# chunks in the stream; nothing here purges them.
sub put {
    my ($self, $name, $data, $cb) = @_;
    $data = '' unless defined $data;

    my $nuid       = _nuid();
    my $sha        = sha256_hex($data);
    my $size       = length $data;
    my $step       = $self->{chunk_size};
    my $chunk_subj = '$O.' . $self->{bucket} . '.C.' . $nuid;

    # Fix the chunk count before the first publish. The completion check
    # compares acks against it, so it must not still be growing if
    # js_publish happens to run its callback before returning.
    my $chunks = $size ? int(($size + $step - 1) / $step) : 1;
    my $acked  = 0;
    my $failed = 0;

    my $write_metadata = sub {
        return $cb->(undef, "$failed chunk(s) failed to publish") if $failed;
        require JSON::PP;
        my $meta = JSON::PP::encode_json({
            name   => $name,
            size   => $size,
            chunks => $chunks,
            nuid   => $nuid,
            digest => "SHA-256=$sha",
            bucket => $self->{bucket},
        });
        my $meta_subj = '$O.' . $self->{bucket} . '.M.' . _encode_name($name);
        $self->{js}->js_publish($meta_subj, $meta, sub {
            my ($ack, $err) = @_;
            return $cb->(undef, $err) if $err;
            $cb->({ name => $name, size => $size, chunks => $chunks, seq => $ack->{seq} }, undef);
        });
    };

    my $on_chunk_ack = sub {
        my (undef, $err) = @_;
        $failed++ if $err;
        # '==' rather than '>=' so a stray extra callback cannot re-trigger
        # the metadata write after the last chunk has been counted.
        $write_metadata->() if ++$acked == $chunks;
    };

    for my $i (0 .. $chunks - 1) {
        $self->{js}->js_publish($chunk_subj, substr($data, $i * $step, $step), $on_chunk_ack);
    }
    return;
}
107              
# Retrieve an object.
#
# The metadata is looked up through info(), so missing and deleted objects
# are reported the same way as there. The chunks are then fetched one at a
# time with STREAM.MSG.GET next_by_subj, concatenated, and checked against
# the "SHA-256=<hex>" digest that put() records.
#
# Callback: ($data, $err, $meta)
#   - object missing or tombstoned:            (undef, undef)
#   - metadata lookup/decode error:            (undef, $err)
#   - chunk fetch error, missing nuid, digest: (undef, $err, $meta)
#   - success:                                 ($data, undef, $meta)
sub get {
    my ($self, $name, $cb) = @_;

    $self->info($name, sub {
        my ($meta, $err) = @_;
        return $cb->(undef, $err) if $err;
        return $cb->(undef, undef) unless $meta;

        my $expected = $meta->{chunks} || 0;
        return $cb->('', undef, $meta) if $expected == 0;

        my $nuid = $meta->{nuid};
        return $cb->(undef, 'invalid metadata: missing nuid', $meta)
            unless defined $nuid && length $nuid;

        require MIME::Base64;
        my $api        = 'STREAM.MSG.GET.' . $self->{stream};
        my $chunk_subj = '$O.' . $self->{bucket} . '.C.' . $nuid;
        my @parts;
        my $seq = 1;

        # Fetch one chunk, then recurse for the next one. The step function
        # receives itself as an argument rather than closing over a variable
        # that holds it: a self-referencing closure forms a reference cycle
        # and would leak the closure, $cb and every chunk gathered in @parts.
        my $fetch_next = sub {
            my ($again) = @_;
            $self->{js}->_json_api(
                $api,
                { next_by_subj => $chunk_subj, seq => $seq },
                sub {
                    my ($resp, $ferr) = @_;
                    return $cb->(undef, "chunk fetch error: $ferr", $meta) if $ferr;
                    my $msg = $resp->{message}
                        or return $cb->(undef, "missing chunk message", $meta);
                    push @parts, MIME::Base64::decode_base64($msg->{data} || '');
                    # next_by_subj returns the first match with seq >= the
                    # requested one, so step past the message just received.
                    $seq = ($msg->{seq} || $seq) + 1;

                    return $again->($again) if @parts < $expected;

                    my $assembled = join '', @parts;
                    if (($meta->{digest} // '') =~ /\ASHA-256=(.+)/) {
                        my $want = $1;
                        return $cb->(undef, "digest mismatch", $meta)
                            if sha256_hex($assembled) ne $want;
                    }
                    $cb->($assembled, undef, $meta);
                },
            );
        };
        $fetch_next->($fetch_next);
    });
    return;
}
179              
# Delete an object.
#
# Looks up the object's metadata with info(). When that succeeds and yields
# a nuid, every message on "$O.<bucket>.C.<nuid>" is purged with
# STREAM.PURGE. In all cases a tombstone (headers "KV-Operation: PURGE" and
# "Nats-Rollup: sub", empty body) is then published on the metadata subject
# and the connection is flushed before the callback runs.
#
# Callback (optional): ($ok, $err). $err is the purge error if there was one,
# otherwise the flush error; $ok is 1 when $err is false, undef otherwise.
sub delete {
    my ($self, $name, $cb) = @_;
    my $nats      = $self->{js}{nats};
    my $meta_subj = '$O.' . $self->{bucket} . '.M.' . _encode_name($name);

    my $write_tombstone = sub {
        my ($purge_err) = @_;
        $nats->hpublish(
            $meta_subj,
            "NATS/1.0\r\nKV-Operation: PURGE\r\nNats-Rollup: sub\r\n\r\n",
            '',
        );
        # Report only after the flush so that an info()/get() issued right
        # after the callback cannot still see the pre-tombstone metadata.
        $nats->flush(sub {
            my ($flush_err) = @_;
            return if !$cb;
            my $failure = defined $purge_err ? $purge_err : $flush_err;
            $cb->($failure ? undef : 1, $failure);
        });
    };

    $self->info($name, sub {
        my ($meta, $err) = @_;
        my $nuid = $meta ? $meta->{nuid} : undef;
        return $write_tombstone->($err) if $err || !$nuid;

        $self->{js}->_json_api(
            'STREAM.PURGE.' . $self->{stream},
            { filter => '$O.' . $self->{bucket} . '.C.' . $nuid },
            sub { $write_tombstone->($_[1]) },
        );
    });
}
217              
# Fetch and decode the metadata record for $name without touching chunks.
# Uses STREAM.MSG.GET with last_by_subj on "$O.<bucket>.M.<encoded name>".
#
# Callback: (\%meta, $err)
#   - error text matching "no message found" or "10037": (undef, undef)
#   - any other API error:                               (undef, $err)
#   - reply without a message, or a tombstone:           (undef, undef)
#   - undecodable metadata: (first value returned by decode_json_or_error,
#                            "invalid metadata: ...")
sub info {
    my ($self, $name, $cb) = @_;
    my $request = { last_by_subj => '$O.' . $self->{bucket} . '.M.' . _encode_name($name) };

    my $on_reply = sub {
        my ($resp, $err) = @_;
        if ($err) {
            return $cb->(undef, $err =~ /no message found|10037/ ? undef : $err);
        }

        my $msg = $resp->{message};
        return $cb->(undef, undef)
            if !$msg || EV::Nats::JetStream::msg_is_tombstone($msg);

        require MIME::Base64;
        my ($meta, $decode_err) = EV::Nats::JetStream::decode_json_or_error(
            MIME::Base64::decode_base64($msg->{data} || '')
        );
        $cb->($meta, $decode_err ? "invalid metadata: $decode_err" : undef);
    };

    $self->{js}->_json_api('STREAM.MSG.GET.' . $self->{stream}, $request, $on_reply);
}
243              
# List object names in the bucket.
# Asks stream_info for the per-subject state of "$O.<bucket>.M.>" and
# percent-decodes each subject's suffix. Deleted (tombstoned) objects still
# have a metadata subject, so they are included; call info() to filter.
# Callback: (\@names, $err). Name order follows hash-key order (unspecified).
sub list {
    my ($self, $cb) = @_;
    my $prefix = '$O.' . $self->{bucket} . '.M.';
    my $plen   = length $prefix;

    my $on_info = sub {
        my ($info, $err) = @_;
        return $cb->(undef, $err) if $err;

        my $subjects = $info->{state}{subjects} || {};
        my @names;
        for my $subject (keys %$subjects) {
            # subjects_filter should already restrict this; re-check defensively.
            next unless substr($subject, 0, $plen) eq $prefix;
            push @names, _decode_name(substr($subject, $plen));
        }
        $cb->(\@names, undef);
    };

    return $self->{js}->stream_info(
        $self->{stream},
        { subjects_filter => $prefix . '>' },
        $on_info,
    );
}
262              
# Snapshot of the bucket's backing stream.
# Callback: ({ bucket, bytes, sealed => 0|1 }, $err). bytes is read from the
# stream state and sealed from the stream config; on error the first value
# is undef.
sub status {
    my ($self, $cb) = @_;
    my $on_info = sub {
        my ($info, $err) = @_;
        return $cb->(undef, $err) if $err;
        my %snapshot = (
            bucket => $self->{bucket},
            bytes  => $info->{state}{bytes},
            sealed => ($info->{config}{sealed} ? 1 : 0),
        );
        $cb->(\%snapshot, undef);
    };
    return $self->{js}->stream_info($self->{stream}, $on_info);
}
275              
# Random 22-character token over [A-Za-z0-9], used to name one object's
# chunk subject.
# NOTE(review): built on rand(), which is not a cryptographic RNG, so
# uniqueness across objects is only probabilistic.
sub _nuid {
    my @alphabet = ('A' .. 'Z', 'a' .. 'z', '0' .. '9');
    my $token = '';
    for (1 .. 22) {
        $token .= $alphabet[ int(rand(scalar @alphabet)) ];
    }
    return $token;
}
280              
# Escape an object name into a NATS-subject-safe metadata subject suffix.
#
# Every byte outside [A-Za-z0-9._-] becomes %XX (upper-case hex). Two extra
# rules keep distinct names on distinct, valid subjects:
#   * Names containing characters above U+00FF are UTF-8 encoded first.
#     "%02X" of such an ordinal is wider than two digits (U+20AC would give
#     "%20AC", the same text as the encoding of " AC"), which is both
#     ambiguous and not reversible by _decode_name. Names made only of
#     characters <= U+00FF encode exactly as before.
#   * A '.' at the start, at the end, or directly before another '.' would
#     create an empty subject token, so it is escaped as %2E. Other dots are
#     kept, so e.g. "report.pdf" maps to the same subject as before.
# NOTE(review): objects stored earlier under a name with characters above
# U+00FF were written under the old ambiguous encoding and will not be found
# under the new one.
sub _encode_name {
    my ($name) = @_;
    utf8::encode($name) if $name =~ /[^\x00-\xFF]/;
    $name =~ s/([^A-Za-z0-9._-])/sprintf('%%%02X', ord $1)/ge;
    $name =~ s/\A\.|\.(?=\.|\z)/%2E/g;
    return $name;
}
# Undo %XX escaping: each '%' followed by two hex digits (either case)
# becomes the character with that code; all other text is left as-is.
sub _decode_name {
    my ($encoded) = @_;
    (my $decoded = $encoded) =~ s{%([0-9A-Fa-f]{2})}{ chr hex $1 }ge;
    return $decoded;
}
283              
284             1;
285              
286             =head1 NAME
287              
288             EV::Nats::ObjectStore - Chunked object store on top of NATS JetStream
289              
290             =head1 SYNOPSIS
291              
292             use EV;
293             use EV::Nats;
294             use EV::Nats::JetStream;
295             use EV::Nats::ObjectStore;
296              
297             my $nats = EV::Nats->new(host => '127.0.0.1');
298             my $js = EV::Nats::JetStream->new(nats => $nats);
299             my $os = EV::Nats::ObjectStore->new(js => $js, bucket => 'files');
300              
301             $os->create_bucket({}, sub {
302             $os->put('report.pdf', $pdf_data, sub {
303             my ($info, $err) = @_;
304             print "stored: $info->{size} bytes in $info->{chunks} chunks\n";
305             $os->get('report.pdf', sub {
306             my ($data, $err, $meta) = @_;
307             print "got $meta->{size} bytes back\n";
308             });
309             });
310             });
311              
312             EV::run;
313              
314             =head1 DESCRIPTION
315              
316             An object-store bucket is a JetStream stream named C<< OBJ_<bucket> >>
317             with two subject groups:
318              
319             =over
320              
321             =item * C<< $O.<bucket>.C.<nuid> >> - opaque chunks for one
322             object (one chunk per stream message; the nuid is generated per object).
323              
324             =item * C<< $O.<bucket>.M.<encoded-name> >> - last-write-wins
325             metadata describing an object (name, size, chunk count, SHA-256 digest).
326              
327             =back
328              
329             C<put> chunks the input, publishes each chunk via C<js_publish> for
330             durability, then writes a metadata entry. C<get> fetches the metadata,
331             walks the chunks back via C<STREAM.MSG.GET>, and verifies the digest.
332              
333             =head1 METHODS
334              
335             All callbacks fire on the L<EV> loop, not synchronously.
336              
337             =head2 new(js => $js, bucket => $name, [chunk_size => $bytes], [timeout => $timeout])
338              
339             Default C<chunk_size> is 128 KiB. C<timeout> defaults to the timeout
340             of C<$js>.
341              
342             =head2 create_bucket(\%opts, $cb)
343              
344             Provision the underlying stream. Recognised C<\%opts>:
345              
346             =over
347              
348             =item * C<max_bytes> - bucket-wide storage cap.
349              
350             =item * C<max_age> - per-message TTL in nanoseconds.
351              
352             =item * C<replicas> - cluster replication factor.
353              
354             =back
355              
356             Callback: C<($info, $err)>.
357              
358             =head2 delete_bucket($cb)
359              
360             Tear down the underlying stream. Callback: C<($info, $err)>.
361              
362             =head2 put($name, $data, $cb)
363              
364             Store C<$data> under C<$name>, automatically chunked. Each chunk is
365             published with JetStream ack; the metadata entry is written last so
366             a partial upload doesn't surface a half-stored object. Callback:
367             C<($info, $err)> where C<$info> is C<{ name, size, chunks, seq }>.
368              
369             =head2 get($name, $cb)
370              
371             Retrieve a previously-stored object. Callback: C<($data, $err, $meta)>.
372             C<$data> is C<undef> if the object does not exist or has been deleted
373             (the tombstone is recognised). On digest mismatch, C<$data> is C<undef>
374             and C<$err> is C<"digest mismatch">.
375              
376             =head2 delete($name, [$cb])
377              
378             Looks up the object's nuid via metadata, purges all chunks under
379             C<< $O.<bucket>.C.<nuid> >> via C<STREAM.PURGE>, then
380             publishes a C<KV-Operation: PURGE> tombstone on the metadata subject
381             followed by a C<flush> fence. Callback: C<($ok, $err)>; C<$err> is
382             set if the chunk purge or flush failed.
383              
384             =head2 info($name, $cb)
385              
386             Fetch only the metadata entry for an object, without downloading
387             chunks. Callback: C<(\%meta, $err)>; C<\%meta> is C<undef> if the
388             object does not exist or was deleted (the tombstone is recognised).
389             This is the recommended way to filter live objects out of a
390             L</list> result.
391              
392             =head2 list($cb)
393              
394             List the names of all objects that have a metadata entry in the bucket.
395             Callback: C<(\@names, $err)>. Deleted (tombstoned) objects still appear
396             in the listing -- call C<info> on each name to filter them out.
397              
398             =head2 status($cb)
399              
400             Returns a snapshot hashref:
401              
402             { bucket => $name, bytes => $n, sealed => 0|1 }
403              
404             C<sealed> reflects the underlying stream's C<sealed> flag;
405             this client never seals on its own, so unless someone manually
406             sealed the stream out-of-band the value is always 0.
407              
408             Callback: C<(\%status, $err)>.
409              
410             =head1 SEE ALSO
411              
412             L<EV::Nats>, L<EV::Nats::JetStream>, L<EV>,
413             L<Digest::SHA>.
414              
415             =cut