File Coverage

blib/lib/Net/Async/NATS.pm
Criterion Covered Total %
statement 134 245 54.6
branch 35 116 30.1
condition 19 49 38.7
subroutine 27 41 65.8
pod 15 15 100.0
total 230 466 49.3


line stmt bran cond sub pod time code
1             package Net::Async::NATS;
2             # ABSTRACT: Async NATS client for IO::Async
3             our $VERSION = '0.002';
4 4     4   1370114 use strict;
  4         12  
  4         188  
5 4     4   42 use warnings;
  4         18  
  4         314  
6 4     4   1615 use parent 'IO::Async::Notifier';
  4         1143  
  4         27  
7              
8 4     4   74996 use Carp qw(croak);
  4         12  
  4         289  
9 4     4   27 use Future;
  4         9  
  4         122  
10 4     4   2592 use Future::AsyncAwait;
  4         19758  
  4         28  
11 4     4   3334 use IO::Async::Stream;
  4         333079  
  4         417  
12 4     4   2824 use JSON::MaybeXS qw(encode_json decode_json);
  4         40526  
  4         536  
13 4     4   39 use Scalar::Util qw(weaken);
  4         8  
  4         241  
14              
15 4     4   2602 use Net::Async::NATS::Subscription;
  4         12  
  4         28718  
