File Coverage

blib/lib/EV/Nats/KV.pm
Criterion Covered Total %
statement 9 102 8.8
branch 0 38 0.0
condition 0 20 0.0
subroutine 3 23 13.0
pod 11 11 100.0
total 23 194 11.8


line stmt bran cond sub pod time code
1             package EV::Nats::KV;
2 1     1   1285 use strict;
  1         3  
  1         38  
3 1     1   4 use warnings;
  1         2  
  1         82  
4 1     1   7 use EV::Nats::JetStream;
  1         2  
  1         2215  
5              
6             sub new {
7 0     0 1   my ($class, %opts) = @_;
8 0   0       my $js = delete $opts{js} || die "js (JetStream) required";
9 0   0       my $bucket = delete $opts{bucket} || die "bucket name required";
10 0   0       my $timeout = delete $opts{timeout} || $js->{timeout};
11 0           bless {
12             js => $js,
13             bucket => $bucket,
14             stream => "KV_$bucket",
15             timeout => $timeout,
16             }, $class;
17             }
18              
19             sub get {
20 0     0 1   my ($self, $key, $cb) = @_;
21 0           my $subj = '$KV.' . $self->{bucket} . '.' . $key;
22             $self->{js}->_json_api(
23             'STREAM.MSG.GET.' . $self->{stream},
24             { last_by_subj => $subj },
25             sub {
26 0     0     my ($resp, $err) = @_;
27 0 0         if ($err) {
28             # 10037 = "no message found" -- treat as missing key
29 0 0         return $cb->(undef, undef) if $err =~ /no message found|10037/;
30 0           return $cb->(undef, $err);
31             }
32 0           my $msg = $resp->{message};
33 0 0         return $cb->(undef, undef) unless $msg;
34             # DEL/PURGE tombstones surface as missing
35 0 0         return $cb->(undef, undef)
36             if EV::Nats::JetStream::msg_is_tombstone($msg);
37              
38 0           require MIME::Base64;
39 0   0       my $value = MIME::Base64::decode_base64($msg->{data} || '');
40 0           $cb->($value, undef);
41             },
42 0           );
43             }
44              
45             sub put {
46 0     0 1   my ($self, $key, $value, $cb) = @_;
47 0           my $subj = '$KV.' . $self->{bucket} . '.' . $key;
48             $self->{js}->js_publish($subj, $value, sub {
49 0     0     my ($ack, $err) = @_;
50 0 0         return $cb->(undef, $err) if $err;
51 0           $cb->($ack->{seq}, undef);
52 0           });
53             }
54              
55             sub create {
56 0     0 1   my ($self, $key, $value, $cb) = @_;
57 0           my $nats = $self->{js}{nats};
58 0           my $headers = "NATS/1.0\r\nNats-Expected-Last-Subject-Sequence: 0\r\n\r\n";
59 0           my $subj = '$KV.' . $self->{bucket} . '.' . $key;
60              
61 0 0         if ($cb) {
62 0           my $inbox = $nats->new_inbox;
63 0           my ($sid, $timer);
64             $sid = $nats->subscribe_max($inbox, sub {
65 0     0     my ($s, $payload) = @_;
66 0 0         if ($timer) { $timer->stop; undef $timer; }
  0            
  0            
67 0           my ($ack, $derr) = EV::Nats::JetStream::decode_json_or_error($payload);
68 0 0         return $cb->(undef, $derr) if $derr;
69 0 0         return $cb->(undef, $ack->{error}{description}) if $ack->{error};
70 0           $cb->($ack->{seq}, undef);
71 0           }, 1);
72             $timer = EV::timer($self->{timeout} / 1000.0, 0, sub {
73 0     0     $nats->unsubscribe($sid);
74 0           $cb->(undef, "create timeout");
75 0           });
76 0           $nats->hpublish($subj, $headers, $value, $inbox);
77             } else {
78 0           $nats->hpublish($subj, $headers, $value);
79             }
80             }
81              
82             sub delete {
83 0     0 1   my ($self, $key, $cb) = @_;
84             # KV delete = publish empty with KV-Operation: DEL header.
85             # flush() ensures the tombstone reaches the server before $cb fires
86             # so a subsequent get() doesn't race the publish.
87 0           my $headers = "NATS/1.0\r\nKV-Operation: DEL\r\n\r\n";
88 0           my $subj = '$KV.' . $self->{bucket} . '.' . $key;
89 0           my $nats = $self->{js}{nats};
90 0           $nats->hpublish($subj, $headers, '');
91 0 0   0     $nats->flush(sub { $cb->(defined $_[0] ? undef : 1, $_[0]) }) if $cb;
  0 0          
92             }
93              
94             sub purge {
95 0     0 1   my ($self, $key, $cb) = @_;
96 0           my $headers = "NATS/1.0\r\nKV-Operation: PURGE\r\nNats-Rollup: sub\r\n\r\n";
97 0           my $subj = '$KV.' . $self->{bucket} . '.' . $key;
98 0           my $nats = $self->{js}{nats};
99 0           $nats->hpublish($subj, $headers, '');
100 0 0   0     $nats->flush(sub { $cb->(defined $_[0] ? undef : 1, $_[0]) }) if $cb;
  0 0          
101             }
102              
103             sub keys {
104 0     0 1   my ($self, $cb) = @_;
105 0           my $prefix = '$KV.' . $self->{bucket} . '.';
106             $self->{js}->stream_info(
107             $self->{stream},
108             { subjects_filter => $prefix . '>' },
109             sub {
110 0     0     my ($info, $err) = @_;
111 0 0         return $cb->(undef, $err) if $err;
112 0   0       my $subjects = $info->{state}{subjects} || {};
113 0           my $plen = length $prefix;
114 0           my @keys = map { substr($_, $plen) }
115 0           grep { substr($_, 0, $plen) eq $prefix }
  0            
116             CORE::keys %$subjects;
117 0           $cb->(\@keys, undef);
118             },
119 0           );
120             }
121              
122             sub watch {
123 0     0 1   my ($self, $key_pattern, $cb) = @_;
124 0   0       $key_pattern //= '>';
125 0           my $subj = '$KV.' . $self->{bucket} . '.' . $key_pattern;
126             return $self->{js}{nats}->subscribe($subj, sub {
127 0     0     my ($subject, $payload, $reply, $headers) = @_;
128 0           my $prefix = '$KV.' . $self->{bucket} . '.';
129 0           my $key = substr($subject, length $prefix);
130 0           my $op = 'PUT';
131 0 0 0       if ($headers && $headers =~ /KV-Operation:\s*(\S+)/i) {
132 0           $op = uc $1;
133             }
134 0           $cb->($key, $payload, $op);
135 0           });
136             }
137              
138             sub create_bucket {
139 0     0 1   my ($self, $opts, $cb) = @_;
140 0   0       $opts //= {};
141 0           require JSON::PP;
142             my $config = {
143             name => $self->{stream},
144             subjects => ['$KV.' . $self->{bucket} . '.>'],
145             retention => 'limits',
146             max_msgs_per_subject => $opts->{max_history} || 1,
147             ($opts->{max_bytes} ? (max_bytes => $opts->{max_bytes}) : ()),
148             ($opts->{max_age} ? (max_age => $opts->{max_age}) : ()),
149 0 0 0       ($opts->{replicas} ? (num_replicas => $opts->{replicas}) : ()),
    0          
    0          
150             discard => 'new',
151             allow_rollup_hdrs => JSON::PP::true(),
152             deny_delete => JSON::PP::true(),
153             deny_purge => JSON::PP::false(),
154             };
155 0           $self->{js}->stream_create($config, $cb);
156             }
157              
158             sub delete_bucket {
159 0     0 1   my ($self, $cb) = @_;
160 0           $self->{js}->stream_delete($self->{stream}, $cb);
161             }
162              
163             sub status {
164 0     0 1   my ($self, $cb) = @_;
165             $self->{js}->stream_info($self->{stream}, sub {
166 0     0     my ($info, $err) = @_;
167 0 0         return $cb->(undef, $err) if $err;
168             $cb->({
169             bucket => $self->{bucket},
170             values => $info->{state}{messages},
171             bytes => $info->{state}{bytes},
172             history => $info->{config}{max_msgs_per_subject},
173 0           }, undef);
174 0           });
175             }
176              
177             1;
178              
179             =head1 NAME
180              
181             EV::Nats::KV - Key-Value store on top of NATS JetStream
182              
183             =head1 SYNOPSIS
184              
185             use EV;
186             use EV::Nats;
187             use EV::Nats::JetStream;
188             use EV::Nats::KV;
189              
190             my $nats = EV::Nats->new(host => '127.0.0.1');
191             my $js = EV::Nats::JetStream->new(nats => $nats);
192             my $kv = EV::Nats::KV->new(js => $js, bucket => 'config');
193              
194             $kv->create_bucket({}, sub {
195             $kv->put('app.setting', 'on', sub {
196             $kv->get('app.setting', sub {
197             my ($val, $err) = @_;
198             print "got: $val\n";
199             });
200             });
201             });
202              
203             # Live updates
204             $kv->watch('app.>', sub {
205             my ($key, $value, $op) = @_;
206             print "$op $key = $value\n";
207             });
208              
209             EV::run;
210              
211             =head1 DESCRIPTION
212              
213             A KV bucket is a JetStream stream named CbucketE> with
214             subjects C<$KV.EbucketE.E>, history of 1, rollup headers
215             allowed, and deletes denied (purge tombstones are used instead).
216             This module wraps that convention: C/C become single calls
217             that hide the underlying C + C dance.
218              
219             =head1 METHODS
220              
221             All callbacks fire on the L loop, not synchronously.
222              
223             =head2 new(js => $js, bucket => $name, [timeout => $ms])
224              
225             The bucket need not exist yet; call L first to
226             provision it. C defaults to the timeout of C<$js>.
227              
228             =head2 get($key, $cb)
229              
230             Fetch the latest value for C<$key>. Callback: C<($value, $err)>.
231             C<$value> is C if the key does not exist, or has been deleted
232             or purged (the tombstone is recognised and surfaces as a clean miss).
233              
234             =head2 put($key, $value, $cb)
235              
236             Set C<$key> to C<$value>. Callback: C<($seq, $err)>, where C<$seq> is
237             the JetStream sequence number assigned by the server.
238              
239             =head2 create($key, $value, $cb)
240              
241             Like C, but only succeeds if C<$key> does not yet exist. Uses
242             C; concurrent creators see
243             a wrong-last-sequence error from the server. Callback: C<($seq, $err)>.
244              
245             =head2 delete($key, [$cb])
246              
247             Mark C<$key> as deleted by publishing a C tombstone,
248             followed by a C fence so a subsequent C won't race the
249             publish. Callback: C<($ok, $err)>; C<$err> is set if the connection
250             dropped before the PONG arrived.
251              
252             =head2 purge($key, [$cb])
253              
254             Like C, but emits C too -- the prior history
255             of C<$key> is rolled up and replaced by the tombstone, freeing storage.
256             Callback: C<($ok, $err)>.
257              
258             =head2 keys($cb)
259              
260             List all keys currently stored in the bucket. Callback: C<(\@keys, $err)>.
261             Tombstoned keys are not filtered out -- check with C if needed.
262              
263             =head2 watch($pattern, $cb)
264              
265             Subscribe to live changes. C<$pattern> is a NATS subject suffix relative
266             to the bucket (e.g. C> for all keys, C> for everything
267             under C). Callback receives C<($key, $value, $op)> where C<$op>
268             is C, C, or C. Returns the underlying subscription
269             id; pass to L to stop.
270              
271             =head2 create_bucket(\%opts, $cb)
272              
273             Provision the underlying stream. Recognised C<\%opts>:
274              
275             =over
276              
277             =item * C - default 1; how many revisions to keep per key.
278              
279             =item * C - bucket-wide storage cap.
280              
281             =item * C - per-message TTL in nanoseconds.
282              
283             =item * C - cluster replication factor.
284              
285             =back
286              
287             Callback: C<($info, $err)>.
288              
289             =head2 delete_bucket($cb)
290              
291             Tear down the underlying stream. Callback: C<($info, $err)>.
292              
293             =head2 status($cb)
294              
295             Returns a snapshot hashref:
296              
297             { bucket => $name, values => $count, bytes => $n, history => $h }
298              
299             Callback: C<(\%status, $err)>.
300              
301             =head1 SEE ALSO
302              
303             L, L, L,
304             L.
305              
306             =cut