File Coverage

blib/lib/EV/Nats.pm
Criterion Covered Total %
statement 27 31 87.1
branch 6 10 60.0
condition 0 3 0.0
subroutine 6 7 85.7
pod 2 2 100.0
total 41 53 77.3


line stmt bran cond sub pod time code
1             package EV::Nats;
2 17     17   2204547 use strict;
  17         29  
  17         585  
3 17     17   66 use warnings;
  17         28  
  17         1004  
4 17     17   1263 use EV;
  17         7106  
  17         512  
5              
6             BEGIN {
7 17     17   85 use XSLoader;
  17         43  
  17         694  
8 17     17   56 our $VERSION = '0.02';
9 17         43995 XSLoader::load __PACKAGE__, $VERSION;
10             }
11              
12             *pub = \&publish;
13             *hpub = \&hpublish;
14             *sub = \&subscribe;
15             *unsub = \&unsubscribe;
16             *req = \&request;
17              
18             sub creds_file {
19 6     6 1 182427 my ($self, $path) = @_;
20 6 100       337 open my $fh, '<', $path or die "cannot open creds file $path: $!";
21 5         13 my $content = do { local $/; <$fh> };
  5         29  
  5         115  
22 5         71 close $fh;
23              
24             # NATS .creds is two PEM-style blocks. Require both BEGIN+END markers.
25 5         69 my ($jwt) = $content =~
26             /-----BEGIN\s+NATS\s+USER\s+JWT-----\r?\n\s*(\S+)\s*\r?\n-+END\s+NATS\s+USER\s+JWT-+/;
27 5         34 my ($seed) = $content =~
28             /-----BEGIN\s+USER\s+NKEY\s+SEED-----\r?\n\s*(\S+)\s*\r?\n-+END\s+USER\s+NKEY\s+SEED-+/;
29              
30 5 100       37 die "$path: missing NATS USER JWT block\n" unless defined $jwt;
31 3 100       17 die "$path: missing USER NKEY SEED block\n" unless defined $seed;
32              
33 2         16 $self->jwt($jwt);
34 2         9 $self->nkey_seed($seed);
35 2         13 $self;
36             }
37              
38             sub subscribe_max {
39 0     0 1   my ($self, $subject, $cb, $max_msgs, $queue_group) = @_;
40 0 0         my $sid = defined $queue_group
41             ? $self->subscribe($subject, $cb, $queue_group)
42             : $self->subscribe($subject, $cb);
43 0 0 0       $self->unsubscribe($sid, $max_msgs) if $max_msgs && $max_msgs > 0;
44 0           $sid;
45             }
46              
47             1;
48              
49             =head1 NAME
50              
51             EV::Nats - High-performance asynchronous NATS client using EV
52              
53             =head1 SYNOPSIS
54              
55             use EV;
56             use EV::Nats;
57              
58             my $nats = EV::Nats->new(
59             host => '127.0.0.1',
60             port => 4222,
61             reconnect => 1,
62             on_error => sub { warn "nats: $_[0]\n" },
63             on_connect => sub { warn "connected\n" },
64             );
65              
66             # Subscribe (plain or queue group)
67             my $sid = $nats->subscribe('foo.>', sub {
68             my ($subject, $payload, $reply, $headers) = @_;
69             print "[$subject] $payload\n";
70             });
71             $nats->subscribe('work.>', sub { ... }, 'workers');
72              
73             # Publish (fire-and-forget) and headered publish
74             $nats->publish('foo.bar', 'hello world');
75             $nats->hpublish('foo.bar', "NATS/1.0\r\nX-Trace: 42\r\n\r\n", 'body');
76              
77             # Request / reply
78             $nats->request('service.echo', 'ping', sub {
79             my ($response, $err) = @_;
80             die $err if $err;
81             print "reply: $response\n";
82             }, 5000); # 5s timeout
83              
84             $nats->unsubscribe($sid);
85             EV::run;
86              
87             =head1 DESCRIPTION
88              
89             EV::Nats is an async NATS client that implements the protocol directly
90             in XS on top of L<EV>. There is no external C library dependency.
91              
92             =head2 Protocol
93              
94             Full NATS client protocol (PUB, SUB, UNSUB, MSG, HMSG, PING/PONG),
95             including headered publish/receive, wildcard subjects (C<*>, C<E<gt>>),
96             queue groups, and request/reply with an automatic shared inbox
97             subscription.
98              
99             =head2 Connectivity
100              
101             TCP and Unix-domain sockets; TCP keepalive; connect timeout; auto
102             reconnect with exponential backoff and jitter; subscription and
103             auto-unsub state restored on reconnect; cluster failover from INFO
104             C; lame-duck-mode (leaf node graceful shutdown) callback;
105             graceful C<drain>.
106              
107             =head2 Auth
108              
109             Token, user/pass, NKey/JWT (Ed25519 via OpenSSL).
110              
111             =head2 TLS
112              
113             Optional, auto-detected at build time. STARTTLS-style upgrade after
114             INFO; full hostname verification (DNS or IP literal) by default;
115             opt-out C<tls_skip_verify>; custom CA via C<tls_ca_file>.
116              
117             =head2 Performance
118              
119             Write coalescing via C (one C per loop
120             iteration); O(1) subscription lookup; per-publish allocation-free
121             fast path; explicit C<batch> mode for tight loops; per-connection
122             stats counters.
123              
124             =head2 Higher-level APIs
125              
126             L, L, L.
127              
128             B DNS resolution via C is blocking. Use numeric IP
129             addresses for latency-sensitive applications.
130              
131             =head1 METHODS
132              
133             =head2 new(%options)
134              
135             Create an EV::Nats instance. If C<host> or C<path> is supplied,
136             connection is initiated immediately and the C<on_connect> callback
137             fires once the CONNECT/PONG handshake completes.
138              
139             my $nats = EV::Nats->new(
140             host => '127.0.0.1',
141             port => 4222,
142             reconnect => 1,
143             on_error => sub { warn "nats: $_[0]\n" },
144             on_connect => sub { warn "ready\n" },
145             );
146              
147             =head3 Connection options
148              
149             =over
150              
151             =item host => Str
152              
153             Server hostname (numeric IP recommended; see L</CAVEATS>). When set,
154             connection starts immediately.
155              
156             =item port => Int (default 4222)
157              
158             Server port.
159              
160             =item path => Str
161              
162             Unix-domain socket path. Mutually exclusive with C<host>.
163              
164             =item connect_timeout => Int (ms; 0 = none)
165              
166             How long to wait for the TCP/TLS handshake before giving up.
167              
168             =item keepalive => Int (seconds)
169              
170             If set, enables C with this idle interval.
171              
172             =item priority => Int (-2 .. +2)
173              
174             L<EV> watcher priority for the I/O watchers on this connection.
175              
176             =item loop => EV::Loop (default C)
177              
178             The L<EV> loop to attach watchers to.
179              
180             =item name => Str
181              
182             Client name advertised in CONNECT.
183              
184             =back
185              
186             =head3 Auth options
187              
188             =over
189              
190             =item user => Str / pass => Str
191              
192             Username/password authentication. JSON-escaped in CONNECT.
193              
194             =item token => Str
195              
196             Token authentication.
197              
198             =item nkey_seed => Str
199              
200             NATS NKey seed (the C<SU...> form). Requires the build to have
201             OpenSSL (C<EV::Nats::HAS_NKEY>).
202              
203             =item jwt => Str
204              
205             User JWT, paired with C<nkey_seed> for decentralized auth. See also
206             L</creds_file>.
207              
208             =item tls => Bool / tls_ca_file => Str / tls_skip_verify => Bool
209              
210             See L for details.
211              
212             =back
213              
214             =head3 Protocol options
215              
216             =over
217              
218             =item verbose => Bool (default 0)
219              
220             Request C<+OK> acknowledgments after each command.
221              
222             =item pedantic => Bool (default 0)
223              
224             Server-side strict subject checking.
225              
226             =item echo => Bool (default 1)
227              
228             Receive messages this client itself publishes.
229              
230             =item no_responders => Bool (default 0)
231              
232             Ask the server to send a 503 status reply when a request has no
233             responders, surfaced as the C<"no responders"> error in C<request>.
234              
235             =item ping_interval => Int (ms, default 120000; 0 = disabled)
236              
237             Client-initiated PING interval for keep-alive.
238              
239             =item max_pings_outstanding => Int (default 2)
240              
241             Maximum unacked PINGs before the connection is declared stale.
242              
243             =back
244              
245             =head3 Reconnect options
246              
247             =over
248              
249             =item reconnect => Bool (default 0)
250              
251             Enable automatic reconnection.
252              
253             =item reconnect_delay => Int (ms, default 2000)
254              
255             Initial delay between reconnect attempts; subsequent attempts use
256             exponential backoff with jitter, capped by C<max_reconnect_delay>.
257              
258             =item max_reconnect_delay => Int (ms, default 30000)
259              
260             Upper bound on the backoff delay.
261              
262             =item max_reconnect_attempts => Int (default 60; 0 = unlimited)
263              
264             Give up after this many consecutive failures.
265              
266             =back
267              
268             =head3 Callback options
269              
270             All callbacks fire on the L<EV> loop, never inline.
271              
272             =over
273              
274             =item on_connect => sub { }
275              
276             Called after the CONNECT/PONG handshake completes.
277              
278             =item on_disconnect => sub { }
279              
280             Called when the connection drops, before any auto-reconnect attempt.
281              
282             =item on_error => sub { my ($err) = @_ }
283              
284             Receives a string. If unset, errors C.
285              
286             =item on_lame_duck => sub { }
287              
288             Called once when the server signals lame-duck-mode shutdown via
289             INFO C.
290              
291             =item on_slow_consumer => sub { my ($pending_bytes) = @_ }
292              
293             See L</slow_consumer>.
294              
295             =back
296              
297             =head2 connect($host, [$port])
298              
299             Initiate a TCP connection. Port defaults to 4222. Croaks if already
300             connected or in the middle of connecting; otherwise returns
301             immediately and signals completion via C<on_connect>.
302              
303             =head2 connect_unix($path)
304              
305             Initiate a Unix-domain-socket connection. Same async semantics as
306             L</connect>.
307              
308             =head2 disconnect
309              
310             Cancel any pending reconnect, drop queued writes, close the socket,
311             and fire C. C is set so no
312             auto-reconnect is scheduled. For a clean shutdown that flushes
313             pending writes first, see L</drain>.
314              
315             =head2 is_connected
316              
317             True if the CONNECT/PONG handshake has completed and no disconnect
318             or reconnect is in progress.
319              
320             =head2 publish($subject, [$payload], [$reply_to])
321              
322             Publish a message. Alias: C<pub>.
323              
324             $nats->publish('foo', 'hello');
325             $nats->publish('foo', 'hello', 'reply.subject');
326              
327             =head2 hpublish($subject, $headers, [$payload], [$reply_to])
328              
329             Publish with headers. Alias: C<hpub>.
330              
331             $nats->hpublish('foo', "NATS/1.0\r\nX-Key: val\r\n\r\n", 'body');
332              
333             =head2 subscribe($subject, $cb, [$queue_group])
334              
335             Subscribe to a subject. Returns subscription ID. Alias: C<sub>.
336              
337             my $sid = $nats->subscribe('foo.*', sub {
338             my ($subject, $payload, $reply, $headers) = @_;
339             });
340              
341             Queue groups are preserved across reconnects.
342              
343             Callback receives:
344              
345             =over
346              
347             =item C<$subject> - actual subject the message was published to
348              
349             =item C<$payload> - message body
350              
351             =item C<$reply> - reply-to subject (undef if none)
352              
353             =item C<$headers> - raw headers string (only for HMSG)
354              
355             =back
356              
357             =head2 subscribe_max($subject, $cb, $max_msgs, [$queue_group])
358              
359             Convenience: L</subscribe> followed by an auto-unsubscribe after
360             C<$max_msgs> messages have been delivered.
361              
362             =head2 unsubscribe($sid, [$max_msgs])
363              
364             Unsubscribe. With C<$max_msgs>, the server is told to deliver that
365             many more messages and then drop the subscription. The auto-unsub
366             state is restored on reconnect (so the partial count survives a
367             disconnect). Alias: C<unsub>.
368              
369             =head2 request($subject, $payload, $cb, [$timeout_ms])
370              
371             Request/reply. Uses automatic inbox subscription. Alias: C<req>.
372              
373             $nats->request('service', 'data', sub {
374             my ($response, $err) = @_;
375             die $err if $err;
376             print "got: $response\n";
377             }, 5000);
378              
379             Callback receives C<($response, $error)>. For replies that include
380             NATS message headers (HMSG), a third argument C<$headers> with the
381             raw header block is also passed. Error is set on timeout
382             ("request timeout") or no responders ("no responders").
383              
384             =head2 drain([$cb])
385              
386             Graceful shutdown: sends UNSUB for all subscriptions, flushes pending
387             writes with a PING fence, fires C<$cb> when the server confirms with
388             PONG, then disconnects. No new messages will be received after drain
389             is initiated.
390              
391             C<$cb> receives a single argument: C<undef> on clean drain, or an error
392             string (e.g. C<"disconnected">) if the connection dropped before the
393             PONG arrived.
394              
395             $nats->drain(sub {
396             my ($err) = @_;
397             die "drain failed: $err" if $err;
398             print "drained, safe to exit\n";
399             });
400              
401             =head2 ping
402              
403             Send PING to server.
404              
405             =head2 flush([$cb])
406              
407             Send PING as a write fence; the subsequent PONG guarantees all prior
408             messages were processed by the server. If C<$cb> is given, it is invoked
409             when the PONG arrives. The callback receives a single argument: C<undef>
410             on success, or an error string (e.g. C<"disconnected">) if the connection
411             dropped before the PONG arrived.
412              
413             =head2 creds_file($path)
414              
415             Read a NATS C<.creds> file and apply the embedded JWT and NKey seed
416             via L</jwt> and L</nkey_seed>. Apply this BEFORE C<connect> so the
417             credentials are available during the CONNECT handshake. Dies if the
418             file is unreadable or missing either the C<NATS USER JWT> or
419             C<USER NKEY SEED> block.
420              
421             =head2 new_inbox
422              
423             Returns a fresh subject suitable for use as a private reply target
424             (C<_INBOX.E<lt>randE<gt>.E<lt>nE<gt>>). Each call burns a slot from
425             the same counter that L</request> uses, so manual subscribers must
426             treat the returned subject as opaque.
427              
428             =head2 subscription_count
429              
430             Returns the number of currently-registered subscriptions, including
431             the implicit C<_INBOX.E> subscription used by L.
432              
433             =head2 server_info
434              
435             Returns the raw JSON string of the most recent INFO frame received
436             from the server (or C before the first INFO). Useful for
437             inspecting C, C, C, C,
438             etc.
439              
440             =head2 max_payload([$limit])
441              
442             Server-advertised maximum payload size in bytes. Returns the current
443             value; with an argument, overrides it (publishes above this croak
444             locally before reaching the wire).
445              
446             =head2 waiting_count
447              
448             Number of writes queued locally during connect or reconnect (i.e.
449             C/C calls made while the connection is not yet
450             ready). They flush when the handshake completes.
451              
452             =head2 skip_waiting
453              
454             Drop all queued writes without sending them. Useful before
455             C if reconnect is enabled and you don't want stale
456             publishes replayed.
457              
458             =head2 reconnect($enable, [$delay_ms], [$max_attempts])
459              
460             Configure reconnection. C<$delay_ms> and C<$max_attempts> are only
461             written when supplied; omitted args leave the existing value unchanged.
462              
463             =head2 reconnect_enabled
464              
465             Returns true if reconnect is enabled.
466              
467             =head2 connect_timeout([$ms])
468              
469             Get/set connect timeout.
470              
471             =head2 ping_interval([$ms])
472              
473             Get/set PING interval.
474              
475             =head2 max_pings_outstanding([$num])
476              
477             Get/set max outstanding PINGs.
478              
479             =head2 priority([$num])
480              
481             Get/set EV watcher priority.
482              
483             =head2 keepalive([$seconds])
484              
485             Get/set TCP keepalive.
486              
487             =head2 batch($coderef)
488              
489             Batch multiple publishes into a single write. Suppresses per-publish
490             write scheduling; all buffered data is flushed after the coderef returns.
491              
492             $nats->batch(sub {
493             $nats->publish("foo.$_", "msg-$_") for 1..1000;
494             });
495              
496             =head2 slow_consumer($bytes_threshold, [$cb])
497              
498             Enable slow consumer detection. When the write buffer exceeds
499             C<$bytes_threshold> bytes, C<$cb> is called with the current buffer size.
500              
501             $nats->slow_consumer(1024*1024, sub {
502             my ($pending_bytes) = @_;
503             warn "slow consumer: ${pending_bytes}B pending\n";
504             });
505              
506             =head2 on_lame_duck([$cb])
507              
508             Get/set the lame-duck callback. Fires once when the server signals
509             shutdown (leaf node, rolling restart) via INFO C. Use this
510             to migrate work to another server before the grace period elapses.
511              
512             =head2 nkey_seed($seed)
513              
514             Set the NKey seed (the C<SU...> base32-encoded form) for Ed25519
515             authentication. Requires the build to have OpenSSL (see
516             L</BUILD-TIME FEATURES>). The server nonce from INFO is automatically
517             signed during CONNECT. May also be passed to L</new> as
518             C<nkey_seed =E<gt> ...>.
519              
520             =head2 jwt($token)
521              
522             Set the user JWT. Combine with L</nkey_seed> for NATS decentralized
523             auth. May also be passed to L</new>. See L</creds_file> for the
524             common case of loading both from a C<.creds> file.
525              
526             =head2 EV::Nats->nkey_generate_user_seed
527              
528             Class method. Returns a fresh, valid NATS User NKey seed (the
529             C form). Useful for tests and provisioning scripts that
530             don't have the C CLI available. Requires C; croaks
531             otherwise.
532              
533             =head2 EV::Nats->nkey_public_from_seed($seed)
534              
535             Class method. Derives the matching public key (the C form)
536             from a User NKey seed. Croaks on an invalid seed. Pair with
537             L</nkey_generate_user_seed> to provision the server with the public
538             key while the client keeps the seed.
539              
540             =head2 tls($enable, [$ca_file], [$skip_verify])
541              
542             Configure TLS. Requires OpenSSL at build time (see
543             L</BUILD-TIME FEATURES>).
544              
545             $nats->tls(1); # system CA
546             $nats->tls(1, '/path/to/ca.pem'); # custom CA
547             $nats->tls(1, undef, 1); # skip verification
548              
549             When verification is enabled (the default), the server certificate's
550             SAN must match either the resolved IP literal or the DNS hostname
551             passed to L</connect>. May also be passed to L</new> as C<tls =E<gt> 1,
552             tls_ca_file =E<gt> $path>.
553              
554             =head2 stats
555              
556             Returns a hash of connection counters:
557              
558             my %s = $nats->stats;
559             # ( msgs_in, msgs_out, bytes_in, bytes_out )
560              
561             =head2 reset_stats
562              
563             Zero all counters returned by L</stats>.
564              
565             =head2 on_error([$cb])
566              
567             =head2 on_connect([$cb])
568              
569             =head2 on_disconnect([$cb])
570              
571             Get/set the corresponding callback at runtime. With no argument,
572             returns the current value (or C<undef>). With an argument, replaces
573             it; pass C<undef> to clear.
574              
575             =head1 BUILD-TIME FEATURES
576              
577             =over
578              
579             =item EV::Nats::HAS_TLS
580              
581             True if compiled with OpenSSL (TLS supported).
582              
583             =item EV::Nats::HAS_NKEY
584              
585             True if NKey/JWT signing is available (also requires OpenSSL).
586              
587             =back
588              
589             =head1 BENCHMARKS
590              
591             Measured on Linux with TCP loopback, Perl 5.40, nats-server 2.12,
592             100-byte payloads (C):
593              
594             100K msgs 200K msgs
595             PUB fire-and-forget 4.7M 5.0M msgs/sec
596             PUB + SUB (loopback) 1.8M 1.6M msgs/sec
597             PUB + SUB (8B payload) 2.2M 1.9M msgs/sec
598             REQ/REP (pipelined, 64) 334K msgs/sec
599              
600             Connected-path publish appends directly to the write buffer with no
601             per-message allocation. Write coalescing via C batches
602             all publishes per event-loop iteration into a single C syscall.
603              
604             Run C for full results. Set C,
605             C, C, C to customize.
606              
607             =head1 NATS PROTOCOL
608              
609             This module implements the NATS client protocol directly in XS.
610             The protocol is text-based with CRLF-delimited control lines and
611             binary payloads.
612              
613             Connection flow: server sends INFO, client sends CONNECT + PING,
614             server responds with PONG to confirm. All subscriptions (including
615             queue groups and auto-unsub state) are automatically restored on
616             reconnect.
617              
618             Request/reply uses a single wildcard inbox subscription
619             (C<_INBOX.E<lt>randomE<gt>.*>) for all requests, with unique
620             suffixes per request.
621              
622             =head1 CAVEATS
623              
624             =over
625              
626             =item * DNS resolution via C is blocking. Use numeric IP
627             addresses for latency-sensitive applications.
628              
629              
630             =item * TLS requires OpenSSL headers at build time (auto-detected).
631              
632             =item * NKey auth requires OpenSSL with Ed25519 support (1.1.1+).
633              
634             =item * The module handles all data as bytes. Encode UTF-8 strings before
635             passing them.
636              
637             =item * Do not let the C<EV::Nats> instance go out of scope (or be
638             explicitly C<undef>-ed) from inside a callback while that callback is
639             still executing. The callback closure normally references C<$nats>
640             (via C<< $nats->publish(...) >> etc.), which keeps it alive; if you
641             write a callback that does not capture C<$nats> and you C<undef> the
642             last outer reference inside that callback, Perl will run C<DESTROY>
643             mid-callback and free the underlying state. Any subsequent operation
644             on C<$nats> in that callback is undefined behavior.
645              
646             =item * Cluster URL discovery (the C field of INFO) is
647             trusted by default. On failover the client connects to whatever
648             hostnames the previous server advertised, and TLS hostname verification
649             is performed against those names. Use a private CA (C) to
650             restrict which certificates are acceptable, or do not enable C on
651             public-CA topologies where any holder of a valid cert could redirect
652             clients.
653              
654             =back
655              
656             =head1 ENVIRONMENT
657              
658             =over
659              
660             =item TEST_NATS_HOST, TEST_NATS_PORT
661              
662             Set these to run the test suite against a NATS server
663             (default: 127.0.0.1:4222).
664              
665             =back
666              
667             =head1 SEE ALSO
668              
669             L, L, L,
670             L,
671             L,
672             L.
673              
674             =head1 AUTHOR
675              
676             vividsnow
677              
678             =head1 LICENSE
679              
680             This library is free software; you can redistribute it and/or modify it
681             under the same terms as Perl itself.
682              
683             =cut