| line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
|
1
|
|
|
|
|
|
|
package EV::Nats; |
|
2
|
14
|
|
|
14
|
|
1687986
|
use strict; |
|
|
14
|
|
|
|
|
19
|
|
|
|
14
|
|
|
|
|
418
|
|
|
3
|
14
|
|
|
14
|
|
63
|
use warnings; |
|
|
14
|
|
|
|
|
19
|
|
|
|
14
|
|
|
|
|
1713
|
|
|
4
|
14
|
|
|
14
|
|
64
|
use EV; |
|
|
14
|
|
|
|
|
35
|
|
|
|
14
|
|
|
|
|
389
|
|
|
5
|
|
|
|
|
|
|
|
|
6
|
|
|
|
|
|
|
# Bootstrap the compiled (XS) part of the module. Being a BEGIN block, this
# runs at compile time, i.e. before the statements further down this file
# are executed.
BEGIN {
    use XSLoader;

    # Declared here because the value is handed to XSLoader::load below.
    our $VERSION = '0.01';

    XSLoader::load(__PACKAGE__, $VERSION);
}
|
11
|
|
|
|
|
|
|
|
|
12
|
|
|
|
|
|
|
# Short method aliases. Each glob assignment makes the short name resolve to
# the same code reference as the long-named method on its right-hand side.
*pub   = \&publish;
*hpub  = \&hpublish;
*sub   = \&subscribe;
*unsub = \&unsubscribe;
*req   = \&request;
|
17
|
|
|
|
|
|
|
|
|
18
|
|
|
|
|
|
|
# creds_file($path)
#
# Read a NATS credentials file and apply what it contains to this object.
#
# The whole file is slurped and searched for the two armored sections that
# the patterns below match:
#
#     -----BEGIN NATS USER JWT-----
#     <jwt>
#     ...
#     -----BEGIN USER NKEY SEED-----
#     <seed>
#     ...
#
# The first whitespace-free token after each BEGIN marker is handed to
# jwt() and nkey_seed() respectively. Either section may be missing on its
# own; a file that contains neither is treated as an error instead of being
# silently accepted.
#
# Parameters:
#   $path - path of the credentials file (required).
#
# Returns $self, so the call can be chained.
#
# Dies when $path is missing, when the file cannot be opened, or when no
# JWT and no NKey seed section is found in it.
sub creds_file {
    my ($self, $path) = @_;

    die "creds file path is required"
        unless defined $path && length $path;

    open my $fh, '<', $path
        or die "cannot open creds file $path: $!";
    my $content = do { local $/; <$fh> };
    close $fh;
    $content = '' unless defined $content;

    my $found = 0;

    # Captures are copied into lexicals before the method calls: $1 is a
    # magical global that is aliased through @_, so a regex executed inside
    # the callee could otherwise change the value it is looking at.
    if (my ($jwt) = $content =~ /-----BEGIN NATS USER JWT-----\s*\n\s*(\S+)\s*\n/) {
        $self->jwt($jwt);
        $found++;
    }
    if (my ($seed) = $content =~ /-----BEGIN USER NKEY SEED-----\s*\n\s*(\S+)\s*\n/) {
        $self->nkey_seed($seed);
        $found++;
    }

    die "no JWT or NKey seed found in creds file $path"
        unless $found;

    return $self;
}
|
33
|
|
|
|
|
|
|
|
|
34
|
|
|
|
|
|
|
# subscribe_max($subject, $cb, $max_msgs, [$queue_group])
#
# Subscribe and, in the same call, arm auto-unsubscribe.
#
# Calls subscribe() with $subject and $cb, adding $queue_group only when it
# is defined. If $max_msgs is true, the resulting subscription id is then
# passed to unsubscribe() together with $max_msgs; a false $max_msgs
# (undef or 0) leaves the subscription without a limit.
#
# Returns whatever subscribe() returned (the subscription id).
sub subscribe_max {
    my ($self, $subject, $cb, $max_msgs, $queue_group) = @_;

    my @subscribe_args = ($subject, $cb);
    push @subscribe_args, $queue_group if defined $queue_group;

    my $sid = $self->subscribe(@subscribe_args);

    if ($max_msgs) {
        $self->unsubscribe($sid, $max_msgs);
    }

    return $sid;
}
|
42
|
|
|
|
|
|
|
|
|
43
|
|
|
|
|
|
|
1; |
|
44
|
|
|
|
|
|
|
|
|
45
|
|
|
|
|
|
|
=head1 NAME |
|
46
|
|
|
|
|
|
|
|
|
47
|
|
|
|
|
|
|
EV::Nats - High-performance asynchronous NATS client using EV |
|
48
|
|
|
|
|
|
|
|
|
49
|
|
|
|
|
|
|
=head1 SYNOPSIS |
|
50
|
|
|
|
|
|
|
|
|
51
|
|
|
|
|
|
|
use EV::Nats; |
|
52
|
|
|
|
|
|
|
|
|
53
|
|
|
|
|
|
|
my $nats = EV::Nats->new( |
|
54
|
|
|
|
|
|
|
host => '127.0.0.1', |
|
55
|
|
|
|
|
|
|
port => 4222, |
|
56
|
|
|
|
|
|
|
on_error => sub { warn "nats error: @_" }, |
|
57
|
|
|
|
|
|
|
on_connect => sub { warn "connected to NATS" }, |
|
58
|
|
|
|
|
|
|
); |
|
59
|
|
|
|
|
|
|
|
|
60
|
|
|
|
|
|
|
# Subscribe |
|
61
|
|
|
|
|
|
|
my $sid = $nats->subscribe('foo.>', sub { |
|
62
|
|
|
|
|
|
|
my ($subject, $payload, $reply) = @_; |
|
63
|
|
|
|
|
|
|
print "[$subject] $payload\n"; |
|
64
|
|
|
|
|
|
|
}); |
|
65
|
|
|
|
|
|
|
|
|
66
|
|
|
|
|
|
|
# Subscribe with queue group |
|
67
|
|
|
|
|
|
|
$nats->subscribe('worker.>', sub { |
|
68
|
|
|
|
|
|
|
my ($subject, $payload, $reply) = @_; |
|
69
|
|
|
|
|
|
|
}, 'workers'); |
|
70
|
|
|
|
|
|
|
|
|
71
|
|
|
|
|
|
|
# Publish |
|
72
|
|
|
|
|
|
|
$nats->publish('foo.bar', 'hello world'); |
|
73
|
|
|
|
|
|
|
|
|
74
|
|
|
|
|
|
|
# Request/reply |
|
75
|
|
|
|
|
|
|
$nats->request('service.echo', 'ping', sub { |
|
76
|
|
|
|
|
|
|
my ($response, $err) = @_; |
|
77
|
|
|
|
|
|
|
die $err if $err; |
|
78
|
|
|
|
|
|
|
print "reply: $response\n"; |
|
79
|
|
|
|
|
|
|
}, 5000); # 5s timeout |
|
80
|
|
|
|
|
|
|
|
|
81
|
|
|
|
|
|
|
# Unsubscribe |
|
82
|
|
|
|
|
|
|
$nats->unsubscribe($sid); |
|
83
|
|
|
|
|
|
|
|
|
84
|
|
|
|
|
|
|
EV::run; |
|
85
|
|
|
|
|
|
|
|
|
86
|
|
|
|
|
|
|
=head1 DESCRIPTION |
|
87
|
|
|
|
|
|
|
|
|
88
|
|
|
|
|
|
|
EV::Nats is a high-performance asynchronous NATS client that implements |
|
89
|
|
|
|
|
|
|
the NATS client protocol in pure XS with L<EV> event loop integration. |
|
90
|
|
|
|
|
|
|
No external C NATS library is required. |
|
91
|
|
|
|
|
|
|
|
|
92
|
|
|
|
|
|
|
Features: |
|
93
|
|
|
|
|
|
|
|
|
94
|
|
|
|
|
|
|
=over |
|
95
|
|
|
|
|
|
|
|
|
96
|
|
|
|
|
|
|
=item * Full NATS client protocol (PUB, SUB, UNSUB, MSG, HMSG) |
|
97
|
|
|
|
|
|
|
|
|
98
|
|
|
|
|
|
|
=item * Request/reply with automatic inbox management |
|
99
|
|
|
|
|
|
|
|
|
100
|
|
|
|
|
|
|
=item * Queue group subscriptions for load balancing |
|
101
|
|
|
|
|
|
|
|
|
102
|
|
|
|
|
|
|
=item * Wildcard subjects (C<*> and C<< > >>) |
|
103
|
|
|
|
|
|
|
|
|
104
|
|
|
|
|
|
|
=item * Headers support (HPUB/HMSG) |
|
105
|
|
|
|
|
|
|
|
|
106
|
|
|
|
|
|
|
=item * Automatic PING/PONG keep-alive |
|
107
|
|
|
|
|
|
|
|
|
108
|
|
|
|
|
|
|
=item * Automatic reconnection with subscription and queue group restore |
|
109
|
|
|
|
|
|
|
|
|
110
|
|
|
|
|
|
|
=item * Fire-and-forget publish (no callback overhead) |
|
111
|
|
|
|
|
|
|
|
|
112
|
|
|
|
|
|
|
=item * Token, user/pass authentication |
|
113
|
|
|
|
|
|
|
|
|
114
|
|
|
|
|
|
|
=item * TCP keepalive and connect timeout |
|
115
|
|
|
|
|
|
|
|
|
116
|
|
|
|
|
|
|
=item * Write coalescing via ev_prepare (batches writes per event loop iteration) |
|
117
|
|
|
|
|
|
|
|
|
118
|
|
|
|
|
|
|
=item * O(1) subscription lookup via hash table |
|
119
|
|
|
|
|
|
|
|
|
120
|
|
|
|
|
|
|
=item * Graceful drain (unsubscribe all, flush, then disconnect) |
|
121
|
|
|
|
|
|
|
|
|
122
|
|
|
|
|
|
|
=item * Server pool with cluster URL failover from INFO connect_urls |
|
123
|
|
|
|
|
|
|
|
|
124
|
|
|
|
|
|
|
=item * Optional TLS via OpenSSL (auto-detected at build time) |
|
125
|
|
|
|
|
|
|
|
|
126
|
|
|
|
|
|
|
=item * Reconnect jitter to prevent thundering herd |
|
127
|
|
|
|
|
|
|
|
|
128
|
|
|
|
|
|
|
=item * Per-connection stats counters (msgs/bytes in/out) |
|
129
|
|
|
|
|
|
|
|
|
130
|
|
|
|
|
|
|
=item * JetStream API (L) |
|
131
|
|
|
|
|
|
|
|
|
132
|
|
|
|
|
|
|
=item * Key-Value store (L) |
|
133
|
|
|
|
|
|
|
|
|
134
|
|
|
|
|
|
|
=item * Object store with chunking (L) |
|
135
|
|
|
|
|
|
|
|
|
136
|
|
|
|
|
|
|
=item * NKey/JWT authentication (Ed25519 via OpenSSL) |
|
137
|
|
|
|
|
|
|
|
|
138
|
|
|
|
|
|
|
=item * Slow consumer detection with configurable threshold |
|
139
|
|
|
|
|
|
|
|
|
140
|
|
|
|
|
|
|
=item * Publish batching API (C<batch>) |
|
141
|
|
|
|
|
|
|
|
|
142
|
|
|
|
|
|
|
=item * Lame duck mode (leaf node graceful shutdown) notification |
|
143
|
|
|
|
|
|
|
|
|
144
|
|
|
|
|
|
|
=back |
|
145
|
|
|
|
|
|
|
|
|
146
|
|
|
|
|
|
|
B DNS resolution via C is blocking. Use numeric IP |
|
147
|
|
|
|
|
|
|
addresses for latency-sensitive applications. |
|
148
|
|
|
|
|
|
|
|
|
149
|
|
|
|
|
|
|
=head1 METHODS |
|
150
|
|
|
|
|
|
|
|
|
151
|
|
|
|
|
|
|
=head2 new(%options) |
|
152
|
|
|
|
|
|
|
|
|
153
|
|
|
|
|
|
|
Create a new EV::Nats instance. Connects automatically if C<host> is given. |
|
154
|
|
|
|
|
|
|
|
|
155
|
|
|
|
|
|
|
my $nats = EV::Nats->new( |
|
156
|
|
|
|
|
|
|
host => '127.0.0.1', |
|
157
|
|
|
|
|
|
|
port => 4222, |
|
158
|
|
|
|
|
|
|
on_error => sub { die @_ }, |
|
159
|
|
|
|
|
|
|
); |
|
160
|
|
|
|
|
|
|
|
|
161
|
|
|
|
|
|
|
Options: |
|
162
|
|
|
|
|
|
|
|
|
163
|
|
|
|
|
|
|
=over |
|
164
|
|
|
|
|
|
|
|
|
165
|
|
|
|
|
|
|
=item host => 'Str' |
|
166
|
|
|
|
|
|
|
|
|
167
|
|
|
|
|
|
|
=item port => 'Int' (default 4222) |
|
168
|
|
|
|
|
|
|
|
|
169
|
|
|
|
|
|
|
Server hostname and port. If C<host> is provided, connection starts |
|
170
|
|
|
|
|
|
|
immediately. |
|
171
|
|
|
|
|
|
|
|
|
172
|
|
|
|
|
|
|
=item on_error => $cb->($errstr) |
|
173
|
|
|
|
|
|
|
|
|
174
|
|
|
|
|
|
|
Error callback. Default: C. |
|
175
|
|
|
|
|
|
|
|
|
176
|
|
|
|
|
|
|
=item on_connect => $cb->() |
|
177
|
|
|
|
|
|
|
|
|
178
|
|
|
|
|
|
|
Called when connection is fully established (after CONNECT/PONG handshake). |
|
179
|
|
|
|
|
|
|
|
|
180
|
|
|
|
|
|
|
=item on_disconnect => $cb->() |
|
181
|
|
|
|
|
|
|
|
|
182
|
|
|
|
|
|
|
Called on disconnect. |
|
183
|
|
|
|
|
|
|
|
|
184
|
|
|
|
|
|
|
=item user => 'Str' |
|
185
|
|
|
|
|
|
|
|
|
186
|
|
|
|
|
|
|
=item pass => 'Str' |
|
187
|
|
|
|
|
|
|
|
|
188
|
|
|
|
|
|
|
Username/password authentication. Values are JSON-escaped in the |
|
189
|
|
|
|
|
|
|
CONNECT command. |
|
190
|
|
|
|
|
|
|
|
|
191
|
|
|
|
|
|
|
=item token => 'Str' |
|
192
|
|
|
|
|
|
|
|
|
193
|
|
|
|
|
|
|
Token authentication. |
|
194
|
|
|
|
|
|
|
|
|
195
|
|
|
|
|
|
|
=item name => 'Str' |
|
196
|
|
|
|
|
|
|
|
|
197
|
|
|
|
|
|
|
Client name sent in CONNECT. |
|
198
|
|
|
|
|
|
|
|
|
199
|
|
|
|
|
|
|
=item verbose => $bool (default 0) |
|
200
|
|
|
|
|
|
|
|
|
201
|
|
|
|
|
|
|
Request +OK acknowledgments from server. |
|
202
|
|
|
|
|
|
|
|
|
203
|
|
|
|
|
|
|
=item pedantic => $bool (default 0) |
|
204
|
|
|
|
|
|
|
|
|
205
|
|
|
|
|
|
|
Enable strict subject checking. |
|
206
|
|
|
|
|
|
|
|
|
207
|
|
|
|
|
|
|
=item echo => $bool (default 1) |
|
208
|
|
|
|
|
|
|
|
|
209
|
|
|
|
|
|
|
Receive messages published by this client. |
|
210
|
|
|
|
|
|
|
|
|
211
|
|
|
|
|
|
|
=item no_responders => $bool (default 0) |
|
212
|
|
|
|
|
|
|
|
|
213
|
|
|
|
|
|
|
Enable no-responders notification for requests. |
|
214
|
|
|
|
|
|
|
|
|
215
|
|
|
|
|
|
|
=item reconnect => $bool (default 0) |
|
216
|
|
|
|
|
|
|
|
|
217
|
|
|
|
|
|
|
Enable automatic reconnection. |
|
218
|
|
|
|
|
|
|
|
|
219
|
|
|
|
|
|
|
=item reconnect_delay => $ms (default 2000) |
|
220
|
|
|
|
|
|
|
|
|
221
|
|
|
|
|
|
|
Delay between reconnect attempts. |
|
222
|
|
|
|
|
|
|
|
|
223
|
|
|
|
|
|
|
=item max_reconnect_attempts => $num (default 60) |
|
224
|
|
|
|
|
|
|
|
|
225
|
|
|
|
|
|
|
Maximum reconnect attempts. 0 = unlimited. |
|
226
|
|
|
|
|
|
|
|
|
227
|
|
|
|
|
|
|
=item connect_timeout => $ms |
|
228
|
|
|
|
|
|
|
|
|
229
|
|
|
|
|
|
|
Connection timeout. 0 = no timeout. |
|
230
|
|
|
|
|
|
|
|
|
231
|
|
|
|
|
|
|
=item ping_interval => $ms (default 120000) |
|
232
|
|
|
|
|
|
|
|
|
233
|
|
|
|
|
|
|
Interval for client-initiated PING. 0 = disabled. |
|
234
|
|
|
|
|
|
|
|
|
235
|
|
|
|
|
|
|
=item max_pings_outstanding => $num (default 2) |
|
236
|
|
|
|
|
|
|
|
|
237
|
|
|
|
|
|
|
Max unanswered PINGs before declaring stale connection. |
|
238
|
|
|
|
|
|
|
|
|
239
|
|
|
|
|
|
|
=item priority => $num (-2 to +2) |
|
240
|
|
|
|
|
|
|
|
|
241
|
|
|
|
|
|
|
EV watcher priority. |
|
242
|
|
|
|
|
|
|
|
|
243
|
|
|
|
|
|
|
=item keepalive => $seconds |
|
244
|
|
|
|
|
|
|
|
|
245
|
|
|
|
|
|
|
TCP keepalive interval. |
|
246
|
|
|
|
|
|
|
|
|
247
|
|
|
|
|
|
|
=item path => 'Str' |
|
248
|
|
|
|
|
|
|
|
|
249
|
|
|
|
|
|
|
Unix socket path. Mutually exclusive with C<host>. |
|
250
|
|
|
|
|
|
|
|
|
251
|
|
|
|
|
|
|
=item loop => EV::Loop |
|
252
|
|
|
|
|
|
|
|
|
253
|
|
|
|
|
|
|
EV loop to use. Default: C. |
|
254
|
|
|
|
|
|
|
|
|
255
|
|
|
|
|
|
|
=back |
|
256
|
|
|
|
|
|
|
|
|
257
|
|
|
|
|
|
|
=head2 connect($host, [$port]) |
|
258
|
|
|
|
|
|
|
|
|
259
|
|
|
|
|
|
|
Connect to NATS server. Port defaults to 4222. |
|
260
|
|
|
|
|
|
|
|
|
261
|
|
|
|
|
|
|
=head2 connect_unix($path) |
|
262
|
|
|
|
|
|
|
|
|
263
|
|
|
|
|
|
|
Connect via Unix domain socket. |
|
264
|
|
|
|
|
|
|
|
|
265
|
|
|
|
|
|
|
=head2 disconnect |
|
266
|
|
|
|
|
|
|
|
|
267
|
|
|
|
|
|
|
Graceful disconnect. |
|
268
|
|
|
|
|
|
|
|
|
269
|
|
|
|
|
|
|
=head2 is_connected |
|
270
|
|
|
|
|
|
|
|
|
271
|
|
|
|
|
|
|
Returns true if connected. |
|
272
|
|
|
|
|
|
|
|
|
273
|
|
|
|
|
|
|
=head2 publish($subject, [$payload], [$reply_to]) |
|
274
|
|
|
|
|
|
|
|
|
275
|
|
|
|
|
|
|
Publish a message. Alias: C<pub>. |
|
276
|
|
|
|
|
|
|
|
|
277
|
|
|
|
|
|
|
$nats->publish('foo', 'hello'); |
|
278
|
|
|
|
|
|
|
$nats->publish('foo', 'hello', 'reply.subject'); |
|
279
|
|
|
|
|
|
|
|
|
280
|
|
|
|
|
|
|
=head2 hpublish($subject, $headers, [$payload], [$reply_to]) |
|
281
|
|
|
|
|
|
|
|
|
282
|
|
|
|
|
|
|
Publish with headers. Alias: C<hpub>. |
|
283
|
|
|
|
|
|
|
|
|
284
|
|
|
|
|
|
|
$nats->hpublish('foo', "NATS/1.0\r\nX-Key: val\r\n\r\n", 'body'); |
|
285
|
|
|
|
|
|
|
|
|
286
|
|
|
|
|
|
|
=head2 subscribe($subject, $cb, [$queue_group]) |
|
287
|
|
|
|
|
|
|
|
|
288
|
|
|
|
|
|
|
Subscribe to a subject. Returns subscription ID. Alias: C<sub>. |
|
289
|
|
|
|
|
|
|
|
|
290
|
|
|
|
|
|
|
my $sid = $nats->subscribe('foo.*', sub { |
|
291
|
|
|
|
|
|
|
my ($subject, $payload, $reply, $headers) = @_; |
|
292
|
|
|
|
|
|
|
}); |
|
293
|
|
|
|
|
|
|
|
|
294
|
|
|
|
|
|
|
Queue groups are preserved across reconnects. |
|
295
|
|
|
|
|
|
|
|
|
296
|
|
|
|
|
|
|
Callback receives: |
|
297
|
|
|
|
|
|
|
|
|
298
|
|
|
|
|
|
|
=over |
|
299
|
|
|
|
|
|
|
|
|
300
|
|
|
|
|
|
|
=item C<$subject> - actual subject the message was published to |
|
301
|
|
|
|
|
|
|
|
|
302
|
|
|
|
|
|
|
=item C<$payload> - message body |
|
303
|
|
|
|
|
|
|
|
|
304
|
|
|
|
|
|
|
=item C<$reply> - reply-to subject (undef if none) |
|
305
|
|
|
|
|
|
|
|
|
306
|
|
|
|
|
|
|
=item C<$headers> - raw headers string (only for HMSG) |
|
307
|
|
|
|
|
|
|
|
|
308
|
|
|
|
|
|
|
=back |
|
309
|
|
|
|
|
|
|
|
|
310
|
|
|
|
|
|
|
=head2 subscribe_max($subject, $cb, $max_msgs, [$queue_group]) |
|
311
|
|
|
|
|
|
|
|
|
312
|
|
|
|
|
|
|
Subscribe and auto-unsubscribe after C<$max_msgs> messages in one call. |
|
313
|
|
|
|
|
|
|
|
|
314
|
|
|
|
|
|
|
=head2 unsubscribe($sid, [$max_msgs]) |
|
315
|
|
|
|
|
|
|
|
|
316
|
|
|
|
|
|
|
Unsubscribe. With C<$max_msgs>, auto-unsubscribes after receiving that many |
|
317
|
|
|
|
|
|
|
messages. Auto-unsub state is restored on reconnect. Alias: C<unsub>. |
|
318
|
|
|
|
|
|
|
|
|
319
|
|
|
|
|
|
|
=head2 request($subject, $payload, $cb, [$timeout_ms]) |
|
320
|
|
|
|
|
|
|
|
|
321
|
|
|
|
|
|
|
Request/reply. Uses automatic inbox subscription. Alias: C<req>. |
|
322
|
|
|
|
|
|
|
|
|
323
|
|
|
|
|
|
|
$nats->request('service', 'data', sub { |
|
324
|
|
|
|
|
|
|
my ($response, $err) = @_; |
|
325
|
|
|
|
|
|
|
die $err if $err; |
|
326
|
|
|
|
|
|
|
print "got: $response\n"; |
|
327
|
|
|
|
|
|
|
}, 5000); |
|
328
|
|
|
|
|
|
|
|
|
329
|
|
|
|
|
|
|
Callback receives C<($response, $error)>. Error is set on timeout |
|
330
|
|
|
|
|
|
|
("request timeout") or no responders ("no responders"). |
|
331
|
|
|
|
|
|
|
|
|
332
|
|
|
|
|
|
|
=head2 drain([$cb]) |
|
333
|
|
|
|
|
|
|
|
|
334
|
|
|
|
|
|
|
Graceful shutdown: sends UNSUB for all subscriptions, flushes pending |
|
335
|
|
|
|
|
|
|
writes with a PING fence, fires C<$cb> when the server confirms with |
|
336
|
|
|
|
|
|
|
PONG, then disconnects. No new messages will be received after drain |
|
337
|
|
|
|
|
|
|
is initiated. |
|
338
|
|
|
|
|
|
|
|
|
339
|
|
|
|
|
|
|
$nats->drain(sub { |
|
340
|
|
|
|
|
|
|
print "drained, safe to exit\n"; |
|
341
|
|
|
|
|
|
|
}); |
|
342
|
|
|
|
|
|
|
|
|
343
|
|
|
|
|
|
|
=head2 ping |
|
344
|
|
|
|
|
|
|
|
|
345
|
|
|
|
|
|
|
Send PING to server. |
|
346
|
|
|
|
|
|
|
|
|
347
|
|
|
|
|
|
|
=head2 flush |
|
348
|
|
|
|
|
|
|
|
|
349
|
|
|
|
|
|
|
Send PING as a write fence; the subsequent PONG guarantees all prior |
|
350
|
|
|
|
|
|
|
messages were processed by the server. |
|
351
|
|
|
|
|
|
|
|
|
352
|
|
|
|
|
|
|
=head2 server_info |
|
353
|
|
|
|
|
|
|
|
|
354
|
|
|
|
|
|
|
Returns raw INFO JSON string from server. |
|
355
|
|
|
|
|
|
|
|
|
356
|
|
|
|
|
|
|
=head2 max_payload([$limit]) |
|
357
|
|
|
|
|
|
|
|
|
358
|
|
|
|
|
|
|
Get/set max payload size. |
|
359
|
|
|
|
|
|
|
|
|
360
|
|
|
|
|
|
|
=head2 waiting_count |
|
361
|
|
|
|
|
|
|
|
|
362
|
|
|
|
|
|
|
Number of writes queued locally (during connect/reconnect). |
|
363
|
|
|
|
|
|
|
|
|
364
|
|
|
|
|
|
|
=head2 skip_waiting |
|
365
|
|
|
|
|
|
|
|
|
366
|
|
|
|
|
|
|
Cancel all waiting writes. |
|
367
|
|
|
|
|
|
|
|
|
368
|
|
|
|
|
|
|
=head2 reconnect($enable, [$delay_ms], [$max_attempts]) |
|
369
|
|
|
|
|
|
|
|
|
370
|
|
|
|
|
|
|
Configure reconnection. |
|
371
|
|
|
|
|
|
|
|
|
372
|
|
|
|
|
|
|
=head2 reconnect_enabled |
|
373
|
|
|
|
|
|
|
|
|
374
|
|
|
|
|
|
|
Returns true if reconnect is enabled. |
|
375
|
|
|
|
|
|
|
|
|
376
|
|
|
|
|
|
|
=head2 connect_timeout([$ms]) |
|
377
|
|
|
|
|
|
|
|
|
378
|
|
|
|
|
|
|
Get/set connect timeout. |
|
379
|
|
|
|
|
|
|
|
|
380
|
|
|
|
|
|
|
=head2 ping_interval([$ms]) |
|
381
|
|
|
|
|
|
|
|
|
382
|
|
|
|
|
|
|
Get/set PING interval. |
|
383
|
|
|
|
|
|
|
|
|
384
|
|
|
|
|
|
|
=head2 max_pings_outstanding([$num]) |
|
385
|
|
|
|
|
|
|
|
|
386
|
|
|
|
|
|
|
Get/set max outstanding PINGs. |
|
387
|
|
|
|
|
|
|
|
|
388
|
|
|
|
|
|
|
=head2 priority([$num]) |
|
389
|
|
|
|
|
|
|
|
|
390
|
|
|
|
|
|
|
Get/set EV watcher priority. |
|
391
|
|
|
|
|
|
|
|
|
392
|
|
|
|
|
|
|
=head2 keepalive([$seconds]) |
|
393
|
|
|
|
|
|
|
|
|
394
|
|
|
|
|
|
|
Get/set TCP keepalive. |
|
395
|
|
|
|
|
|
|
|
|
396
|
|
|
|
|
|
|
=head2 batch($coderef) |
|
397
|
|
|
|
|
|
|
|
|
398
|
|
|
|
|
|
|
Batch multiple publishes into a single write. Suppresses per-publish |
|
399
|
|
|
|
|
|
|
write scheduling; all buffered data is flushed after the coderef returns. |
|
400
|
|
|
|
|
|
|
|
|
401
|
|
|
|
|
|
|
$nats->batch(sub { |
|
402
|
|
|
|
|
|
|
$nats->publish("foo.$_", "msg-$_") for 1..1000; |
|
403
|
|
|
|
|
|
|
}); |
|
404
|
|
|
|
|
|
|
|
|
405
|
|
|
|
|
|
|
=head2 slow_consumer($bytes_threshold, [$cb]) |
|
406
|
|
|
|
|
|
|
|
|
407
|
|
|
|
|
|
|
Enable slow consumer detection. When the write buffer exceeds |
|
408
|
|
|
|
|
|
|
C<$bytes_threshold> bytes, C<$cb> is called with the current buffer size. |
|
409
|
|
|
|
|
|
|
|
|
410
|
|
|
|
|
|
|
$nats->slow_consumer(1024*1024, sub { |
|
411
|
|
|
|
|
|
|
my ($pending_bytes) = @_; |
|
412
|
|
|
|
|
|
|
warn "slow consumer: ${pending_bytes}B pending\n"; |
|
413
|
|
|
|
|
|
|
}); |
|
414
|
|
|
|
|
|
|
|
|
415
|
|
|
|
|
|
|
=head2 on_lame_duck([$cb]) |
|
416
|
|
|
|
|
|
|
|
|
417
|
|
|
|
|
|
|
Get/set callback for lame duck mode. Fired when the server signals |
|
418
|
|
|
|
|
|
|
it's shutting down (leaf node / rolling restart). Use this to migrate |
|
419
|
|
|
|
|
|
|
to another server. |
|
420
|
|
|
|
|
|
|
|
|
421
|
|
|
|
|
|
|
=head2 nkey_seed($seed) |
|
422
|
|
|
|
|
|
|
|
|
423
|
|
|
|
|
|
|
Set NKey seed for Ed25519 authentication (requires OpenSSL at build time). |
|
424
|
|
|
|
|
|
|
The seed is a base32-encoded NATS NKey. The server nonce from INFO is |
|
425
|
|
|
|
|
|
|
automatically signed during CONNECT. |
|
426
|
|
|
|
|
|
|
|
|
427
|
|
|
|
|
|
|
$nats->nkey_seed('SUAM...'); |
|
428
|
|
|
|
|
|
|
|
|
429
|
|
|
|
|
|
|
Or via constructor: C<< nkey_seed => 'SUAM...' >>. |
|
430
|
|
|
|
|
|
|
|
|
431
|
|
|
|
|
|
|
=head2 jwt($token) |
|
432
|
|
|
|
|
|
|
|
|
433
|
|
|
|
|
|
|
Set user JWT for authentication. Combined with C<nkey_seed> for |
|
434
|
|
|
|
|
|
|
NATS decentralized auth. |
|
435
|
|
|
|
|
|
|
|
|
436
|
|
|
|
|
|
|
=head2 tls($enable, [$ca_file], [$skip_verify]) |
|
437
|
|
|
|
|
|
|
|
|
438
|
|
|
|
|
|
|
Configure TLS (requires OpenSSL at build time). |
|
439
|
|
|
|
|
|
|
|
|
440
|
|
|
|
|
|
|
$nats->tls(1); # system CA |
|
441
|
|
|
|
|
|
|
$nats->tls(1, '/path/to/ca.pem'); # custom CA |
|
442
|
|
|
|
|
|
|
$nats->tls(1, undef, 1); # skip verification |
|
443
|
|
|
|
|
|
|
|
|
444
|
|
|
|
|
|
|
Or via constructor: C<< tls => 1, tls_ca_file => $path >>. |
|
445
|
|
|
|
|
|
|
|
|
446
|
|
|
|
|
|
|
=head2 stats |
|
447
|
|
|
|
|
|
|
|
|
448
|
|
|
|
|
|
|
Returns a hash of connection statistics: |
|
449
|
|
|
|
|
|
|
|
|
450
|
|
|
|
|
|
|
my %s = $nats->stats; |
|
451
|
|
|
|
|
|
|
# msgs_in, msgs_out, bytes_in, bytes_out |
|
452
|
|
|
|
|
|
|
|
|
453
|
|
|
|
|
|
|
=head2 reset_stats |
|
454
|
|
|
|
|
|
|
|
|
455
|
|
|
|
|
|
|
Reset all stats counters to zero. |
|
456
|
|
|
|
|
|
|
|
|
457
|
|
|
|
|
|
|
=head2 on_error([$cb]) |
|
458
|
|
|
|
|
|
|
|
|
459
|
|
|
|
|
|
|
=head2 on_connect([$cb]) |
|
460
|
|
|
|
|
|
|
|
|
461
|
|
|
|
|
|
|
=head2 on_disconnect([$cb]) |
|
462
|
|
|
|
|
|
|
|
|
463
|
|
|
|
|
|
|
Get/set handler callbacks. |
|
464
|
|
|
|
|
|
|
|
|
465
|
|
|
|
|
|
|
=head1 BENCHMARKS |
|
466
|
|
|
|
|
|
|
|
|
467
|
|
|
|
|
|
|
Measured on Linux with TCP loopback, Perl 5.40, nats-server 2.12, |
|
468
|
|
|
|
|
|
|
100-byte payloads (C): |
|
469
|
|
|
|
|
|
|
|
|
470
|
|
|
|
|
|
|
100K msgs 200K msgs |
|
471
|
|
|
|
|
|
|
PUB fire-and-forget 4.7M 5.0M msgs/sec |
|
472
|
|
|
|
|
|
|
PUB + SUB (loopback) 1.8M 1.6M msgs/sec |
|
473
|
|
|
|
|
|
|
PUB + SUB (8B payload) 2.2M 1.9M msgs/sec |
|
474
|
|
|
|
|
|
|
REQ/REP (pipelined, 128) 334K msgs/sec |
|
475
|
|
|
|
|
|
|
|
|
476
|
|
|
|
|
|
|
Connected-path publish appends directly to the write buffer with no |
|
477
|
|
|
|
|
|
|
per-message allocation. Write coalescing via C<ev_prepare> batches |
|
478
|
|
|
|
|
|
|
all publishes per event-loop iteration into a single C syscall. |
|
479
|
|
|
|
|
|
|
|
|
480
|
|
|
|
|
|
|
Run C for full results. Set C, |
|
481
|
|
|
|
|
|
|
C, C, C to customize. |
|
482
|
|
|
|
|
|
|
|
|
483
|
|
|
|
|
|
|
=head1 NATS PROTOCOL |
|
484
|
|
|
|
|
|
|
|
|
485
|
|
|
|
|
|
|
This module implements the NATS client protocol directly in XS. |
|
486
|
|
|
|
|
|
|
The protocol is text-based with CRLF-delimited control lines and |
|
487
|
|
|
|
|
|
|
binary payloads. |
|
488
|
|
|
|
|
|
|
|
|
489
|
|
|
|
|
|
|
Connection flow: server sends INFO, client sends CONNECT + PING, |
|
490
|
|
|
|
|
|
|
server responds with PONG to confirm. All subscriptions (including |
|
491
|
|
|
|
|
|
|
queue groups and auto-unsub state) are automatically restored on |
|
492
|
|
|
|
|
|
|
reconnect. |
|
493
|
|
|
|
|
|
|
|
|
494
|
|
|
|
|
|
|
Request/reply uses a single wildcard inbox subscription |
|
495
|
|
|
|
|
|
|
(C<_INBOX.E<lt>randomE<gt>.*>) for all requests, with unique |
|
496
|
|
|
|
|
|
|
suffixes per request. |
|
497
|
|
|
|
|
|
|
|
|
498
|
|
|
|
|
|
|
=head1 CAVEATS |
|
499
|
|
|
|
|
|
|
|
|
500
|
|
|
|
|
|
|
=over |
|
501
|
|
|
|
|
|
|
|
|
502
|
|
|
|
|
|
|
=item * DNS resolution via C is blocking. Use numeric IP |
|
503
|
|
|
|
|
|
|
addresses for latency-sensitive applications. |
|
504
|
|
|
|
|
|
|
|
|
505
|
|
|
|
|
|
|
|
|
506
|
|
|
|
|
|
|
=item * TLS requires OpenSSL headers at build time (auto-detected). |
|
507
|
|
|
|
|
|
|
|
|
508
|
|
|
|
|
|
|
=item * NKey auth requires OpenSSL with Ed25519 support (1.1.1+). |
|
509
|
|
|
|
|
|
|
|
|
510
|
|
|
|
|
|
|
=item * The module handles all data as bytes. Encode UTF-8 strings before |
|
511
|
|
|
|
|
|
|
passing them. |
|
512
|
|
|
|
|
|
|
|
|
513
|
|
|
|
|
|
|
=back |
|
514
|
|
|
|
|
|
|
|
|
515
|
|
|
|
|
|
|
=head1 ENVIRONMENT |
|
516
|
|
|
|
|
|
|
|
|
517
|
|
|
|
|
|
|
=over |
|
518
|
|
|
|
|
|
|
|
|
519
|
|
|
|
|
|
|
=item TEST_NATS_HOST, TEST_NATS_PORT |
|
520
|
|
|
|
|
|
|
|
|
521
|
|
|
|
|
|
|
Set these to run the test suite against a NATS server |
|
522
|
|
|
|
|
|
|
(default: 127.0.0.1:4222). |
|
523
|
|
|
|
|
|
|
|
|
524
|
|
|
|
|
|
|
=back |
|
525
|
|
|
|
|
|
|
|
|
526
|
|
|
|
|
|
|
=head1 SEE ALSO |
|
527
|
|
|
|
|
|
|
|
|
528
|
|
|
|
|
|
|
L, L, |
|
529
|
|
|
|
|
|
|
L |
|
530
|
|
|
|
|
|
|
|
|
531
|
|
|
|
|
|
|
=head1 AUTHOR |
|
532
|
|
|
|
|
|
|
|
|
533
|
|
|
|
|
|
|
vividsnow |
|
534
|
|
|
|
|
|
|
|
|
535
|
|
|
|
|
|
|
=head1 LICENSE |
|
536
|
|
|
|
|
|
|
|
|
537
|
|
|
|
|
|
|
This library is free software; you can redistribute it and/or modify it |
|
538
|
|
|
|
|
|
|
under the same terms as Perl itself. |
|
539
|
|
|
|
|
|
|
|
|
540
|
|
|
|
|
|
|
=cut |