File Coverage

blib/lib/EV/Nats.pm
Criterion Covered Total %
statement 14 29 48.2
branch 0 10 0.0
condition n/a
subroutine 5 7 71.4
pod 1 2 50.0
total 20 48 41.6


line stmt bran cond sub pod time code
1             package EV::Nats;
2 14     14   1687986 use strict;
  14         19  
  14         418  
3 14     14   63 use warnings;
  14         19  
  14         1713  
4 14     14   64 use EV;
  14         35  
  14         389  
5              
6             BEGIN {
7 14     14   56 use XSLoader;
  14         24  
  14         568  
8 14     14   34 our $VERSION = '0.01';
9 14         34895 XSLoader::load __PACKAGE__, $VERSION;
10             }
11              
12             *pub = \&publish;
13             *hpub = \&hpublish;
14             *sub = \&subscribe;
15             *unsub = \&unsubscribe;
16             *req = \&request;
17              
18             sub creds_file {
19 0     0 0   my ($self, $path) = @_;
20 0 0         open my $fh, '<', $path or die "cannot open creds file $path: $!";
21 0           my $content = do { local $/; <$fh> };
  0            
  0            
22 0           close $fh;
23              
24             # NATS creds format: --- BEGIN USER JWT --- / jwt / --- END / --- BEGIN NKEY SEED --- / seed / --- END
25 0 0         if ($content =~ /-----BEGIN NATS USER JWT-----\s*\n\s*(\S+)\s*\n/) {
26 0           $self->jwt($1);
27             }
28 0 0         if ($content =~ /-----BEGIN USER NKEY SEED-----\s*\n\s*(\S+)\s*\n/) {
29 0           $self->nkey_seed($1);
30             }
31 0           $self;
32             }
33              
34             sub subscribe_max {
35 0     0 1   my ($self, $subject, $cb, $max_msgs, $queue_group) = @_;
36 0 0         my $sid = defined $queue_group
37             ? $self->subscribe($subject, $cb, $queue_group)
38             : $self->subscribe($subject, $cb);
39 0 0         $self->unsubscribe($sid, $max_msgs) if $max_msgs;
40 0           $sid;
41             }
42              
43             1;
44              
45             =head1 NAME
46              
47             EV::Nats - High-performance asynchronous NATS client using EV
48              
49             =head1 SYNOPSIS
50              
51             use EV::Nats;
52              
53             my $nats = EV::Nats->new(
54             host => '127.0.0.1',
55             port => 4222,
56             on_error => sub { warn "nats error: @_" },
57             on_connect => sub { warn "connected to NATS" },
58             );
59              
60             # Subscribe
61             my $sid = $nats->subscribe('foo.>', sub {
62             my ($subject, $payload, $reply) = @_;
63             print "[$subject] $payload\n";
64             });
65              
66             # Subscribe with queue group
67             $nats->subscribe('worker.>', sub {
68             my ($subject, $payload, $reply) = @_;
69             }, 'workers');
70              
71             # Publish
72             $nats->publish('foo.bar', 'hello world');
73              
74             # Request/reply
75             $nats->request('service.echo', 'ping', sub {
76             my ($response, $err) = @_;
77             die $err if $err;
78             print "reply: $response\n";
79             }, 5000); # 5s timeout
80              
81             # Unsubscribe
82             $nats->unsubscribe($sid);
83              
84             EV::run;
85              
86             =head1 DESCRIPTION
87              
88             EV::Nats is a high-performance asynchronous NATS client that implements
89             the NATS client protocol in pure XS with L event loop integration.
90             No external C NATS library is required.
91              
92             Features:
93              
94             =over
95              
96             =item * Full NATS client protocol (PUB, SUB, UNSUB, MSG, HMSG)
97              
98             =item * Request/reply with automatic inbox management
99              
100             =item * Queue group subscriptions for load balancing
101              
102             =item * Wildcard subjects (C<*> and C>)
103              
104             =item * Headers support (HPUB/HMSG)
105              
106             =item * Automatic PING/PONG keep-alive
107              
108             =item * Automatic reconnection with subscription and queue group restore
109              
110             =item * Fire-and-forget publish (no callback overhead)
111              
112             =item * Token, user/pass authentication
113              
114             =item * TCP keepalive and connect timeout
115              
116             =item * Write coalescing via ev_prepare (batches writes per event loop iteration)
117              
118             =item * O(1) subscription lookup via hash table
119              
120             =item * Graceful drain (unsubscribe all, flush, then disconnect)
121              
122             =item * Server pool with cluster URL failover from INFO connect_urls
123              
124             =item * Optional TLS via OpenSSL (auto-detected at build time)
125              
126             =item * Reconnect jitter to prevent thundering herd
127              
128             =item * Per-connection stats counters (msgs/bytes in/out)
129              
130             =item * JetStream API (L)
131              
132             =item * Key-Value store (L)
133              
134             =item * Object store with chunking (L)
135              
136             =item * NKey/JWT authentication (Ed25519 via OpenSSL)
137              
138             =item * Slow consumer detection with configurable threshold
139              
140             =item * Publish batching API (C)
141              
142             =item * Lame duck mode (leaf node graceful shutdown) notification
143              
144             =back
145              
146             B DNS resolution via C is blocking. Use numeric IP
147             addresses for latency-sensitive applications.
148              
149             =head1 METHODS
150              
151             =head2 new(%options)
152              
153             Create a new EV::Nats instance. Connects automatically if C is given.
154              
155             my $nats = EV::Nats->new(
156             host => '127.0.0.1',
157             port => 4222,
158             on_error => sub { die @_ },
159             );
160              
161             Options:
162              
163             =over
164              
165             =item host => 'Str'
166              
167             =item port => 'Int' (default 4222)
168              
169             Server hostname and port. If C is provided, connection starts
170             immediately.
171              
172             =item on_error => $cb->($errstr)
173              
174             Error callback. Default: C.
175              
176             =item on_connect => $cb->()
177              
178             Called when connection is fully established (after CONNECT/PONG handshake).
179              
180             =item on_disconnect => $cb->()
181              
182             Called on disconnect.
183              
184             =item user => 'Str'
185              
186             =item pass => 'Str'
187              
188             Username/password authentication. Values are JSON-escaped in the
189             CONNECT command.
190              
191             =item token => 'Str'
192              
193             Token authentication.
194              
195             =item name => 'Str'
196              
197             Client name sent in CONNECT.
198              
199             =item verbose => $bool (default 0)
200              
201             Request +OK acknowledgments from server.
202              
203             =item pedantic => $bool (default 0)
204              
205             Enable strict subject checking.
206              
207             =item echo => $bool (default 1)
208              
209             Receive messages published by this client.
210              
211             =item no_responders => $bool (default 0)
212              
213             Enable no-responders notification for requests.
214              
215             =item reconnect => $bool (default 0)
216              
217             Enable automatic reconnection.
218              
219             =item reconnect_delay => $ms (default 2000)
220              
221             Delay between reconnect attempts.
222              
223             =item max_reconnect_attempts => $num (default 60)
224              
225             Maximum reconnect attempts. 0 = unlimited.
226              
227             =item connect_timeout => $ms
228              
229             Connection timeout. 0 = no timeout.
230              
231             =item ping_interval => $ms (default 120000)
232              
233             Interval for client-initiated PING. 0 = disabled.
234              
235             =item max_pings_outstanding => $num (default 2)
236              
237             Max unanswered PINGs before declaring stale connection.
238              
239             =item priority => $num (-2 to +2)
240              
241             EV watcher priority.
242              
243             =item keepalive => $seconds
244              
245             TCP keepalive interval.
246              
247             =item path => 'Str'
248              
249             Unix socket path. Mutually exclusive with C.
250              
251             =item loop => EV::Loop
252              
253             EV loop to use. Default: C.
254              
255             =back
256              
257             =head2 connect($host, [$port])
258              
259             Connect to NATS server. Port defaults to 4222.
260              
261             =head2 connect_unix($path)
262              
263             Connect via Unix domain socket.
264              
265             =head2 disconnect
266              
267             Graceful disconnect.
268              
269             =head2 is_connected
270              
271             Returns true if connected.
272              
273             =head2 publish($subject, [$payload], [$reply_to])
274              
275             Publish a message. Alias: C.
276              
277             $nats->publish('foo', 'hello');
278             $nats->publish('foo', 'hello', 'reply.subject');
279              
280             =head2 hpublish($subject, $headers, [$payload], [$reply_to])
281              
282             Publish with headers. Alias: C.
283              
284             $nats->hpublish('foo', "NATS/1.0\r\nX-Key: val\r\n\r\n", 'body');
285              
286             =head2 subscribe($subject, $cb, [$queue_group])
287              
288             Subscribe to a subject. Returns subscription ID. Alias: C.
289              
290             my $sid = $nats->subscribe('foo.*', sub {
291             my ($subject, $payload, $reply, $headers) = @_;
292             });
293              
294             Queue groups are preserved across reconnects.
295              
296             Callback receives:
297              
298             =over
299              
300             =item C<$subject> - actual subject the message was published to
301              
302             =item C<$payload> - message body
303              
304             =item C<$reply> - reply-to subject (undef if none)
305              
306             =item C<$headers> - raw headers string (only for HMSG)
307              
308             =back
309              
310             =head2 subscribe_max($subject, $cb, $max_msgs, [$queue_group])
311              
312             Subscribe and auto-unsubscribe after C<$max_msgs> messages in one call.
313              
314             =head2 unsubscribe($sid, [$max_msgs])
315              
316             Unsubscribe. With C<$max_msgs>, auto-unsubscribes after receiving that many
317             messages. Auto-unsub state is restored on reconnect. Alias: C.
318              
319             =head2 request($subject, $payload, $cb, [$timeout_ms])
320              
321             Request/reply. Uses automatic inbox subscription. Alias: C.
322              
323             $nats->request('service', 'data', sub {
324             my ($response, $err) = @_;
325             die $err if $err;
326             print "got: $response\n";
327             }, 5000);
328              
329             Callback receives C<($response, $error)>. Error is set on timeout
330             ("request timeout") or no responders ("no responders").
331              
332             =head2 drain([$cb])
333              
334             Graceful shutdown: sends UNSUB for all subscriptions, flushes pending
335             writes with a PING fence, fires C<$cb> when the server confirms with
336             PONG, then disconnects. No new messages will be received after drain
337             is initiated.
338              
339             $nats->drain(sub {
340             print "drained, safe to exit\n";
341             });
342              
343             =head2 ping
344              
345             Send PING to server.
346              
347             =head2 flush
348              
349             Send PING as a write fence; the subsequent PONG guarantees all prior
350             messages were processed by the server.
351              
352             =head2 server_info
353              
354             Returns raw INFO JSON string from server.
355              
356             =head2 max_payload([$limit])
357              
358             Get/set max payload size.
359              
360             =head2 waiting_count
361              
362             Number of writes queued locally (during connect/reconnect).
363              
364             =head2 skip_waiting
365              
366             Cancel all waiting writes.
367              
368             =head2 reconnect($enable, [$delay_ms], [$max_attempts])
369              
370             Configure reconnection.
371              
372             =head2 reconnect_enabled
373              
374             Returns true if reconnect is enabled.
375              
376             =head2 connect_timeout([$ms])
377              
378             Get/set connect timeout.
379              
380             =head2 ping_interval([$ms])
381              
382             Get/set PING interval.
383              
384             =head2 max_pings_outstanding([$num])
385              
386             Get/set max outstanding PINGs.
387              
388             =head2 priority([$num])
389              
390             Get/set EV watcher priority.
391              
392             =head2 keepalive([$seconds])
393              
394             Get/set TCP keepalive.
395              
396             =head2 batch($coderef)
397              
398             Batch multiple publishes into a single write. Suppresses per-publish
399             write scheduling; all buffered data is flushed after the coderef returns.
400              
401             $nats->batch(sub {
402             $nats->publish("foo.$_", "msg-$_") for 1..1000;
403             });
404              
405             =head2 slow_consumer($bytes_threshold, [$cb])
406              
407             Enable slow consumer detection. When the write buffer exceeds
408             C<$bytes_threshold> bytes, C<$cb> is called with the current buffer size.
409              
410             $nats->slow_consumer(1024*1024, sub {
411             my ($pending_bytes) = @_;
412             warn "slow consumer: ${pending_bytes}B pending\n";
413             });
414              
415             =head2 on_lame_duck([$cb])
416              
417             Get/set callback for lame duck mode. Fired when the server signals
418             it's shutting down (leaf node / rolling restart). Use this to migrate
419             to another server.
420              
421             =head2 nkey_seed($seed)
422              
423             Set NKey seed for Ed25519 authentication (requires OpenSSL at build time).
424             The seed is a base32-encoded NATS NKey. The server nonce from INFO is
425             automatically signed during CONNECT.
426              
427             $nats->nkey_seed('SUAM...');
428              
429             Or via constructor: C 'SUAM...'>.
430              
431             =head2 jwt($token)
432              
433             Set user JWT for authentication. Combined with C for
434             NATS decentralized auth.
435              
436             =head2 tls($enable, [$ca_file], [$skip_verify])
437              
438             Configure TLS (requires OpenSSL at build time).
439              
440             $nats->tls(1); # system CA
441             $nats->tls(1, '/path/to/ca.pem'); # custom CA
442             $nats->tls(1, undef, 1); # skip verification
443              
444             Or via constructor: C 1, tls_ca_file =E $path>.
445              
446             =head2 stats
447              
448             Returns a hash of connection statistics:
449              
450             my %s = $nats->stats;
451             # msgs_in, msgs_out, bytes_in, bytes_out
452              
453             =head2 reset_stats
454              
455             Reset all stats counters to zero.
456              
457             =head2 on_error([$cb])
458              
459             =head2 on_connect([$cb])
460              
461             =head2 on_disconnect([$cb])
462              
463             Get/set handler callbacks.
464              
465             =head1 BENCHMARKS
466              
467             Measured on Linux with TCP loopback, Perl 5.40, nats-server 2.12,
468             100-byte payloads (C):
469              
470             100K msgs 200K msgs
471             PUB fire-and-forget 4.7M 5.0M msgs/sec
472             PUB + SUB (loopback) 1.8M 1.6M msgs/sec
473             PUB + SUB (8B payload) 2.2M 1.9M msgs/sec
474             REQ/REP (pipelined, 128) 334K msgs/sec
475              
476             Connected-path publish appends directly to the write buffer with no
477             per-message allocation. Write coalescing via C batches
478             all publishes per event-loop iteration into a single C syscall.
479              
480             Run C for full results. Set C,
481             C, C, C to customize.
482              
483             =head1 NATS PROTOCOL
484              
485             This module implements the NATS client protocol directly in XS.
486             The protocol is text-based with CRLF-delimited control lines and
487             binary payloads.
488              
489             Connection flow: server sends INFO, client sends CONNECT + PING,
490             server responds with PONG to confirm. All subscriptions (including
491             queue groups and auto-unsub state) are automatically restored on
492             reconnect.
493              
494             Request/reply uses a single wildcard inbox subscription
495             (C<_INBOX.ErandomE.*>) for all requests, with unique
496             suffixes per request.
497              
498             =head1 CAVEATS
499              
500             =over
501              
502             =item * DNS resolution via C is blocking. Use numeric IP
503             addresses for latency-sensitive applications.
504              
505              
506             =item * TLS requires OpenSSL headers at build time (auto-detected).
507              
508             =item * NKey auth requires OpenSSL with Ed25519 support (1.1.1+).
509              
510             =item * The module handles all data as bytes. Encode UTF-8 strings before
511             passing them.
512              
513             =back
514              
515             =head1 ENVIRONMENT
516              
517             =over
518              
519             =item TEST_NATS_HOST, TEST_NATS_PORT
520              
521             Set these to run the test suite against a NATS server
522             (default: 127.0.0.1:4222).
523              
524             =back
525              
526             =head1 SEE ALSO
527              
528             L, L,
529             L
530              
531             =head1 AUTHOR
532              
533             vividsnow
534              
535             =head1 LICENSE
536              
537             This library is free software; you can redistribute it and/or modify it
538             under the same terms as Perl itself.
539              
540             =cut