File Coverage

blib/lib/EV/Nats/JetStream.pm
Criterion Covered Total %
statement 12 113 10.6
branch 0 40 0.0
condition 0 33 0.0
subroutine 4 27 14.8
pod 16 16 100.0
total 32 229 13.9


line stmt bran cond sub pod time code
1             package EV::Nats::JetStream;
2 1     1   1543 use strict;
  1         6  
  1         41  
3 1     1   6 use warnings;
  1         2  
  1         58  
4 1     1   6 use EV;
  1         2  
  1         19  
5 1     1   5 use JSON::PP ();
  1         2  
  1         2308  
6              
7             sub new {
              # Constructor. Accepts key/value options:
              #   nats    - connection object (required; dies with
              #             "nats connection required" when missing or false)
              #   prefix  - API subject prefix, default '$JS.API'
              #   timeout - stored and later handed to the connection's request()
              #             call, default 5000
              # Any other keys left in %opts are ignored. Returns the blessed hash.
              # NOTE(review): the `||` defaults mean a false value (timeout => 0,
              # prefix => '') silently falls back to the default -- confirm that
              # is intended.
8 0     0 1   my ($class, %opts) = @_;
9 0   0       my $nats = delete $opts{nats} || die "nats connection required";
10 0   0       my $prefix = delete $opts{prefix} || '$JS.API';
11 0   0       my $timeout = delete $opts{timeout} || 5000;
12 0           bless {
13             nats => $nats,
14             prefix => $prefix,
15             timeout => $timeout,
16             }, $class;
17             }
18              
19             sub _api {
              # Internal: issue one request on "<prefix>.<subj>" through the
              # connection's request() method, forwarding $payload, $cb and the
              # configured timeout. An undefined $payload is sent as ''.
              # The callers in this file read the callback arguments as
              # ($response, $error).
20 0     0     my ($self, $subj, $payload, $cb) = @_;
21 0   0       $payload //= '';
22             $self->{nats}->request(
23             "$self->{prefix}.$subj",
24             $payload, $cb, $self->{timeout},
25 0           );
26             }
27              
28             ## Public to sibling modules (KV, ObjectStore, ...) -- not part of the user
29             ## API. Decode a JSON string; returns ($decoded, $error_or_undef) where
30             ## $error already includes a "JSON decode error: ..." prefix when set.
31             ## Gates on $@ so falsy-but-valid JSON (null/0/false/empty-string) doesn't
32             ## get misreported as a decode failure.
33             sub decode_json_or_error {
              # Plain function, not a method: the JSON text is the only argument.
34 0     0 1   my ($json) = @_;
              # Localise $@ so the eval below leaves the caller's $@ untouched.
35 0           local $@;
36 0           my $r = eval { JSON::PP::decode_json($json) };
  0            
              # Success is decided by $@ alone, never by the truthiness of $r.
37 0 0         return ($r, undef) unless $@;
38 0           return (undef, "JSON decode error: $@");
39             }
40              
41             ## Public to sibling modules. True if a STREAM.MSG.GET response message
42             ## carries a KV-Operation: DEL or PURGE tombstone header. $msg is the
43             ## decoded {message} hash; its {hdrs} field is base64-encoded by the server.
44             sub msg_is_tombstone {
              # Plain function, not a method. Returns 1 or 0.
45 0     0 1   my ($msg) = @_;
              # No message, or no (or empty) hdrs field: not a tombstone.
46 0 0 0       return 0 unless $msg && $msg->{hdrs};
              # MIME::Base64 is loaded lazily, only when headers are present.
47 0           require MIME::Base64;
48 0           my $hdrs = MIME::Base64::decode_base64($msg->{hdrs});
              # NOTE(review): the match is case-insensitive and unanchored, so it
              # fires anywhere in the decoded header block and on any value that
              # merely starts with DEL or PURGE -- confirm that is acceptable.
49 0 0         return $hdrs =~ /KV-Operation:\s*(?:DEL|PURGE)/i ? 1 : 0;
50             }
51              
52             sub _json_api {
              # Internal: JSON request/response wrapper around _api.
              # $data is JSON-encoded as the request body; undef sends an empty body.
              # $cb is invoked as ($decoded, undef) on success or (undef, $error)
              # on a transport error, a JSON decode error, or a response that
              # carries a true {error} member.
53 0     0     my ($self, $subj, $data, $cb) = @_;
54 0 0         my $payload = defined $data ? JSON::PP::encode_json($data) : '';
55             $self->_api($subj, $payload, sub {
56 0     0     my ($resp, $err) = @_;
57 0 0         return $cb->(undef, $err) if $err;
58 0           my ($decoded, $derr) = decode_json_or_error($resp);
59 0 0         return $cb->(undef, $derr) if $derr;
              # NOTE(review): assumes $decoded is a hash ref; a response that
              # decodes to an array or scalar would die on this lookup -- verify
              # against what the server can return.
60 0 0         if ($decoded->{error}) {
              # Error string format: "<description> (code <code>)".
61 0           return $cb->(undef, "$decoded->{error}{description} (code $decoded->{error}{code})");
62             }
63 0           $cb->($decoded, undef);
64 0           });
65             }
66              
67             # Stream management
68              
69             sub stream_create {
              # Send $config (a hashref) as the JSON body of a
              # "STREAM.CREATE.<name>" request, where <name> is $config->{name}.
              # Dies with "stream name required" when that name is missing or false.
              # $cb receives ($decoded, undef) or (undef, $error) via _json_api.
70 0     0 1   my ($self, $config, $cb) = @_;
71 0   0       my $name = $config->{name} || die "stream name required";
72 0           $self->_json_api("STREAM.CREATE.$name", $config, $cb);
73             }
74              
75             sub stream_update {
              # Same shape as stream_create, but the request subject is
              # "STREAM.UPDATE.<name>". Dies with "stream name required" when
              # $config->{name} is missing or false.
76 0     0 1   my ($self, $config, $cb) = @_;
77 0   0       my $name = $config->{name} || die "stream name required";
78 0           $self->_json_api("STREAM.UPDATE.$name", $config, $cb);
79             }
80              
81             sub stream_delete {
              # Send a "STREAM.DELETE.<name>" request with an empty body.
              # $cb receives ($decoded, undef) or (undef, $error) via _json_api.
              # NOTE(review): unlike stream_create, $name is not validated here.
82 0     0 1   my ($self, $name, $cb) = @_;
83 0           $self->_json_api("STREAM.DELETE.$name", undef, $cb);
84             }
85              
86             sub stream_info {
              # Send a "STREAM.INFO.<name>" request. The callback is always the
              # LAST argument; an optional options hashref may sit between $name
              # and the callback and is JSON-encoded as the request body.
              # Without options the body is empty.
87 0     0 1   my ($self, $name, @rest) = @_;
88 0           my $cb = pop @rest;
              # Whatever remains first after removing the callback (possibly undef).
89 0           my $opts = $rest[0];
90 0           $self->_json_api("STREAM.INFO.$name", $opts, $cb);
91             }
92              
93             sub stream_list {
              # Send a "STREAM.LIST" request with an empty body.
              # $cb receives ($decoded, undef) or (undef, $error) via _json_api.
94 0     0 1   my ($self, $cb) = @_;
95 0           $self->_json_api("STREAM.LIST", undef, $cb);
96             }
97              
98             sub stream_purge {
              # Send a "STREAM.PURGE.<name>" request with an empty body.
              # $cb receives ($decoded, undef) or (undef, $error) via _json_api.
99 0     0 1   my ($self, $name, $cb) = @_;
100 0           $self->_json_api("STREAM.PURGE.$name", undef, $cb);
101             }
102              
103             # Fetch a single message from a stream by sequence, last-by-subject, or
104             # next-by-subject. \%opts is passed verbatim as the request body
105             # (server accepts: seq, last_by_subj, next_by_subj).
106             sub stream_msg_get {
              # Request subject is "STREAM.MSG.GET.<stream>"; $opts is JSON-encoded
              # by _json_api (an undefined $opts sends an empty body).
              # $cb receives ($decoded, undef) or (undef, $error).
107 0     0 1   my ($self, $stream, $opts, $cb) = @_;
108 0           $self->_json_api("STREAM.MSG.GET.$stream", $opts, $cb);
109             }
110              
111             # Consumer management
112              
113             sub consumer_create {
              # Create a consumer on $stream. The request body is
              # { stream_name => $stream, config => $config }.
              # $cb receives ($decoded, undef) or (undef, $error) via _json_api.
114 0     0 1   my ($self, $stream, $config, $cb) = @_;
              # durable_name takes precedence over name when both are set.
115 0   0       my $name = $config->{durable_name} || $config->{name};
              # With a name the subject is "CONSUMER.CREATE.<stream>.<name>",
              # otherwise just "CONSUMER.CREATE.<stream>".
116 0 0         my $subj = $name
117             ? "CONSUMER.CREATE.$stream.$name"
118             : "CONSUMER.CREATE.$stream";
119 0           $self->_json_api($subj, { stream_name => $stream, config => $config }, $cb);
120             }
121              
122             sub consumer_delete {
              # Send a "CONSUMER.DELETE.<stream>.<consumer>" request with an empty
              # body. $cb receives ($decoded, undef) or (undef, $error).
123 0     0 1   my ($self, $stream, $consumer, $cb) = @_;
124 0           $self->_json_api("CONSUMER.DELETE.$stream.$consumer", undef, $cb);
125             }
126              
127             sub consumer_info {
              # Send a "CONSUMER.INFO.<stream>.<consumer>" request with an empty
              # body. $cb receives ($decoded, undef) or (undef, $error).
128 0     0 1   my ($self, $stream, $consumer, $cb) = @_;
129 0           $self->_json_api("CONSUMER.INFO.$stream.$consumer", undef, $cb);
130             }
131              
132             sub consumer_list {
              # Send a "CONSUMER.LIST.<stream>" request with an empty body.
              # $cb receives ($decoded, undef) or (undef, $error).
133 0     0 1   my ($self, $stream, $cb) = @_;
134 0           $self->_json_api("CONSUMER.LIST.$stream", undef, $cb);
135             }
136              
137             # Publishing with ack
138              
139             sub js_publish {
              # Publish $payload to $subject as a request and decode the JSON reply.
              # The request goes to $subject itself, not to a subject under the
              # API prefix, and uses the configured timeout.
              # $cb receives ($ack, undef) on success or (undef, $error) on a
              # transport error, a JSON decode error, or a reply with a true
              # {error} member.
140 0     0 1   my ($self, $subject, $payload, $cb) = @_;
141             $self->{nats}->request($subject, $payload, sub {
142 0     0     my ($resp, $err) = @_;
143 0 0         return $cb->(undef, $err) if $err;
144 0           my ($ack, $derr) = decode_json_or_error($resp);
145 0 0         return $cb->(undef, $derr) if $derr;
              # NOTE(review): the error string here is the description only,
              # whereas _json_api also appends "(code N)" -- confirm whether the
              # two should agree.
146 0 0         return $cb->(undef, "$ack->{error}{description}") if $ack->{error};
147 0           $cb->($ack, undef);
148 0           }, $self->{timeout});
149             }
150              
151             # Fetch messages (pull consumer).
152             #
153             # A pull-consumer NEXT request returns up to N messages on a reply inbox,
154             # not a single JSON API response. We subscribe to a fresh inbox, publish
155             # the fetch request, and collect messages until $batch is reached, the
156             # server emits a status message (404/408/503), or the local safety timer
157             # fires.
158              
159             sub fetch {
              # Pull up to `batch` messages from a consumer.
              # Options (all optional, $opts itself may be undef):
              #   batch   - maximum messages to collect; false/0 becomes 1
              #   expires - wait time in nanoseconds; false/0 becomes 5_000_000_000
              #   no_wait - when true, no_wait => true is added to the request
              # $cb is invoked exactly once, as (\@messages, undef) in every path
              # visible here. Returns nothing (bare return).
160 0     0 1   my ($self, $stream, $consumer, $opts, $cb) = @_;
161 0   0       $opts //= {};
162 0   0       my $batch = $opts->{batch} || 1;
163 0   0       my $expires_ns = $opts->{expires} || 5_000_000_000; # 5s default
164 0 0         my $no_wait = $opts->{no_wait} ? 1 : 0;
165 0           my $expires_sec = $expires_ns / 1_000_000_000;
166              
167 0           my $nats = $self->{nats};
168 0           my $inbox = $nats->new_inbox;
169              
170 0           my @messages;
171 0           my ($sid, $timer);
172 0           my $finished = 0;
173              
              # Single completion path: the $finished flag makes repeat calls
              # no-ops. It stops the safety timer, drops the inbox subscription,
              # then calls $cb.
              # NOTE(review): every call site below invokes $finish with no
              # argument, so the $err branch is never taken in this sub.
174             my $finish = sub {
175 0     0     my $err = shift;
176 0 0         return if $finished;
177 0           $finished = 1;
178 0 0         if ($timer) { $timer->stop; undef $timer; }
  0            
  0            
179 0 0         $nats->unsubscribe($sid) if defined $sid;
180 0 0         $err ? $cb->(undef, $err) : $cb->(\@messages, undef);
181 0           };
182              
              # Collect replies arriving on the inbox.
183             $sid = $nats->subscribe($inbox, sub {
184 0     0     my ($subject, $payload, $reply, $headers) = @_;
              # A header block starting "NATS/1.0 <digits>" is a status line.
185 0 0 0       if ($headers && $headers =~ m{^NATS/1\.0\s+(\d+)}) {
186 0           my $code = $1;
187             # 404 = no messages, 408 = expires elapsed, 503 = no responders
188 0 0 0       if ($code == 404 || $code == 408 || $code == 503) {
      0        
189 0           return $finish->();
190             }
              # NOTE(review): any other status code falls through and is pushed
              # below as an ordinary message -- confirm whether other server
              # status replies need handling.
191             }
192 0           push @messages, {
193             subject => $subject,
194             payload => $payload,
195             reply => $reply,
196             headers => $headers,
197             };
198 0 0         $finish->() if @messages >= $batch;
199 0           });
200              
              # Build and publish the pull request, with the inbox as reply subject.
201 0           my %req = (batch => $batch, expires => $expires_ns);
202 0 0         $req{no_wait} = JSON::PP::true() if $no_wait;
203 0           my $req_body = JSON::PP::encode_json(\%req);
204 0           my $subj = "$self->{prefix}.CONSUMER.MSG.NEXT.$stream.$consumer";
205 0           $nats->publish($subj, $req_body, $inbox);
206              
              # Local safety net: finish one second after the requested expiry
              # if nothing above has finished the fetch first.
207             $timer = EV::timer($expires_sec + 1, 0, sub {
208 0     0     $finish->();
209 0           });
210 0           return;
211             }
212              
213             1;
214              
215             =head1 NAME
216              
217             EV::Nats::JetStream - JetStream API client for L<EV::Nats>
218              
219             =head1 SYNOPSIS
220              
221             use EV;
222             use EV::Nats;
223             use EV::Nats::JetStream;
224              
225             my $nats = EV::Nats->new(host => '127.0.0.1');
226             my $js = EV::Nats::JetStream->new(nats => $nats);
227              
228             $js->stream_create({ name => 'ORDERS', subjects => ['orders.>'] },
229             sub {
230             my ($info, $err) = @_;
231             die $err if $err;
232             $js->js_publish('orders.new', '{"item":"widget"}', sub {
233             my ($ack, $err) = @_;
234             print "stored at seq=$ack->{seq}\n";
235             });
236             });
237              
238             EV::run;
239              
240             =head1 DESCRIPTION
241              
242             Thin async wrapper over the JetStream C<$JS.API.*> request/reply
243             endpoints. Each method is a single request whose callback is invoked
244             with the decoded JSON response (or an error string). The C<$nats>
245             connection passed to L</new> handles all the actual I/O.
246              
247             L<EV::Nats::KV> and L<EV::Nats::ObjectStore> build on top of this
248             module -- see those for higher-level KV / blob APIs.
249              
250             =head1 METHODS
251              
252             All methods are async. Callbacks fire on the L<EV> loop.
253              
254             =head2 new(%opts)
255              
256             my $js = EV::Nats::JetStream->new(
257             nats => $nats,
258             prefix => '$JS.API', # default API subject prefix
259             timeout => 5000, # ms; default 5000
260             );
261              
262             =head2 Stream management
263              
264             =head3 stream_create($config, $cb)
265              
266             Create a stream. C<$config> is passed verbatim as the
267             C<STREAM.CREATE> request body. Callback: C<($info, $err)>.
268              
269             =head3 stream_update($config, $cb)
270              
271             Update an existing stream. Same shape as C<stream_create>.
272              
273             =head3 stream_delete($name, $cb)
274              
275             Delete the stream by name.
276              
277             =head3 stream_info($name, [\%opts], $cb)
278              
279             Fetch stream config + state. Optional C<\%opts> may include
280             C<subjects_filter> (e.g. C<E<gt>>) to populate C<state.subjects>;
281             without it the server omits that field for performance.
282              
283             =head3 stream_list($cb)
284              
285             List all streams' info.
286              
287             =head3 stream_purge($name, $cb)
288              
289             Purge all messages from the stream.
290              
291             =head3 stream_msg_get($stream, \%opts, $cb)
292              
293             Fetch a single message from C<$stream>. C<\%opts> selects the message:
294              
295             { seq => $n } # by sequence number
296             { last_by_subj => $subject } # latest matching subject
297             { next_by_subj => $subject, seq => $start } # next at-or-after $start
298              
299             The message body and headers in the response are base64-encoded
300             under C<< $resp->{message}{data} >> and C<< $resp->{message}{hdrs} >>.
301              
302             =head2 Consumer management
303              
304             =head3 consumer_create($stream, $config, $cb)
305              
306             Create a consumer (push or pull). C<$config> is the consumer config
307             hashref; C<durable_name> makes it durable, C<ack_policy> controls
308             ack semantics.
309              
310             =head3 consumer_delete($stream, $consumer, $cb)
311              
312             =head3 consumer_info($stream, $consumer, $cb)
313              
314             =head3 consumer_list($stream, $cb)
315              
316             =head2 Publishing and fetching
317              
318             =head3 js_publish($subject, $payload, $cb)
319              
320             Publish with JetStream acknowledgment. Callback: C<($ack, $err)>
321             where C<$ack> is C<{ stream, seq, duplicate }>.
322              
323             =head3 fetch($stream, $consumer, \%opts, $cb)
324              
325             Pull messages from a pull-mode consumer. Options:
326              
327             =over
328              
329             =item C<batch>
330              
331             Maximum number of messages to fetch (default 1).
332              
333             =item C<expires>
334              
335             Server-side wait time in nanoseconds (default 5_000_000_000 = 5s).
336              
337             =item C<no_wait>
338              
339             If true, return immediately if no messages are currently available.
340              
341             =back
342              
343             Callback: C<(\@messages, $err)>. Each message is a hashref:
344              
345             {
346             subject => 'orders.new',
347             payload => '...',
348             reply => '$JS.ACK....', # for explicit ack/nak/wpi
349             headers => "...", # raw NATS/1.0 header block, or undef
350             }
351              
352             To acknowledge a message:
353              
354             $nats->publish($msg->{reply}, '+ACK'); # success
355             $nats->publish($msg->{reply}, '-NAK'); # negative --redeliver after ack_wait
356             $nats->publish($msg->{reply}, '+WPI'); # work-in-progress --extend ack_wait
357              
358             =head1 INTERNAL
359              
360             These are exposed for sibling modules (L<EV::Nats::KV>,
361             L<EV::Nats::ObjectStore>) -- not part of the end-user API and subject
362             to change.
363              
364             =head2 decode_json_or_error($json)
365              
366             Decode C<$json>. Returns C<($decoded, $error_or_undef)>; the error
367             string already includes a C<"JSON decode error: "> prefix when set.
368             Gates on C<$@> so falsy-but-valid JSON (C<null>, C<0>, C<false>,
369             empty string) is reported as a clean decode rather than a failure.
370              
371             =head2 msg_is_tombstone($msg)
372              
373             True if a C<STREAM.MSG.GET> response message carries a
374             C<KV-Operation: DEL> or C<KV-Operation: PURGE> header. C<$msg> is the
375             decoded C<{message}> hash from the response. Used by L<EV::Nats::KV>
376             and L<EV::Nats::ObjectStore> to surface deleted/purged entries as
377             clean misses rather than as malformed payloads.
378              
379             =head1 SEE ALSO
380              
381             L<EV::Nats>, L<EV::Nats::KV>, L<EV::Nats::ObjectStore>,
382             L<EV>.
383              
384             =cut