16              
17              
18             sub configure {
19 2     2 1 938189 my ($self, %params) = @_;
20              
21 2         26 for my $key (qw(host port name user pass auth_token
22             verbose pedantic reconnect max_reconnect_attempts
23             reconnect_wait on_disconnect on_error on_connect)) {
24 28 100       96 $self->{$key} = delete $params{$key} if exists $params{$key};
25             }
26              
27             # Defaults
28 2   50     10 $self->{host} //= 'localhost';
29 2   50     12 $self->{port} //= 4222;
30 2   50     10 $self->{name} //= 'net-async-nats-perl';
31 2   50     16 $self->{verbose} //= 0;
32 2   50     14 $self->{pedantic} //= 0;
33 2   100     11 $self->{reconnect} //= 1;
34 2   50     12 $self->{max_reconnect_attempts} //= 10;
35 2   50     14 $self->{reconnect_wait} //= 2;
36              
37             # Internal state
38 2   50     14 $self->{_sid_counter} //= 0;
39 2   50     15 $self->{_subscriptions} //= {};
40 2   50     16 $self->{_pending} //= [];
41 2   50     15 $self->{_connected} //= 0;
42 2   50     14 $self->{_server_info} //= {};
43 2   50     14 $self->{_ping_future} //= undef;
44 2   50     12 $self->{_connect_future} //= undef;
45 2   33     18 $self->{_inbox_prefix} //= '_INBOX.' . _random_id();
46              
47 2         28 $self->SUPER::configure(%params);
48             }
49              
50              
51 1     1 1 148 sub host { $_[0]->{host} }
52 1     1 1 7 sub port { $_[0]->{port} }
53 1     1 1 6 sub name { $_[0]->{name} }
54 1     1 1 7 sub verbose { $_[0]->{verbose} }
55 1     1 1 7 sub pedantic { $_[0]->{pedantic} }
56              
57              
58 4     4 1 32 sub server_info { $_[0]->{_server_info} }
59 2     2 1 673 sub is_connected { $_[0]->{_connected} }
60              
61              
62 1     1 1 143 async sub connect {
63 1         2 my ($self) = @_;
64              
65 1 50       6 croak "Already connected" if $self->{_connected};
66              
67 1         4 $self->{_connect_future} = $self->loop->new_future;
68              
69             my $stream = IO::Async::Stream->new(
70             handle => undef,
71 1     1   18694 on_read => sub { $self->_on_read(@_) },
72 0     0   0 on_read_eof => sub { $self->_on_disconnect('read_eof') },
73 0     0   0 on_write_eof => sub { $self->_on_disconnect('write_eof') },
74 0     0   0 on_read_error => sub { $self->_on_error("Read error: $_[1]") },
75 0     0   0 on_write_error => sub { $self->_on_error("Write error: $_[1]") },
76 1         78 );
77 1         263 $self->{_stream} = $stream;
78 1         9 $self->add_child($stream);
79              
80             # Retain the TCP connect future — if GC'd before resolution, the
81             # stream never gets its handle and on_read never fires.
82             $self->{_tcp_connect_future} = $stream->connect(
83             host => $self->{host},
84             service => $self->{port},
85             )->on_fail(sub {
86 0     0   0 my $err = shift;
87             $self->{_connect_future}->fail("Connection failed: $err")
88 0 0       0 unless $self->{_connect_future}->is_ready;
89 1         158 });
90              
91 1         52017 return await $self->{_connect_future};
92             }
93              
94              
95 0     0 1 0 async sub publish {
96 0         0 my ($self, $subject, $payload, %opts) = @_;
97              
98 0 0       0 croak "Not connected" unless $self->{_connected};
99 0 0 0     0 croak "Subject required" unless defined $subject && length $subject;
100              
101 0   0     0 $payload //= '';
102 0         0 my $bytes = length($payload);
103 0         0 my $reply = $opts{reply_to};
104              
105 0 0       0 my $cmd = defined $reply
106             ? "PUB $subject $reply $bytes\r\n$payload\r\n"
107             : "PUB $subject $bytes\r\n$payload\r\n";
108              
109 0         0 $self->_write($cmd);
110 0         0 return;
111             }
112              
113              
114 0     0 1 0 async sub subscribe {
115 0         0 my ($self, $subject, $callback, %opts) = @_;
116              
117 0 0       0 croak "Not connected" unless $self->{_connected};
118 0 0 0     0 croak "Subject required" unless defined $subject && length $subject;
119 0 0       0 croak "Callback required" unless ref $callback eq 'CODE';
120              
121 0         0 my $sid = ++$self->{_sid_counter};
122 0         0 my $queue = $opts{queue};
123              
124 0         0 my $sub = Net::Async::NATS::Subscription->new(
125             sid => $sid,
126             subject => $subject,
127             queue => $queue,
128             callback => $callback,
129             );
130 0         0 $self->{_subscriptions}{$sid} = $sub;
131              
132 0 0       0 my $cmd = defined $queue
133             ? "SUB $subject $queue $sid\r\n"
134             : "SUB $subject $sid\r\n";
135              
136 0         0 $self->_write($cmd);
137 0         0 return $sub;
138             }
139              
140              
141 0     0 1 0 async sub unsubscribe {
142 0         0 my ($self, $sub, %opts) = @_;
143              
144 0 0       0 croak "Not connected" unless $self->{_connected};
145              
146 0 0       0 my $sid = ref $sub ? $sub->sid : $sub;
147 0         0 my $max = $opts{max_msgs};
148              
149 0 0       0 my $cmd = defined $max
150             ? "UNSUB $sid $max\r\n"
151             : "UNSUB $sid\r\n";
152              
153 0         0 $self->_write($cmd);
154 0 0       0 delete $self->{_subscriptions}{$sid} unless defined $max;
155 0         0 return;
156             }
157              
158              
159 0     0 1 0 async sub request {
160 0         0 my ($self, $subject, $payload, %opts) = @_;
161              
162 0 0       0 croak "Not connected" unless $self->{_connected};
163              
164 0   0     0 my $timeout = $opts{timeout} // 30;
165 0         0 my $inbox = $self->{_inbox_prefix} . '.' . _random_id();
166 0         0 my $f = $self->loop->new_future;
167              
168             my $sub = await $self->subscribe($inbox, sub {
169 0     0   0 my ($subj, $data, $reply) = @_;
170 0 0       0 $f->done($data, $subj) unless $f->is_ready;
171 0         0 });
172              
173             # Auto-unsubscribe after 1 message
174 0         0 await $self->unsubscribe($sub, max_msgs => 1);
175              
176             # Publish with reply-to
177 0         0 await $self->publish($subject, $payload, reply_to => $inbox);
178              
179             # Apply timeout
180 0         0 my $timer = $self->loop->delay_future(after => $timeout);
181 0         0 my $result = await Future->wait_any($f, $timer->then_fail('Request timed out'));
182              
183 0         0 return $result;
184             }
185              
186              
187 0     0 1 0 async sub ping {
188 0         0 my ($self) = @_;
189 0 0       0 croak "Not connected" unless $self->{_connected};
190              
191 0         0 $self->{_ping_future} = $self->loop->new_future;
192 0         0 $self->_write("PING\r\n");
193 0         0 return await $self->{_ping_future};
194             }
195              
196              
197 1     1 1 3 async sub disconnect {
198 1         3 my ($self) = @_;
199 1 50       4 return unless $self->{_connected};
200              
201 1         3 $self->{_connected} = 0;
202 1         21 $self->{reconnect} = 0; # suppress auto-reconnect
203 1         17 $self->{_subscriptions} = {};
204              
205 1 50       6 if (my $stream = $self->{_stream}) {
206 1         7 $stream->close_when_empty;
207             }
208 1         47 return;
209             }
210              
211             # ── Wire protocol parsing ────────────────────────────────
212              
213             sub _on_read {
214 10     10   10207 my ($self, $stream, $buffref, $eof) = @_;
215              
216 10         165 while ($$buffref =~ s/\A([^\r\n]*)\r\n//) {
217 11         70 my $line = $1;
218              
219             # MSG [reply-to] <#bytes>
220 11 100       74 if ($line =~ /\AMSG\s+(\S+)\s+(\S+)\s+(?:(\S+)\s+)?(\d+)\z/i) {
221 5         30 my ($subject, $sid, $reply_to, $bytes) = ($1, $2, $3, $4);
222              
223             # Need to read payload + trailing CRLF
224 5 50       23 if (length($$buffref) < $bytes + 2) {
225             # Put line back, wait for more data
226 0 0       0 $$buffref = "MSG $subject $sid "
227             . (defined $reply_to ? "$reply_to " : '')
228             . "$bytes\r\n$$buffref";
229 0         0 return 0;
230             }
231              
232 5         15 my $payload = substr($$buffref, 0, $bytes, '');
233 5         17 $$buffref =~ s/\A\r\n//; # consume trailing CRLF
234              
235 5         21 $self->_dispatch_msg($subject, $sid, $reply_to, $payload);
236 5         24 next;
237             }
238              
239             # HMSG [reply-to] <#header_bytes> <#total_bytes>
240 6 50       37 if ($line =~ /\AHMSG\s+(\S+)\s+(\S+)\s+(?:(\S+)\s+)?(\d+)\s+(\d+)\z/i) {
241 0         0 my ($subject, $sid, $reply_to, $hdr_bytes, $total_bytes) = ($1, $2, $3, $4, $5);
242              
243 0 0       0 if (length($$buffref) < $total_bytes + 2) {
244 0 0       0 $$buffref = "HMSG $subject $sid "
245             . (defined $reply_to ? "$reply_to " : '')
246             . "$hdr_bytes $total_bytes\r\n$$buffref";
247 0         0 return 0;
248             }
249              
250 0         0 my $raw = substr($$buffref, 0, $total_bytes, '');
251 0         0 $$buffref =~ s/\A\r\n//;
252              
253 0         0 my $payload_bytes = $total_bytes - $hdr_bytes;
254 0 0       0 my $payload = $payload_bytes > 0
255             ? substr($raw, $hdr_bytes)
256             : '';
257              
258             # For now, ignore headers — deliver payload only
259 0         0 $self->_dispatch_msg($subject, $sid, $reply_to, $payload);
260 0         0 next;
261             }
262              
263             # INFO {...}
264 6 100       31 if ($line =~ /\AINFO\s+(\{.*\})\s*\z/i) {
265 2   50     7 my $info = eval { decode_json($1) } // {};
  2         107  
266 2         11 $self->{_server_info} = $info;
267 2         12 $self->_handle_info($info);
268 2         13 next;
269             }
270              
271             # PING
272 4 100       20 if ($line =~ /\APING\z/i) {
273 1         7 $self->_write("PONG\r\n");
274 1         9 next;
275             }
276              
277             # PONG
278 3 100       11 if ($line =~ /\APONG\z/i) {
279 1 50       5 if (my $f = delete $self->{_ping_future}) {
280 1 50       12 $f->done if !$f->is_ready;
281             }
282 1         90 next;
283             }
284              
285             # +OK
286 2 100       12 next if $line =~ /\A\+OK\z/i;
287              
288             # -ERR
289 1 50       11 if ($line =~ /\A-ERR\s+'?(.+?)'?\z/i) {
290 1         9 $self->_on_error($1);
291 1         8 next;
292             }
293             }
294              
295 10         33 return 0;
296             }
297              
298             sub _handle_info {
299 2     2   8 my ($self, $info) = @_;
300              
301             # If we haven't sent CONNECT yet, do it now
302 2 50       14 return if $self->{_connected};
303              
304             my %connect = (
305             verbose => $self->{verbose} ? \1 : \0,
306             pedantic => $self->{pedantic} ? \1 : \0,
307             tls_required => \0,
308             lang => 'perl',
309             version => ($Net::Async::NATS::VERSION // '0.001'),
310             name => $self->{name},
311 2 50 50     45 protocol => 1,
    50          
312             echo => \1,
313             headers => \1,
314             no_responders => \1,
315             );
316              
317 2 50       9 if (defined $self->{auth_token}) {
318 0         0 $connect{auth_token} = $self->{auth_token};
319             }
320 2 50       8 if (defined $self->{user}) {
321 0         0 $connect{user} = $self->{user};
322 0   0     0 $connect{pass} = $self->{pass} // '';
323             }
324              
325 2         36 my $json = encode_json(\%connect);
326 2         17 $self->_write("CONNECT $json\r\n");
327              
328             # Send initial PING to verify connection
329 2         304 $self->_write("PING\r\n");
330              
331 2         169 $self->{_connected} = 1;
332 2         11 delete $self->{_tcp_connect_future}; # no longer needed
333              
334 2 100       12 if (my $f = delete $self->{_connect_future}) {
335 1 50       7 $f->done($info) unless $f->is_ready;
336             }
337              
338 2 50       533 if (my $cb = $self->{on_connect}) {
339 0         0 $cb->($self, $info);
340             }
341             }
342              
343             sub _dispatch_msg {
344 5     5   15 my ($self, $subject, $sid, $reply_to, $payload) = @_;
345              
346 5         13 my $sub = $self->{_subscriptions}{$sid};
347 5 50       15 return unless $sub;
348              
349 5         76 $sub->callback->($subject, $payload, $reply_to);
350              
351 5 50       37 if (defined $sub->max_msgs) {
352 0         0 $sub->{_received}++;
353 0 0       0 if ($sub->{_received} >= $sub->max_msgs) {
354 0         0 delete $self->{_subscriptions}{$sid};
355             }
356             }
357             }
358              
359             sub _write {
360 4     4   10 my ($self, $data) = @_;
361 4 100       18 if (my $stream = $self->{_stream}) {
362 2         7 $stream->write($data);
363             }
364             }
365              
366             sub _on_disconnect {
367 0     0   0 my ($self, $reason) = @_;
368 0         0 $self->{_connected} = 0;
369              
370 0 0       0 if (my $cb = $self->{on_disconnect}) {
371 0         0 $cb->($self, $reason);
372             }
373              
374 0 0       0 if ($self->{reconnect}) {
375 0         0 $self->_reconnect;
376             }
377             }
378              
379             sub _on_error {
380 1     1   5 my ($self, $error) = @_;
381 1 50       5 if (my $cb = $self->{on_error}) {
382 1         5 $cb->($self, $error);
383             }
384             }
385              
386             sub _reconnect {
387 0     0   0 my ($self) = @_;
388 0         0 my $attempts = 0;
389 0         0 my $max = $self->{max_reconnect_attempts};
390 0         0 my $wait = $self->{reconnect_wait};
391              
392 0         0 weaken(my $weak_self = $self);
393              
394 0         0 my $try; $try = sub {
395 0 0   0   0 my $self = $weak_self or return;
396 0 0       0 return if $self->{_connected};
397 0 0       0 return if ++$attempts > $max;
398              
399             $self->loop->delay_future(after => $wait)->on_done(sub {
400 0 0       0 my $self = $weak_self or return;
401 0 0       0 return if $self->{_connected};
402              
403             # Remove old stream
404 0 0       0 if (my $old = delete $self->{_stream}) {
405 0 0       0 $self->remove_child($old) if $old->parent;
406             }
407              
408             # Save subscriptions for replay
409 0         0 my %saved_subs = %{ $self->{_subscriptions} };
  0         0  
410              
411 0         0 eval {
412 0         0 my $f = $self->connect;
413             $f->on_done(sub {
414 0 0       0 my $self = $weak_self or return;
415             # Replay subscriptions
416 0         0 for my $sub (values %saved_subs) {
417 0 0       0 my $cmd = defined $sub->queue
418             ? "SUB " . $sub->subject . " " . $sub->queue . " " . $sub->sid . "\r\n"
419             : "SUB " . $sub->subject . " " . $sub->sid . "\r\n";
420 0         0 $self->_write($cmd);
421 0         0 $self->{_subscriptions}{$sub->sid} = $sub;
422             }
423 0         0 });
424 0         0 $f->on_fail(sub { $try->() });
  0         0  
425             };
426 0         0 });
427 0         0 };
428              
429 0         0 $try->();
430             }
431              
432             sub _random_id {
433 2     2   7 my $bytes = '';
434 2 50       129 if (open my $fh, '<:raw', '/dev/urandom') {
435 2         221 read $fh, $bytes, 12;
436 2         34 close $fh;
437             } else {
438 0         0 $bytes = pack('N3', rand(2**32), rand(2**32), rand(2**32));
439             }
440 2         33 return unpack('H*', $bytes);
441             }
442              
443              
444             1;
445              
446             __END__