File Coverage

Nats.xs
Criterion Covered Total %
statement 1 1520 0.0
branch 3 1320 0.2
condition n/a
subroutine n/a
pod n/a
total 4 2840 0.1


line stmt bran cond sub pod time code
1             #include "EXTERN.h"
2             #include "perl.h"
3             #include "XSUB.h"
4             #include "ppport.h"
5              
6             #include "EVAPI.h"
7             #include "ngx-queue.h"
8              
9             #include
10             #include
11             #include
12             #include
13             #include
14             #include
15             #include
16             #include
17             #include
18             #include
19              
20             #ifdef HAVE_OPENSSL
21             #include
22             #include
23             #include
24             #endif
25              
26             /* ================================================================
27             * Constants
28             * ================================================================ */
29              
30             #define NATS_MAGIC_ALIVE 0xCA5E4A75
31             #define NATS_MAGIC_FREED 0xDEAD4A75
32              
33             #define BUF_INIT_SIZE 16384
34             #define MAX_CONTROL_LINE 4096
35              
36             #define DEFAULT_MAX_PAYLOAD (1024 * 1024)
37              
38             #define PARSE_OP 0
39             #define PARSE_MSG_BODY 1
40              
41             #define MSG_TYPE_MSG 0
42             #define MSG_TYPE_HMSG 1
43              
44             #define CLEAR_HANDLER(field) \
45             do { if (NULL != (field)) { SvREFCNT_dec(field); (field) = NULL; } } while(0)
46              
47             #define NATS_CROAK_UNLESS_CONNECTED(self) \
48             do { \
49             if (!(self)->connected && !(self)->connecting && \
50             !((self)->reconnect_enabled && (self)->reconnect_timer_active)) \
51             croak("not connected"); \
52             } while(0)
53              
54             /* ================================================================
55             * Type declarations
56             * ================================================================ */
57              
58             typedef struct nats_s nats_t;
59             typedef struct nats_sub_s nats_sub_t;
60             typedef struct nats_pub_s nats_pub_t;
61              
62             typedef nats_t* EV__Nats;
63             typedef struct ev_loop* EV__Loop;
64              
65             /* ================================================================
66             * Data structures
67             * ================================================================ */
68              
69             struct nats_sub_s {
70             uint64_t sid;
71             SV *subject;
72             SV *queue_group;
73             SV *cb;
74             int max_msgs;
75             int received;
76             ngx_queue_t queue;
77             };
78              
79             /* PONG callback entry (for flush) */
80             typedef struct nats_pong_cb_s {
81             SV *cb;
82             ngx_queue_t queue;
83             } nats_pong_cb_t;
84              
85             /* Server pool entry (parsed from INFO connect_urls) */
86             typedef struct nats_server_s {
87             char *host;
88             int port;
89             ngx_queue_t queue;
90             } nats_server_t;
91              
92             struct nats_pub_s {
93             char *data;
94             size_t len;
95             ngx_queue_t queue;
96             };
97              
98             struct nats_s {
99             unsigned int magic;
100             struct ev_loop *loop;
101             int fd;
102             int connected;
103             int connecting;
104              
105             ev_io rio, wio;
106             int reading, writing;
107              
108             char *rbuf;
109             size_t rbuf_len, rbuf_cap;
110              
111             char *wbuf;
112             size_t wbuf_len, wbuf_off, wbuf_cap;
113              
114             SV *on_error;
115             SV *on_connect;
116             SV *on_disconnect;
117              
118             ngx_queue_t subs;
119             HV *sub_map; /* SID -> nats_sub_t* hash for O(1) lookup */
120             uint64_t next_sid;
121              
122             ngx_queue_t wait_queue;
123             int waiting_count;
124              
125             /* Write coalescing */
126             ev_prepare prepare_watcher;
127             int prepare_active;
128             int wbuf_dirty;
129              
130             char *host;
131             int port;
132             char *path; /* Unix socket path */
133              
134             int reconnect_enabled;
135             int reconnect_delay_ms;
136             int max_reconnect_delay_ms;
137             int max_reconnect_attempts;
138             int reconnect_attempts;
139             ev_timer reconnect_timer;
140             int reconnect_timer_active;
141             int intentional_disconnect;
142              
143             int connect_timeout_ms;
144             ev_timer connect_timer;
145             int connect_timer_active;
146              
147             int ping_interval_ms;
148             ev_timer ping_timer;
149             int ping_timer_active;
150             int pings_outstanding;
151             int max_pings_outstanding;
152              
153             int max_payload;
154             SV *server_info_json;
155              
156             char *user;
157             char *pass;
158             char *token;
159             char *name;
160             int verbose;
161             int pedantic;
162             int echo;
163             int no_responders;
164              
165             char inbox_prefix[32];
166             uint64_t next_req_id;
167             ngx_queue_t req_queue;
168             uint64_t inbox_sub_sid;
169              
170             int parse_state;
171             int msg_type;
172             /* MSG/HMSG fields: absolute offsets into rbuf (safe across Renew) */
173             size_t msg_subject_off;
174             size_t msg_subject_len;
175             size_t msg_reply_off; /* msg_reply_len == 0 means no reply */
176             size_t msg_reply_len;
177             uint64_t msg_sid;
178             size_t msg_hdr_len;
179             size_t msg_total_len;
180              
181             int priority;
182             int keepalive;
183              
184             /* Stats */
185             UV msgs_in;
186             UV msgs_out;
187             UV bytes_in;
188             UV bytes_out;
189              
190             /* Server pool for cluster failover */
191             ngx_queue_t server_pool;
192             int server_pool_count;
193              
194             /* Drain state */
195             int draining;
196             SV *drain_cb;
197              
198             /* Slow consumer detection */
199             size_t slow_consumer_bytes; /* wbuf threshold, 0 = disabled */
200             SV *on_slow_consumer;
201              
202             /* Batch mode */
203             int batch_mode;
204              
205             /* PONG callback queue (for flush) */
206             ngx_queue_t pong_cbs;
207              
208             /* NKey auth */
209             char *nkey_seed; /* Ed25519 seed (base32-encoded) */
210             char *jwt;
211             char *server_nonce; /* nonce from INFO for signing */
212              
213             /* Lame duck mode (leaf node graceful shutdown) */
214             int ldm;
215             SV *on_ldm;
216              
217             /* TLS */
218             #ifdef HAVE_OPENSSL
219             SSL_CTX *ssl_ctx;
220             SSL *ssl;
221             int tls;
222             int tls_skip_verify;
223             char *tls_ca_file;
224             int ssl_handshaking;
225             #endif
226             };
227              
228             typedef struct nats_req_s {
229             uint64_t req_id;
230             SV *cb;
231             ev_timer timer;
232             int timer_active;
233             nats_t *self;
234             ngx_queue_t queue;
235             } nats_req_t;
236              
237             /* ================================================================
238             * Forward declarations
239             * ================================================================ */
240              
241             static void nats_connect_tcp(nats_t *self);
242             static void nats_connect_unix(nats_t *self);
243             static void nats_do_connect(nats_t *self);
244             static void nats_schedule_reconnect(nats_t *self);
245             static void nats_next_server(nats_t *self);
246             static void nats_free_server_pool(nats_t *self);
247             static void nats_on_read(struct ev_loop *loop, ev_io *w, int revents);
248             static void nats_on_prepare(struct ev_loop *loop, ev_prepare *w, int revents);
249             static void nats_parse_connect_urls(nats_t *self, const char *json, size_t len);
250             static void nats_on_write(struct ev_loop *loop, ev_io *w, int revents);
251             static void nats_on_connect_timeout(struct ev_loop *loop, ev_timer *w, int revents);
252             static void nats_on_reconnect_timer(struct ev_loop *loop, ev_timer *w, int revents);
253             static void nats_on_ping_timer(struct ev_loop *loop, ev_timer *w, int revents);
254             static void nats_send_connect(nats_t *self);
255             static void nats_try_write(nats_t *self);
256             static void nats_process_line(nats_t *self, char *line, size_t len);
257             static void nats_process_msg(nats_t *self, char *payload, size_t len);
258             static void nats_emit_error(nats_t *self, const char *err);
259             static void nats_cleanup(nats_t *self);
260             static void nats_start_ping_timer(nats_t *self);
261             static void nats_drain_waiting(nats_t *self);
262             static void nats_setup_inbox(nats_t *self);
263             static nats_sub_t *nats_find_sub(nats_t *self, uint64_t sid);
264             static void nats_remove_sub(nats_t *self, nats_sub_t *sub);
265             static void nats_cancel_all_requests(nats_t *self, const char *err);
266             static void nats_resub_all(nats_t *self);
267              
268             /* ================================================================
269             * Buffer helpers
270             * ================================================================ */
271              
272 0           static void buf_ensure(char **buf, size_t *cap, size_t needed)
273             {
274 0 0         if (needed <= *cap)
275 0           return;
276 0 0         size_t newcap = *cap ? *cap : BUF_INIT_SIZE;
277 0 0         while (newcap < needed)
278 0           newcap *= 2;
279 0           Renew(*buf, newcap, char);
280 0           *cap = newcap;
281             }
282              
283 0           static void wbuf_append(nats_t *self, const char *data, size_t len)
284             {
285 0           size_t used = self->wbuf_len - self->wbuf_off;
286 0 0         if (self->wbuf_off > 0 && self->wbuf_off > self->wbuf_len / 2) {
    0          
287 0 0         if (used > 0)
288 0           memmove(self->wbuf, self->wbuf + self->wbuf_off, used);
289 0           self->wbuf_len = used;
290 0           self->wbuf_off = 0;
291             }
292 0           buf_ensure(&self->wbuf, &self->wbuf_cap, self->wbuf_len + len);
293 0           memcpy(self->wbuf + self->wbuf_len, data, len);
294 0           self->wbuf_len += len;
295 0           }
296              
297             /* ================================================================
298             * NKey helpers (base32 + Ed25519)
299             * ================================================================ */
300              
301             #ifdef HAVE_OPENSSL
302             #include
303              
304             /* NATS base32 decode (RFC 4648, no padding) */
305 0           static int nats_base32_decode(const char *src, size_t src_len, unsigned char *dst, size_t *dst_len)
306             {
307             static const int8_t b32_tab[128] = {
308             -1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,
309             -1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,
310             -1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,
311             -1,-1,26,27,28,29,30,31,-1,-1,-1,-1,-1,-1,-1,-1, /* 2-7 */
312             -1, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9,10,11,12,13,14, /* A-O */
313             15,16,17,18,19,20,21,22,23,24,25,-1,-1,-1,-1,-1, /* P-Z */
314             -1, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9,10,11,12,13,14, /* a-o */
315             15,16,17,18,19,20,21,22,23,24,25,-1,-1,-1,-1,-1, /* p-z */
316             };
317 0           uint64_t buf = 0;
318 0           int bits = 0;
319 0           size_t di = 0;
320             size_t i;
321              
322 0 0         for (i = 0; i < src_len; i++) {
323 0           unsigned char c = (unsigned char)src[i];
324 0 0         if (c == '=' || c == ' ') continue;
    0          
325 0 0         if (c >= 128 || b32_tab[c] < 0) return -1;
    0          
326 0           buf = (buf << 5) | b32_tab[c];
327 0           bits += 5;
328 0 0         if (bits >= 8) {
329 0           bits -= 8;
330 0           dst[di++] = (unsigned char)(buf >> bits);
331             }
332             }
333 0           *dst_len = di;
334 0           return 0;
335             }
336              
337             /* NATS base64url encode (no padding) */
338 0           static int nats_base64url_encode(const unsigned char *src, size_t src_len, char *dst, size_t dst_size)
339             {
340             static const char b64[] = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_";
341 0           size_t di = 0;
342             size_t i;
343 0 0         for (i = 0; i + 2 < src_len; i += 3) {
344 0 0         if (di + 4 > dst_size) return -1;
345 0           uint32_t v = ((uint32_t)src[i] << 16) | ((uint32_t)src[i+1] << 8) | src[i+2];
346 0           dst[di++] = b64[(v >> 18) & 0x3F];
347 0           dst[di++] = b64[(v >> 12) & 0x3F];
348 0           dst[di++] = b64[(v >> 6) & 0x3F];
349 0           dst[di++] = b64[v & 0x3F];
350             }
351 0 0         if (i < src_len) {
352 0 0         if (di + 4 > dst_size) return -1;
353 0           uint32_t v = (uint32_t)src[i] << 16;
354 0 0         if (i + 1 < src_len) v |= (uint32_t)src[i+1] << 8;
355 0           dst[di++] = b64[(v >> 18) & 0x3F];
356 0           dst[di++] = b64[(v >> 12) & 0x3F];
357 0 0         if (i + 1 < src_len) dst[di++] = b64[(v >> 6) & 0x3F];
358             }
359 0 0         if (di < dst_size) dst[di] = '\0';
360 0           return (int)di;
361             }
362              
363             /* CRC16/IBM for NATS NKey validation */
364 0           static uint16_t nats_crc16(const unsigned char *data, size_t len)
365             {
366 0           uint16_t crc = 0;
367             size_t i;
368 0 0         for (i = 0; i < len; i++) {
369 0           crc ^= (uint16_t)data[i];
370             int j;
371 0 0         for (j = 0; j < 8; j++) {
372 0 0         if (crc & 1) crc = (crc >> 1) ^ 0xA001;
373 0           else crc >>= 1;
374             }
375             }
376 0           return crc;
377             }
378              
379 0           static int nats_nkey_sign(const char *seed_encoded, const char *nonce, size_t nonce_len,
380             char *sig_out, size_t sig_out_size)
381             {
382             unsigned char raw[64];
383 0           size_t raw_len = 0;
384              
385 0 0         if (nats_base32_decode(seed_encoded, strlen(seed_encoded), raw, &raw_len) != 0)
386 0           return -1;
387 0 0         if (raw_len < 36) return -1; /* prefix(2) + seed(32) + CRC(2) */
388              
389             /* Validate CRC16 */
390 0           uint16_t expected_crc = (uint16_t)raw[raw_len - 2] | ((uint16_t)raw[raw_len - 1] << 8);
391 0           uint16_t actual_crc = nats_crc16(raw, raw_len - 2);
392 0 0         if (expected_crc != actual_crc) return -1;
393              
394 0           unsigned char *seed = raw + 2;
395              
396 0           EVP_PKEY *pkey = EVP_PKEY_new_raw_private_key(EVP_PKEY_ED25519, NULL, seed, 32);
397 0 0         if (!pkey) return -1;
398              
399 0           EVP_MD_CTX *ctx = EVP_MD_CTX_new();
400 0 0         if (!ctx) { EVP_PKEY_free(pkey); return -1; }
401              
402             unsigned char sig[64];
403 0           size_t sig_len = sizeof(sig);
404              
405 0           int ok = (EVP_DigestSignInit(ctx, NULL, NULL, NULL, pkey) == 1 &&
406 0           EVP_DigestSign(ctx, sig, &sig_len, (const unsigned char *)nonce, nonce_len) == 1);
407              
408 0           EVP_MD_CTX_free(ctx);
409 0           EVP_PKEY_free(pkey);
410              
411 0 0         if (!ok) return -1;
412              
413 0           return nats_base64url_encode(sig, sig_len, sig_out, sig_out_size);
414             }
415              
416 0           static int nats_nkey_public(const char *seed_encoded, char *pub_out, size_t pub_out_size)
417             {
418             unsigned char raw[64];
419 0           size_t raw_len = 0;
420              
421 0 0         if (nats_base32_decode(seed_encoded, strlen(seed_encoded), raw, &raw_len) != 0)
422 0           return -1;
423 0 0         if (raw_len < 36) return -1;
424              
425 0           uint16_t expected_crc = (uint16_t)raw[raw_len - 2] | ((uint16_t)raw[raw_len - 1] << 8);
426 0 0         if (nats_crc16(raw, raw_len - 2) != expected_crc) return -1;
427              
428 0           unsigned char *seed = raw + 2;
429              
430 0           EVP_PKEY *pkey = EVP_PKEY_new_raw_private_key(EVP_PKEY_ED25519, NULL, seed, 32);
431 0 0         if (!pkey) return -1;
432              
433             unsigned char pub[32];
434 0           size_t pub_len = 32;
435 0           int ok = EVP_PKEY_get_raw_public_key(pkey, pub, &pub_len) == 1;
436 0           EVP_PKEY_free(pkey);
437 0 0         if (!ok) return -1;
438              
439             /* Derive public key prefix from seed prefix.
440             Seed prefix byte encodes type in bits 3-7: User=0xD0>>3, etc.
441             The first two decoded bytes encode: byte0 = (prefix >> 3), byte1 = (prefix << 5) | ... */
442 0           unsigned char seed_prefix = (raw[0] << 3) | (raw[1] >> 5);
443             unsigned char pub_prefix;
444 0           switch (seed_prefix & 0xF8) {
445 0           case 0x90: pub_prefix = 0xD0; break; /* S+User -> U */
446 0           case 0x38: pub_prefix = 0x38; break; /* S+Operator -> O */
447 0           case 0x00: pub_prefix = 0x00; break; /* S+Account -> A */
448 0           default: pub_prefix = 0xD0; break;
449             }
450              
451             /* Build: prefix(1) + pubkey(32) + CRC16(2) = 35 bytes */
452             unsigned char full[35];
453 0           full[0] = pub_prefix;
454 0           memcpy(full + 1, pub, 32);
455 0           uint16_t crc = nats_crc16(full, 33);
456 0           full[33] = crc & 0xFF;
457 0           full[34] = (crc >> 8) & 0xFF;
458              
459             /* Base32 encode 35 bytes -> 56 chars */
460             static const char b32_chars[] = "ABCDEFGHIJKLMNOPQRSTUVWXYZ234567";
461 0           size_t di = 0, ci;
462 0           uint64_t buf = 0;
463 0           int bits = 0;
464 0 0         for (ci = 0; ci < 35 && di < pub_out_size - 1; ci++) {
    0          
465 0           buf = (buf << 8) | full[ci];
466 0           bits += 8;
467 0 0         while (bits >= 5 && di < pub_out_size - 1) {
    0          
468 0           bits -= 5;
469 0           pub_out[di++] = b32_chars[(buf >> bits) & 0x1F];
470             }
471             }
472 0 0         if (di < pub_out_size) pub_out[di] = '\0';
473 0           return (int)di;
474             }
475             #endif
476              
477             /* ================================================================
478             * I/O helpers (with optional TLS)
479             * ================================================================ */
480              
481 0           static ssize_t nats_io_read(nats_t *self, void *buf, size_t len)
482             {
483             #ifdef HAVE_OPENSSL
484 0 0         if (self->ssl) {
485 0 0         int n = SSL_read(self->ssl, buf, (len > (size_t)INT_MAX) ? INT_MAX : (int)len);
486 0 0         if (n <= 0) {
487 0           int err = SSL_get_error(self->ssl, n);
488 0 0         if (err == SSL_ERROR_WANT_READ || err == SSL_ERROR_WANT_WRITE) {
    0          
489 0           errno = EAGAIN;
490 0           return -1;
491             }
492 0 0         if (err == SSL_ERROR_ZERO_RETURN) return 0;
493 0           errno = EIO;
494 0           return -1;
495             }
496 0           return n;
497             }
498             #endif
499 0           return read(self->fd, buf, len);
500             }
501              
502 0           static ssize_t nats_io_write(nats_t *self, const void *buf, size_t len)
503             {
504             #ifdef HAVE_OPENSSL
505 0 0         if (self->ssl) {
506 0 0         int n = SSL_write(self->ssl, buf, (len > (size_t)INT_MAX) ? INT_MAX : (int)len);
507 0 0         if (n <= 0) {
508 0           int err = SSL_get_error(self->ssl, n);
509 0 0         if (err == SSL_ERROR_WANT_WRITE || err == SSL_ERROR_WANT_READ) {
    0          
510 0           errno = EAGAIN;
511 0           return -1;
512             }
513 0           errno = EIO;
514 0           return -1;
515             }
516 0           return n;
517             }
518             #endif
519 0           return write(self->fd, buf, len);
520             }
521              
522             #ifdef HAVE_OPENSSL
523 0           static int nats_ssl_setup(nats_t *self)
524             {
525 0 0         if (!self->ssl_ctx) {
526 0           self->ssl_ctx = SSL_CTX_new(TLS_client_method());
527 0 0         if (!self->ssl_ctx) return -1;
528              
529 0 0         if (self->tls_ca_file) {
530 0 0         if (!SSL_CTX_load_verify_locations(self->ssl_ctx, self->tls_ca_file, NULL))
531 0           return -1;
532             } else {
533 0           SSL_CTX_set_default_verify_paths(self->ssl_ctx);
534             }
535              
536 0 0         if (!self->tls_skip_verify)
537 0           SSL_CTX_set_verify(self->ssl_ctx, SSL_VERIFY_PEER, NULL);
538             }
539              
540 0           self->ssl = SSL_new(self->ssl_ctx);
541 0 0         if (!self->ssl) return -1;
542              
543 0           SSL_set_fd(self->ssl, self->fd);
544 0 0         if (self->host)
545 0           SSL_set_tlsext_host_name(self->ssl, self->host);
546              
547 0           return 0;
548             }
549              
550 0           static void nats_ssl_cleanup(nats_t *self)
551             {
552 0 0         if (self->ssl) {
553 0           SSL_free(self->ssl);
554 0           self->ssl = NULL;
555             }
556 0           self->ssl_handshaking = 0;
557 0           }
558              
559 0           static int nats_ssl_handshake(nats_t *self)
560             {
561 0           int ret = SSL_connect(self->ssl);
562 0 0         if (ret == 1) {
563 0           self->ssl_handshaking = 0;
564 0           return 1;
565             }
566 0           int err = SSL_get_error(self->ssl, ret);
567 0 0         if (err == SSL_ERROR_WANT_READ) {
568 0 0         if (!self->reading) {
569 0           ev_io_start(self->loop, &self->rio);
570 0           self->reading = 1;
571             }
572 0           return 0;
573             }
574 0 0         if (err == SSL_ERROR_WANT_WRITE) {
575 0 0         if (!self->writing) {
576 0           ev_io_start(self->loop, &self->wio);
577 0           self->writing = 1;
578             }
579 0           return 0;
580             }
581 0           return -1;
582             }
583             #endif
584              
585             /* ================================================================
586             * JSON string escaper (for CONNECT command)
587             * ================================================================ */
588              
589 0           static int json_escape_string(char *dst, size_t dst_size, const char *src)
590             {
591 0           size_t di = 0;
592 0           const unsigned char *s = (const unsigned char *)src;
593              
594             /* Need room for opening quote, at least one char, closing quote, NUL */
595 0 0         if (dst_size < 3) {
596 0 0         if (dst_size > 0) dst[0] = '\0';
597 0           return 0;
598             }
599              
600 0           dst[di++] = '"';
601              
602 0 0         for (; *s; s++) {
603 0 0         size_t need = (*s == '"' || *s == '\\') ? 2 : (*s < 0x20) ? 6 : 1;
    0          
    0          
604 0 0         if (di + need + 2 > dst_size) break; /* +2 for closing quote + NUL */
605              
606 0 0         if (*s == '"') {
607 0           dst[di++] = '\\'; dst[di++] = '"';
608 0 0         } else if (*s == '\\') {
609 0           dst[di++] = '\\'; dst[di++] = '\\';
610 0 0         } else if (*s < 0x20) {
611 0           di += snprintf(dst + di, dst_size - di, "\\u%04x", *s);
612             } else {
613 0           dst[di++] = *s;
614             }
615             }
616              
617 0           dst[di++] = '"';
618 0           dst[di] = '\0';
619 0           return (int)di;
620             }
621              
622             /* ================================================================
623             * Random hex for inbox prefix
624             * ================================================================ */
625              
626 0           static void nats_gen_hex(char *buf, int nbytes)
627             {
628             static const char hex[] = "0123456789abcdef";
629             unsigned char rnd[16];
630 0           int i, got = 0;
631              
632 0           int fd = open("/dev/urandom", O_RDONLY);
633 0 0         if (fd >= 0) {
634 0 0         while (got < nbytes) {
635 0           ssize_t n = read(fd, rnd + got, nbytes - got);
636 0 0         if (n > 0) got += n;
637 0 0         else if (n == 0 || (errno != EINTR && errno != EAGAIN)) break;
    0          
    0          
638             }
639 0           close(fd);
640             }
641 0 0         for (i = got; i < nbytes; i++)
642 0           rnd[i] = (unsigned char)(rand() & 0xFF);
643              
644 0 0         for (i = 0; i < nbytes; i++) {
645 0           buf[i*2] = hex[rnd[i] >> 4];
646 0           buf[i*2+1] = hex[rnd[i] & 0x0F];
647             }
648 0           }
649              
650 0           static void nats_setup_inbox(nats_t *self)
651             {
652             char hex[20];
653 0           nats_gen_hex(hex, 8);
654 0           snprintf(self->inbox_prefix, sizeof(self->inbox_prefix),
655             "_INBOX.%.16s.", hex);
656 0           self->next_req_id = 1;
657 0           }
658              
659             /* ================================================================
660             * Subscription management
661             * ================================================================ */
662              
663 0           static nats_sub_t *nats_find_sub(nats_t *self, uint64_t sid)
664             {
665             char key[24];
666 0           int klen = snprintf(key, sizeof(key), "%" UVuf, (UV)sid);
667 0           SV **svp = hv_fetch(self->sub_map, key, klen, 0);
668 0 0         if (svp && SvIOK(*svp))
    0          
669 0           return INT2PTR(nats_sub_t *, SvIVX(*svp));
670 0           return NULL;
671             }
672              
673 0           static void nats_register_sub(nats_t *self, nats_sub_t *sub)
674             {
675             char key[24];
676 0           int klen = snprintf(key, sizeof(key), "%" UVuf, (UV)sub->sid);
677 0           hv_store(self->sub_map, key, klen, newSViv(PTR2IV(sub)), 0);
678 0           ngx_queue_insert_tail(&self->subs, &sub->queue);
679 0           }
680              
681 0           static void nats_unregister_sub(nats_t *self, nats_sub_t *sub)
682             {
683             char key[24];
684 0           int klen = snprintf(key, sizeof(key), "%" UVuf, (UV)sub->sid);
685 0           hv_delete(self->sub_map, key, klen, G_DISCARD);
686 0           }
687              
688 0           static void nats_remove_sub(nats_t *self, nats_sub_t *sub)
689             {
690 0           nats_unregister_sub(self, sub);
691 0           ngx_queue_remove(&sub->queue);
692 0 0         CLEAR_HANDLER(sub->subject);
693 0 0         CLEAR_HANDLER(sub->queue_group);
694 0 0         CLEAR_HANDLER(sub->cb);
695 0           Safefree(sub);
696 0           }
697              
698 0           static void nats_resub_all(nats_t *self)
699             {
700             ngx_queue_t *q;
701             char buf[MAX_CONTROL_LINE];
702              
703 0 0         ngx_queue_foreach(q, &self->subs) {
    0          
704 0           nats_sub_t *sub = ngx_queue_data(q, nats_sub_t, queue);
705             STRLEN slen;
706 0           const char *subj = SvPV(sub->subject, slen);
707             int n;
708              
709 0 0         if (sub->queue_group) {
710             STRLEN glen;
711 0           const char *grp = SvPV(sub->queue_group, glen);
712 0           n = snprintf(buf, sizeof(buf), "SUB %.*s %.*s %" UVuf "\r\n",
713 0           (int)slen, subj, (int)glen, grp, (UV)sub->sid);
714             } else {
715 0           n = snprintf(buf, sizeof(buf), "SUB %.*s %" UVuf "\r\n",
716 0           (int)slen, subj, (UV)sub->sid);
717             }
718 0           wbuf_append(self, buf, n);
719              
720             /* Restore auto-unsub if partially consumed */
721 0 0         if (sub->max_msgs > 0) {
722 0           int remaining = sub->max_msgs - sub->received;
723 0 0         if (remaining < 1) remaining = 1;
724 0           n = snprintf(buf, sizeof(buf), "UNSUB %" UVuf " %d\r\n",
725 0           (UV)sub->sid, remaining);
726 0           wbuf_append(self, buf, n);
727             }
728             }
729 0           }
730              
731             /* ================================================================
732             * Connection management
733             * ================================================================ */
734              
735 0           static void nats_stop_watchers(nats_t *self)
736             {
737 0 0         if (self->reading) {
738 0           ev_io_stop(self->loop, &self->rio);
739 0           self->reading = 0;
740             }
741 0 0         if (self->writing) {
742 0           ev_io_stop(self->loop, &self->wio);
743 0           self->writing = 0;
744             }
745 0           }
746              
747 0           static void nats_stop_timers(nats_t *self)
748             {
749 0 0         if (self->connect_timer_active) {
750 0           ev_timer_stop(self->loop, &self->connect_timer);
751 0           self->connect_timer_active = 0;
752             }
753 0 0         if (self->reconnect_timer_active) {
754 0           ev_timer_stop(self->loop, &self->reconnect_timer);
755 0           self->reconnect_timer_active = 0;
756             }
757 0 0         if (self->ping_timer_active) {
758 0           ev_timer_stop(self->loop, &self->ping_timer);
759 0           self->ping_timer_active = 0;
760             }
761 0 0         if (self->prepare_active) {
762 0           ev_prepare_stop(self->loop, &self->prepare_watcher);
763 0           self->prepare_active = 0;
764             }
765 0           }
766              
767 0           static void nats_cancel_all_requests(nats_t *self, const char *err)
768             {
769 0           dSP;
770 0 0         while (!ngx_queue_empty(&self->req_queue)) {
771 0           ngx_queue_t *q = ngx_queue_head(&self->req_queue);
772 0           nats_req_t *req = ngx_queue_data(q, nats_req_t, queue);
773 0           ngx_queue_remove(q);
774              
775 0 0         if (req->timer_active) {
776 0           ev_timer_stop(self->loop, &req->timer);
777 0           req->timer_active = 0;
778             }
779              
780 0 0         if (req->cb) {
781 0           ENTER; SAVETMPS;
782 0 0         PUSHMARK(SP);
783 0 0         EXTEND(SP, 2);
784 0           PUSHs(&PL_sv_undef);
785 0           PUSHs(sv_2mortal(newSVpv(err, 0)));
786 0           PUTBACK;
787 0           call_sv(req->cb, G_DISCARD);
788 0 0         FREETMPS; LEAVE;
789 0           SvREFCNT_dec(req->cb);
790             }
791 0           Safefree(req);
792             }
793 0           }
794              
795 0           static void nats_skip_waiting(nats_t *self)
796             {
797 0 0         while (!ngx_queue_empty(&self->wait_queue)) {
798 0           ngx_queue_t *q = ngx_queue_head(&self->wait_queue);
799 0           nats_pub_t *pub = ngx_queue_data(q, nats_pub_t, queue);
800 0           ngx_queue_remove(q);
801 0 0         if (pub->data)
802 0           Safefree(pub->data);
803 0           Safefree(pub);
804 0           self->waiting_count--;
805             }
806 0           }
807              
808 0           static void nats_cleanup(nats_t *self)
809             {
810 0 0         int was_connected = self->connected || self->connecting;
    0          
811              
812 0           nats_stop_watchers(self);
813 0           nats_stop_timers(self);
814              
815 0           self->connected = 0;
816 0           self->connecting = 0;
817 0           self->pings_outstanding = 0;
818 0           self->parse_state = PARSE_OP;
819              
820             #ifdef HAVE_OPENSSL
821 0           nats_ssl_cleanup(self);
822             #endif
823              
824 0 0         if (self->fd >= 0) {
825 0           close(self->fd);
826 0           self->fd = -1;
827             }
828              
829 0           self->rbuf_len = 0;
830 0           self->wbuf_len = 0;
831 0           self->wbuf_off = 0;
832              
833 0           nats_cancel_all_requests(self, "disconnected");
834              
835             /* Clear flush callbacks */
836 0 0         while (!ngx_queue_empty(&self->pong_cbs)) {
837 0           ngx_queue_t *pq = ngx_queue_head(&self->pong_cbs);
838 0           nats_pong_cb_t *pcb = ngx_queue_data(pq, nats_pong_cb_t, queue);
839 0           ngx_queue_remove(pq);
840 0 0         CLEAR_HANDLER(pcb->cb);
841 0           Safefree(pcb);
842             }
843              
844 0 0         if (was_connected && self->on_disconnect) {
    0          
845 0           dSP;
846 0           ENTER; SAVETMPS;
847 0 0         PUSHMARK(SP);
848 0           PUTBACK;
849 0           call_sv(self->on_disconnect, G_DISCARD);
850 0 0         FREETMPS; LEAVE;
851             }
852              
853 0           nats_schedule_reconnect(self);
854 0           }
855              
856 0           static void nats_emit_error(nats_t *self, const char *err)
857             {
858 0 0         if (self->on_error) {
859 0           dSP;
860 0           ENTER; SAVETMPS;
861 0 0         PUSHMARK(SP);
862 0 0         EXTEND(SP, 1);
863 0           PUSHs(sv_2mortal(newSVpv(err, 0)));
864 0           PUTBACK;
865 0           call_sv(self->on_error, G_DISCARD);
866 0 0         FREETMPS; LEAVE;
867             } else {
868 0           croak("EV::Nats: %s", err);
869             }
870 0           }
871              
872             /* ================================================================
873             * CONNECT command builder
874             * ================================================================ */
875              
876             #define CONNECT_BUF_SIZE 8192
877             #define CONNECT_APPEND(fmt, ...) \
878             do { \
879             if (off < (int)sizeof(buf)) \
880             off += snprintf(buf + off, sizeof(buf) - off, fmt, ##__VA_ARGS__); \
881             } while(0)
882              
883 0           static void nats_send_connect(nats_t *self)
884             {
885             char buf[CONNECT_BUF_SIZE];
886 0           int off = 0;
887             char escaped[1024];
888              
889 0 0         CONNECT_APPEND("CONNECT {");
890 0 0         CONNECT_APPEND("\"verbose\":%s,\"pedantic\":%s,\"echo\":%s",
    0          
    0          
    0          
891             self->verbose ? "true" : "false",
892             self->pedantic ? "true" : "false",
893             self->echo ? "true" : "false");
894 0 0         CONNECT_APPEND(",\"protocol\":1");
895 0 0         if (self->no_responders)
896 0 0         CONNECT_APPEND(",\"no_responders\":true");
897 0 0         CONNECT_APPEND(",\"headers\":true");
898 0 0         CONNECT_APPEND(",\"lang\":\"perl-xs\",\"version\":\"0.01\"");
899              
900 0 0         if (self->user) {
901 0           int elen = json_escape_string(escaped, sizeof(escaped), self->user);
902 0 0         CONNECT_APPEND(",\"user\":%.*s", elen, escaped);
903 0 0         if (self->pass) {
904 0           elen = json_escape_string(escaped, sizeof(escaped), self->pass);
905 0 0         CONNECT_APPEND(",\"pass\":%.*s", elen, escaped);
906             }
907             }
908 0 0         if (self->token) {
909 0           int elen = json_escape_string(escaped, sizeof(escaped), self->token);
910 0 0         CONNECT_APPEND(",\"auth_token\":%.*s", elen, escaped);
911             }
912 0 0         if (self->name) {
913 0           int elen = json_escape_string(escaped, sizeof(escaped), self->name);
914 0 0         CONNECT_APPEND(",\"name\":%.*s", elen, escaped);
915             }
916 0 0         if (self->jwt) {
917 0           int elen = json_escape_string(escaped, sizeof(escaped), self->jwt);
918 0 0         CONNECT_APPEND(",\"jwt\":%.*s", elen, escaped);
919             }
920             #ifdef HAVE_OPENSSL
921 0 0         if (self->nkey_seed && self->server_nonce) {
    0          
922             /* Sign nonce with NKey */
923             char sig[128];
924             char pub[64];
925 0 0         if (nats_nkey_sign(self->nkey_seed, self->server_nonce,
926 0 0         strlen(self->server_nonce), sig, sizeof(sig)) > 0 &&
927 0           nats_nkey_public(self->nkey_seed, pub, sizeof(pub)) > 0) {
928 0 0         CONNECT_APPEND(",\"nkey\":\"%s\",\"sig\":\"%s\"", pub, sig);
929             }
930             }
931             #endif
932              
933 0 0         CONNECT_APPEND("}\r\n");
934              
935 0 0         if (off > (int)sizeof(buf))
936 0           off = (int)sizeof(buf);
937              
938 0           wbuf_append(self, buf, off);
939              
940 0           nats_resub_all(self);
941              
942 0           wbuf_append(self, "PING\r\n", 6);
943 0           }
944              
945             /* ================================================================
946             * Protocol parser
947             * ================================================================ */
948              
949 0           static int nats_parse_msg_args(nats_t *self, char *line, size_t len)
950             {
951 0           char *p = line;
952 0           char *end = line + len;
953             char *tok_start;
954              
955 0 0         if (len < 4) return -1;
956 0           p += 4;
957              
958             /* subject */
959 0           tok_start = p;
960 0 0         while (p < end && *p != ' ' && *p != '\t') p++;
    0          
    0          
961 0 0         if (p == tok_start) return -1;
962 0           self->msg_subject_off = tok_start - self->rbuf;
963 0           self->msg_subject_len = p - tok_start;
964              
965 0 0         while (p < end && (*p == ' ' || *p == '\t')) p++;
    0          
    0          
966              
967             /* sid */
968 0           tok_start = p;
969 0 0         while (p < end && *p != ' ' && *p != '\t') p++;
    0          
    0          
970 0 0         if (p == tok_start) return -1;
971 0           self->msg_sid = 0;
972             {
973 0           char *s = tok_start;
974 0 0         while (s < p) {
975 0 0         if (*s < '0' || *s > '9') return -1;
    0          
976 0           self->msg_sid = self->msg_sid * 10 + (*s - '0');
977 0           s++;
978             }
979             }
980              
981 0 0         while (p < end && (*p == ' ' || *p == '\t')) p++;
    0          
    0          
982              
983             /* Remaining tokens: [reply-to] <#bytes> — use token counting like HMSG */
984 0           int ntokens = 0;
985             char *tokens[3];
986             size_t token_lens[3];
987 0           char *tp = p;
988 0 0         while (tp < end && ntokens < 3) {
    0          
989 0 0         while (tp < end && (*tp == ' ' || *tp == '\t')) tp++;
    0          
    0          
990 0 0         if (tp >= end) break;
991 0           tokens[ntokens] = tp;
992 0 0         while (tp < end && *tp != ' ' && *tp != '\t') tp++;
    0          
    0          
993 0           token_lens[ntokens] = tp - tokens[ntokens];
994 0           ntokens++;
995             }
996              
997 0 0         if (ntokens == 1) {
998             /* no reply-to, just #bytes */
999 0           self->msg_reply_off = 0;
1000 0           self->msg_reply_len = 0;
1001 0           self->msg_total_len = 0;
1002 0           char *s = tokens[0]; char *e = s + token_lens[0];
1003 0 0         while (s < e) {
1004 0 0         if (*s < '0' || *s > '9') return -1;
    0          
1005 0           self->msg_total_len = self->msg_total_len * 10 + (*s - '0');
1006 0           s++;
1007             }
1008 0 0         } else if (ntokens == 2) {
1009             /* reply-to + #bytes */
1010 0           self->msg_reply_off = tokens[0] - self->rbuf;
1011 0           self->msg_reply_len = token_lens[0];
1012 0           self->msg_total_len = 0;
1013 0           char *s = tokens[1]; char *e = s + token_lens[1];
1014 0 0         while (s < e) {
1015 0 0         if (*s < '0' || *s > '9') return -1;
    0          
1016 0           self->msg_total_len = self->msg_total_len * 10 + (*s - '0');
1017 0           s++;
1018             }
1019             } else {
1020 0           return -1;
1021             }
1022              
1023 0           self->msg_hdr_len = 0;
1024 0           self->msg_type = MSG_TYPE_MSG;
1025 0           return 0;
1026             }
1027              
1028 0           static int nats_parse_hmsg_args(nats_t *self, char *line, size_t len)
1029             {
1030 0           char *p = line;
1031 0           char *end = line + len;
1032             char *tok_start;
1033              
1034 0 0         if (len < 5) return -1;
1035 0           p += 5;
1036              
1037             /* subject */
1038 0           tok_start = p;
1039 0 0         while (p < end && *p != ' ' && *p != '\t') p++;
    0          
    0          
1040 0 0         if (p == tok_start) return -1;
1041 0           self->msg_subject_off = tok_start - self->rbuf;
1042 0           self->msg_subject_len = p - tok_start;
1043              
1044 0 0         while (p < end && (*p == ' ' || *p == '\t')) p++;
    0          
    0          
1045              
1046             /* sid */
1047 0           tok_start = p;
1048 0 0         while (p < end && *p != ' ' && *p != '\t') p++;
    0          
    0          
1049 0 0         if (p == tok_start) return -1;
1050 0           self->msg_sid = 0;
1051             {
1052 0           char *s = tok_start;
1053 0 0         while (s < p) {
1054 0 0         if (*s < '0' || *s > '9') return -1;
    0          
1055 0           self->msg_sid = self->msg_sid * 10 + (*s - '0');
1056 0           s++;
1057             }
1058             }
1059              
1060 0 0         while (p < end && (*p == ' ' || *p == '\t')) p++;
    0          
    0          
1061              
1062 0           int ntokens = 0;
1063             char *tokens[4];
1064             size_t token_lens[4];
1065 0           char *tp = p;
1066 0 0         while (tp < end && ntokens < 4) {
    0          
1067 0 0         while (tp < end && (*tp == ' ' || *tp == '\t')) tp++;
    0          
    0          
1068 0 0         if (tp >= end) break;
1069 0           tokens[ntokens] = tp;
1070 0 0         while (tp < end && *tp != ' ' && *tp != '\t') tp++;
    0          
    0          
1071 0           token_lens[ntokens] = tp - tokens[ntokens];
1072 0           ntokens++;
1073             }
1074              
1075 0 0         if (ntokens == 2) {
1076 0           self->msg_reply_off = 0;
1077 0           self->msg_reply_len = 0;
1078 0           self->msg_hdr_len = 0;
1079 0           self->msg_total_len = 0;
1080 0 0         { char *s = tokens[0]; char *e = s + token_lens[0]; while (s < e) { if (*s < '0' || *s > '9') return -1; self->msg_hdr_len = self->msg_hdr_len * 10 + (*s - '0'); s++; } }
    0          
    0          
1081 0 0         { char *s = tokens[1]; char *e = s + token_lens[1]; while (s < e) { if (*s < '0' || *s > '9') return -1; self->msg_total_len = self->msg_total_len * 10 + (*s - '0'); s++; } }
    0          
    0          
1082 0 0         } else if (ntokens == 3) {
1083 0           self->msg_reply_off = tokens[0] - self->rbuf;
1084 0           self->msg_reply_len = token_lens[0];
1085 0           self->msg_hdr_len = 0;
1086 0           self->msg_total_len = 0;
1087 0 0         { char *s = tokens[1]; char *e = s + token_lens[1]; while (s < e) { if (*s < '0' || *s > '9') return -1; self->msg_hdr_len = self->msg_hdr_len * 10 + (*s - '0'); s++; } }
    0          
    0          
1088 0 0         { char *s = tokens[2]; char *e = s + token_lens[2]; while (s < e) { if (*s < '0' || *s > '9') return -1; self->msg_total_len = self->msg_total_len * 10 + (*s - '0'); s++; } }
    0          
    0          
1089             } else {
1090 0           return -1;
1091             }
1092              
1093 0           self->msg_type = MSG_TYPE_HMSG;
1094 0           return 0;
1095             }
1096              
1097 0           static void nats_process_msg(nats_t *self, char *payload, size_t len)
1098             {
1099 0           nats_sub_t *sub = nats_find_sub(self, self->msg_sid);
1100 0 0         if (!sub) return;
1101              
1102 0           self->msgs_in++;
1103 0           self->bytes_in += len;
1104 0           sub->received++;
1105              
1106 0           int max_msgs = sub->max_msgs;
1107 0           int received = sub->received;
1108 0           uint64_t sid = sub->sid;
1109              
1110 0 0         if (sub->cb) {
1111 0           dSP;
1112 0           ENTER; SAVETMPS;
1113 0 0         PUSHMARK(SP);
1114 0 0         EXTEND(SP, 4);
1115              
1116 0           PUSHs(sv_2mortal(newSVpvn(self->rbuf + self->msg_subject_off, self->msg_subject_len)));
1117              
1118 0 0         if (self->msg_type == MSG_TYPE_HMSG && self->msg_hdr_len > 0 && self->msg_hdr_len <= len) {
    0          
    0          
1119 0           PUSHs(sv_2mortal(newSVpvn(payload + self->msg_hdr_len, len - self->msg_hdr_len)));
1120             } else {
1121 0           PUSHs(sv_2mortal(newSVpvn(payload, len)));
1122             }
1123              
1124 0 0         if (self->msg_reply_len > 0) {
1125 0           PUSHs(sv_2mortal(newSVpvn(self->rbuf + self->msg_reply_off, self->msg_reply_len)));
1126             } else {
1127 0           PUSHs(&PL_sv_undef);
1128             }
1129              
1130 0 0         if (self->msg_type == MSG_TYPE_HMSG && self->msg_hdr_len > 0) {
    0          
1131 0           PUSHs(sv_2mortal(newSVpvn(payload, self->msg_hdr_len)));
1132             }
1133              
1134 0           PUTBACK;
1135 0           call_sv(sub->cb, G_DISCARD);
1136 0 0         FREETMPS; LEAVE;
1137             }
1138              
1139 0 0         if (max_msgs > 0 && received >= max_msgs) {
    0          
1140 0           sub = nats_find_sub(self, sid);
1141 0 0         if (sub)
1142 0           nats_remove_sub(self, sub);
1143             }
1144             }
1145              
1146 0           static void nats_check_inbox_response(nats_t *self, const char *subject, size_t subject_len,
1147             const char *payload, size_t payload_len,
1148             const char *headers, size_t headers_len)
1149             {
1150 0           size_t pfx_len = strlen(self->inbox_prefix);
1151 0 0         if (subject_len <= pfx_len || memcmp(subject, self->inbox_prefix, pfx_len) != 0)
    0          
1152 0           return;
1153              
1154 0           const char *id_str = subject + pfx_len;
1155 0           size_t id_len = subject_len - pfx_len;
1156 0           uint64_t req_id = 0;
1157             size_t i;
1158 0 0         for (i = 0; i < id_len; i++) {
1159 0 0         if (id_str[i] < '0' || id_str[i] > '9') return;
    0          
1160 0           req_id = req_id * 10 + (id_str[i] - '0');
1161             }
1162              
1163             ngx_queue_t *q;
1164 0 0         ngx_queue_foreach(q, &self->req_queue) {
    0          
1165 0           nats_req_t *req = ngx_queue_data(q, nats_req_t, queue);
1166 0 0         if (req->req_id == req_id) {
1167 0           ngx_queue_remove(q);
1168 0 0         if (req->timer_active) {
1169 0           ev_timer_stop(self->loop, &req->timer);
1170 0           req->timer_active = 0;
1171             }
1172 0 0         if (req->cb) {
1173 0           dSP;
1174 0 0         int is_no_responders = (headers && headers_len >= 12 &&
    0          
1175 0 0         memcmp(headers, "NATS/1.0 503", 12) == 0);
1176 0           ENTER; SAVETMPS;
1177 0 0         PUSHMARK(SP);
1178 0 0         EXTEND(SP, 3);
1179 0 0         if (is_no_responders) {
1180 0           PUSHs(&PL_sv_undef);
1181 0           PUSHs(sv_2mortal(newSVpvn("no responders", 13)));
1182             } else {
1183 0           PUSHs(sv_2mortal(newSVpvn(payload, payload_len)));
1184 0           PUSHs(&PL_sv_undef);
1185 0 0         if (headers && headers_len > 0)
    0          
1186 0           PUSHs(sv_2mortal(newSVpvn(headers, headers_len)));
1187             }
1188 0           PUTBACK;
1189 0           call_sv(req->cb, G_DISCARD);
1190 0 0         FREETMPS; LEAVE;
1191 0           SvREFCNT_dec(req->cb);
1192             }
1193 0           Safefree(req);
1194 0           return;
1195             }
1196             }
1197             }
1198              
1199 0           static void nats_process_line(nats_t *self, char *line, size_t len)
1200             {
1201 0 0         if (len > 0 && line[len-1] == '\r')
    0          
1202 0           len--;
1203              
1204 0 0         if (len == 0) return;
1205              
1206             /* INFO */
1207 0 0         if (len > 5 && (line[0] == 'I' || line[0] == 'i') &&
    0          
    0          
1208 0 0         (line[1] == 'N' || line[1] == 'n') &&
    0          
1209 0 0         (line[2] == 'F' || line[2] == 'f') &&
    0          
1210 0 0         (line[3] == 'O' || line[3] == 'o') &&
    0          
1211 0 0         line[4] == ' ') {
1212              
1213 0 0         CLEAR_HANDLER(self->server_info_json);
1214 0           self->server_info_json = newSVpvn(line + 5, len - 5);
1215              
1216             {
1217 0           char *p = line + 5;
1218 0           char *e = line + len;
1219             char *found;
1220              
1221 0           found = strstr(p, "\"max_payload\":");
1222 0 0         if (found && found < e) {
    0          
1223 0           found += 14;
1224 0           self->max_payload = 0;
1225 0 0         while (found < e && *found >= '0' && *found <= '9') {
    0          
    0          
1226 0           self->max_payload = self->max_payload * 10 + (*found - '0');
1227 0           found++;
1228             }
1229             }
1230              
1231 0           nats_parse_connect_urls(self, p, e - p);
1232              
1233             /* Parse nonce for NKey auth */
1234 0           found = strstr(p, "\"nonce\":\"");
1235 0 0         if (found && found < e) {
    0          
1236 0           found += 9;
1237 0           const char *nend = memchr(found, '"', e - found);
1238 0 0         if (nend) {
1239 0           size_t nlen = nend - found;
1240 0 0         if (self->server_nonce) Safefree(self->server_nonce);
1241 0           Newx(self->server_nonce, nlen + 1, char);
1242 0           memcpy(self->server_nonce, found, nlen);
1243 0           self->server_nonce[nlen] = '\0';
1244             }
1245             }
1246              
1247             /* Parse ldm (lame duck mode) */
1248 0           found = strstr(p, "\"ldm\":");
1249 0 0         if (found && found < e) {
    0          
1250 0           found += 6;
1251 0 0         while (found < e && (*found == ' ' || *found == '\t')) found++;
    0          
    0          
1252 0 0         if (found < e && *found == 't') {
    0          
1253 0 0         if (!self->ldm) {
1254 0           self->ldm = 1;
1255 0 0         if (self->on_ldm) {
1256 0           dSP;
1257 0           ENTER; SAVETMPS;
1258 0 0         PUSHMARK(SP);
1259 0           PUTBACK;
1260 0           call_sv(self->on_ldm, G_DISCARD);
1261 0 0         FREETMPS; LEAVE;
1262             }
1263             }
1264             }
1265             }
1266             }
1267              
1268 0 0         if (self->connecting) {
1269 0           nats_send_connect(self);
1270 0           nats_try_write(self);
1271             }
1272 0           return;
1273             }
1274              
1275             /* MSG */
1276 0 0         if (len >= 4 && line[0] == 'M' && line[1] == 'S' && line[2] == 'G' && line[3] == ' ') {
    0          
    0          
    0          
    0          
1277 0 0         if (nats_parse_msg_args(self, line, len) == 0) {
1278 0 0         if (self->msg_total_len > (size_t)self->max_payload) {
1279 0           nats_emit_error(self, "server sent message exceeding max_payload");
1280 0           nats_cleanup(self);
1281 0           return;
1282             }
1283 0           self->parse_state = PARSE_MSG_BODY;
1284             }
1285 0           return;
1286             }
1287              
1288             /* HMSG */
1289 0 0         if (len >= 5 && line[0] == 'H' && line[1] == 'M' && line[2] == 'S' && line[3] == 'G' && line[4] == ' ') {
    0          
    0          
    0          
    0          
    0          
1290 0 0         if (nats_parse_hmsg_args(self, line, len) == 0) {
1291 0 0         if (self->msg_total_len > (size_t)self->max_payload) {
1292 0           nats_emit_error(self, "server sent message exceeding max_payload");
1293 0           nats_cleanup(self);
1294 0           return;
1295             }
1296 0           self->parse_state = PARSE_MSG_BODY;
1297             }
1298 0           return;
1299             }
1300              
1301             /* PING */
1302 0 0         if (len == 4 && line[0] == 'P' && line[1] == 'I' && line[2] == 'N' && line[3] == 'G') {
    0          
    0          
    0          
    0          
1303 0           wbuf_append(self, "PONG\r\n", 6);
1304 0           nats_try_write(self);
1305 0           return;
1306             }
1307              
1308             /* PONG */
1309 0 0         if (len == 4 && line[0] == 'P' && line[1] == 'O' && line[2] == 'N' && line[3] == 'G') {
    0          
    0          
    0          
    0          
1310 0 0         if (self->pings_outstanding > 0)
1311 0           self->pings_outstanding--;
1312              
1313 0 0         if (self->connecting && !self->connected) {
    0          
1314 0           self->connecting = 0;
1315 0           self->connected = 1;
1316 0           self->reconnect_attempts = 0;
1317              
1318 0 0         if (self->connect_timer_active) {
1319 0           ev_timer_stop(self->loop, &self->connect_timer);
1320 0           self->connect_timer_active = 0;
1321             }
1322              
1323 0           nats_start_ping_timer(self);
1324              
1325 0 0         if (self->on_connect) {
1326 0           dSP;
1327 0           ENTER; SAVETMPS;
1328 0 0         PUSHMARK(SP);
1329 0           PUTBACK;
1330 0           call_sv(self->on_connect, G_DISCARD);
1331 0 0         FREETMPS; LEAVE;
1332             }
1333              
1334 0           nats_drain_waiting(self);
1335             }
1336              
1337             /* Fire flush/PONG callback (one per PONG, FIFO) */
1338 0 0         if (!ngx_queue_empty(&self->pong_cbs)) {
1339 0           ngx_queue_t *pq = ngx_queue_head(&self->pong_cbs);
1340 0           nats_pong_cb_t *pcb = ngx_queue_data(pq, nats_pong_cb_t, queue);
1341 0           ngx_queue_remove(pq);
1342 0 0         if (pcb->cb) {
1343 0           dSP;
1344 0           ENTER; SAVETMPS;
1345 0 0         PUSHMARK(SP);
1346 0           PUTBACK;
1347 0           call_sv(pcb->cb, G_DISCARD);
1348 0 0         FREETMPS; LEAVE;
1349 0           SvREFCNT_dec(pcb->cb);
1350 0 0         } else if (self->draining) {
1351             /* Drain marker: fire drain callback and disconnect */
1352 0           self->draining = 0;
1353 0 0         if (self->drain_cb) {
1354 0           dSP;
1355 0           ENTER; SAVETMPS;
1356 0 0         PUSHMARK(SP);
1357 0           PUTBACK;
1358 0           call_sv(self->drain_cb, G_DISCARD);
1359 0 0         FREETMPS; LEAVE;
1360 0 0         CLEAR_HANDLER(self->drain_cb);
1361             }
1362 0           Safefree(pcb);
1363 0           self->intentional_disconnect = 1;
1364 0           nats_cleanup(self);
1365 0           return;
1366             }
1367 0           Safefree(pcb);
1368             }
1369              
1370 0           return;
1371             }
1372              
1373             /* +OK */
1374 0 0         if (len >= 3 && line[0] == '+' && line[1] == 'O' && line[2] == 'K') {
    0          
    0          
    0          
1375 0           return;
1376             }
1377              
1378             /* -ERR */
1379 0 0         if (len >= 4 && line[0] == '-' && line[1] == 'E' && line[2] == 'R' && line[3] == 'R') {
    0          
    0          
    0          
    0          
1380 0           char *msg = line + 4;
1381 0           size_t mlen = len - 4;
1382 0 0         while (mlen > 0 && (*msg == ' ' || *msg == '\'')) { msg++; mlen--; }
    0          
    0          
1383 0 0         while (mlen > 0 && msg[mlen-1] == '\'') mlen--;
    0          
1384              
1385             char errbuf[512];
1386 0           snprintf(errbuf, sizeof(errbuf), "%.*s", (int)mlen, msg);
1387              
1388 0 0         if (strstr(errbuf, "authorization") || strstr(errbuf, "authentication")) {
    0          
1389 0           self->intentional_disconnect = 1;
1390             }
1391 0           nats_emit_error(self, errbuf);
1392 0           nats_cleanup(self);
1393 0           return;
1394             }
1395             }
1396              
1397             /* ================================================================
1398             * IO callbacks
1399             * ================================================================ */
1400              
1401 0           static void nats_on_read(struct ev_loop *loop, ev_io *w, int revents)
1402             {
1403 0           nats_t *self = (nats_t *)((char *)w - offsetof(nats_t, rio));
1404             (void)revents;
1405              
1406             #ifdef HAVE_OPENSSL
1407 0 0         if (self->ssl_handshaking) {
1408 0           int hret = nats_ssl_handshake(self);
1409 0 0         if (hret == 0) return;
1410 0 0         if (hret < 0) {
1411 0           nats_emit_error(self, "SSL handshake failed");
1412 0           nats_cleanup(self);
1413 0           return;
1414             }
1415 0           self->ssl_handshaking = 0;
1416 0 0         if (self->writing) {
1417 0           ev_io_stop(self->loop, &self->wio);
1418 0           self->writing = 0;
1419             }
1420             }
1421             #endif
1422              
1423 0           buf_ensure(&self->rbuf, &self->rbuf_cap, self->rbuf_len + BUF_INIT_SIZE);
1424              
1425 0           ssize_t n = nats_io_read(self, self->rbuf + self->rbuf_len,
1426 0           self->rbuf_cap - self->rbuf_len);
1427              
1428 0 0         if (n <= 0) {
1429 0 0         if (n == 0) {
1430 0           nats_cleanup(self);
1431 0 0         } else if (errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) {
    0          
    0          
1432 0           nats_emit_error(self, strerror(errno));
1433 0           nats_cleanup(self);
1434             }
1435 0           return;
1436             }
1437              
1438 0           self->rbuf_len += n;
1439              
1440 0           size_t consumed = 0;
1441              
1442 0 0         while (consumed < self->rbuf_len) {
1443 0 0         if (self->fd < 0) break;
1444              
1445 0 0         if (self->parse_state == PARSE_OP) {
1446 0           char *start = self->rbuf + consumed;
1447 0           char *nl = (char *)memchr(start, '\n', self->rbuf_len - consumed);
1448 0 0         if (!nl) break;
1449              
1450 0           size_t line_len = nl - start;
1451 0           nats_process_line(self, start, line_len);
1452 0           consumed += line_len + 1;
1453              
1454 0 0         } else if (self->parse_state == PARSE_MSG_BODY) {
1455 0           size_t need = self->msg_total_len + 2;
1456 0           size_t avail = self->rbuf_len - consumed;
1457 0 0         if (avail < need) break;
1458              
1459 0           char *payload = self->rbuf + consumed;
1460              
1461             /* Dispatch inbox responses, skip normal sub delivery for inbox */
1462 0 0         if (self->inbox_sub_sid && self->msg_sid == self->inbox_sub_sid) {
    0          
1463 0           const char *hdrs = NULL;
1464 0           size_t hdrs_len = 0;
1465 0           const char *body = payload;
1466 0           size_t body_len = self->msg_total_len;
1467              
1468 0 0         if (self->msg_type == MSG_TYPE_HMSG && self->msg_hdr_len > 0 &&
    0          
1469 0 0         self->msg_hdr_len <= self->msg_total_len) {
1470 0           hdrs = payload;
1471 0           hdrs_len = self->msg_hdr_len;
1472 0           body = payload + self->msg_hdr_len;
1473 0           body_len = self->msg_total_len - self->msg_hdr_len;
1474             }
1475              
1476 0           self->msgs_in++;
1477 0           self->bytes_in += self->msg_total_len;
1478 0           nats_check_inbox_response(self, self->rbuf + self->msg_subject_off, self->msg_subject_len,
1479             body, body_len, hdrs, hdrs_len);
1480             } else {
1481 0           nats_process_msg(self, payload, self->msg_total_len);
1482             }
1483              
1484 0           consumed += need;
1485 0           self->parse_state = PARSE_OP;
1486             }
1487             }
1488              
1489 0 0         if (self->fd >= 0 && consumed > 0 && consumed <= self->rbuf_len) {
    0          
    0          
1490 0           self->rbuf_len -= consumed;
1491 0 0         if (self->rbuf_len > 0)
1492 0           memmove(self->rbuf, self->rbuf + consumed, self->rbuf_len);
1493             }
1494             }
1495              
1496 0           static void nats_try_write(nats_t *self)
1497             {
1498 0 0         if (self->fd < 0) return;
1499              
1500 0 0         while (self->wbuf_off < self->wbuf_len) {
1501 0           ssize_t n = nats_io_write(self, self->wbuf + self->wbuf_off,
1502 0           self->wbuf_len - self->wbuf_off);
1503 0 0         if (n <= 0) {
1504 0 0         if (n == 0 || errno == EAGAIN || errno == EWOULDBLOCK) {
    0          
    0          
1505 0 0         if (!self->writing) {
1506 0           ev_io_start(self->loop, &self->wio);
1507 0           self->writing = 1;
1508             }
1509 0           return;
1510             }
1511 0 0         if (errno == EINTR)
1512 0           continue;
1513 0           nats_emit_error(self, strerror(errno));
1514 0           nats_cleanup(self);
1515 0           return;
1516             }
1517 0           self->wbuf_off += n;
1518             }
1519              
1520 0           self->wbuf_off = 0;
1521 0           self->wbuf_len = 0;
1522              
1523 0 0         if (self->writing) {
1524 0           ev_io_stop(self->loop, &self->wio);
1525 0           self->writing = 0;
1526             }
1527             }
1528              
1529 0           static void nats_on_write(struct ev_loop *loop, ev_io *w, int revents)
1530             {
1531 0           nats_t *self = (nats_t *)((char *)w - offsetof(nats_t, wio));
1532             (void)loop; (void)revents;
1533              
1534 0 0         if (self->connecting && !self->connected) {
    0          
1535 0           int err = 0;
1536 0           socklen_t errlen = sizeof(err);
1537 0           getsockopt(self->fd, SOL_SOCKET, SO_ERROR, &err, &errlen);
1538 0 0         if (err) {
1539 0           nats_emit_error(self, strerror(err));
1540 0           nats_cleanup(self);
1541 0           return;
1542             }
1543             #ifdef HAVE_OPENSSL
1544 0 0         if (self->tls && !self->ssl) {
    0          
1545 0 0         if (nats_ssl_setup(self) != 0) {
1546 0           nats_emit_error(self, "SSL setup failed");
1547 0           nats_cleanup(self);
1548 0           return;
1549             }
1550 0           self->ssl_handshaking = 1;
1551             }
1552 0 0         if (self->ssl_handshaking) {
1553 0           int hret = nats_ssl_handshake(self);
1554 0 0         if (hret == 0) return; /* want read/write */
1555 0 0         if (hret < 0) {
1556 0           nats_emit_error(self, "SSL handshake failed");
1557 0           nats_cleanup(self);
1558 0           return;
1559             }
1560             /* handshake done, fall through to wait for INFO */
1561             }
1562             #endif
1563 0 0         if (self->writing) {
1564 0           ev_io_stop(self->loop, &self->wio);
1565 0           self->writing = 0;
1566             }
1567 0 0         if (!self->reading) {
1568 0           ev_io_start(self->loop, &self->rio);
1569 0           self->reading = 1;
1570             }
1571 0           return;
1572             }
1573              
1574 0           nats_try_write(self);
1575             }
1576              
1577             /* ================================================================
1578             * Timer callbacks
1579             * ================================================================ */
1580              
1581 0           static void nats_on_connect_timeout(struct ev_loop *loop, ev_timer *w, int revents)
1582             {
1583 0           nats_t *self = (nats_t *)((char *)w - offsetof(nats_t, connect_timer));
1584             (void)loop; (void)revents;
1585 0           self->connect_timer_active = 0;
1586 0           nats_emit_error(self, "connect timeout");
1587 0           nats_cleanup(self);
1588 0           }
1589              
1590 0           static void nats_on_reconnect_timer(struct ev_loop *loop, ev_timer *w, int revents)
1591             {
1592 0           nats_t *self = (nats_t *)((char *)w - offsetof(nats_t, reconnect_timer));
1593             (void)loop; (void)revents;
1594 0           self->reconnect_timer_active = 0;
1595 0           self->reconnect_attempts++;
1596 0           self->intentional_disconnect = 0;
1597 0 0         if (self->server_pool_count > 0)
1598 0           nats_next_server(self);
1599 0           nats_do_connect(self);
1600 0           }
1601              
1602 0           static void nats_start_ping_timer(nats_t *self)
1603             {
1604 0 0         if (self->ping_interval_ms <= 0) return;
1605 0           double interval = self->ping_interval_ms / 1000.0;
1606 0           ev_timer_set(&self->ping_timer, interval, interval);
1607 0           ev_timer_start(self->loop, &self->ping_timer);
1608 0           self->ping_timer_active = 1;
1609             }
1610              
1611 0           static void nats_on_ping_timer(struct ev_loop *loop, ev_timer *w, int revents)
1612             {
1613 0           nats_t *self = (nats_t *)((char *)w - offsetof(nats_t, ping_timer));
1614             (void)loop; (void)revents;
1615              
1616 0           self->pings_outstanding++;
1617 0 0         if (self->max_pings_outstanding > 0 &&
1618 0 0         self->pings_outstanding > self->max_pings_outstanding) {
1619 0           nats_emit_error(self, "stale connection");
1620 0           nats_cleanup(self);
1621 0           return;
1622             }
1623              
1624 0           wbuf_append(self, "PING\r\n", 6);
1625 0           nats_try_write(self);
1626             }
1627              
1628             /* ================================================================
1629             * Write coalescing via ev_prepare
1630             * ================================================================ */
1631              
1632 0           static void nats_on_prepare(struct ev_loop *loop, ev_prepare *w, int revents)
1633             {
1634 0           nats_t *self = (nats_t *)((char *)w - offsetof(nats_t, prepare_watcher));
1635             (void)loop; (void)revents;
1636              
1637 0 0         if (self->wbuf_dirty && self->fd >= 0) {
    0          
1638 0           self->wbuf_dirty = 0;
1639 0           nats_try_write(self);
1640             }
1641 0           }
1642              
1643 0           static void nats_schedule_write(nats_t *self)
1644             {
1645 0 0         if (self->batch_mode) return; /* writes batched, flushed on batch end */
1646              
1647 0           self->wbuf_dirty = 1;
1648 0 0         if (!self->prepare_active) {
1649 0           ev_prepare_start(self->loop, &self->prepare_watcher);
1650 0           self->prepare_active = 1;
1651             }
1652              
1653             /* Slow consumer detection */
1654 0 0         if (self->slow_consumer_bytes > 0 &&
1655 0 0         (self->wbuf_len - self->wbuf_off) > self->slow_consumer_bytes) {
1656 0 0         if (self->on_slow_consumer) {
1657 0           dSP;
1658 0           ENTER; SAVETMPS;
1659 0 0         PUSHMARK(SP);
1660 0 0         EXTEND(SP, 1);
1661 0           PUSHs(sv_2mortal(newSVuv(self->wbuf_len - self->wbuf_off)));
1662 0           PUTBACK;
1663 0           call_sv(self->on_slow_consumer, G_DISCARD);
1664 0 0         FREETMPS; LEAVE;
1665             }
1666             }
1667             }
1668              
1669             /* ================================================================
1670             * Request timeout
1671             * ================================================================ */
1672              
1673 0           static void nats_on_request_timeout(struct ev_loop *loop, ev_timer *w, int revents)
1674             {
1675 0           nats_req_t *req = (nats_req_t *)((char *)w - offsetof(nats_req_t, timer));
1676             (void)loop; (void)revents;
1677 0           req->timer_active = 0;
1678              
1679 0           ngx_queue_remove(&req->queue);
1680              
1681 0 0         if (req->cb) {
1682 0           dSP;
1683 0           ENTER; SAVETMPS;
1684 0 0         PUSHMARK(SP);
1685 0 0         EXTEND(SP, 2);
1686 0           PUSHs(&PL_sv_undef);
1687 0           PUSHs(sv_2mortal(newSVpvn("request timeout", 15)));
1688 0           PUTBACK;
1689 0           call_sv(req->cb, G_DISCARD);
1690 0 0         FREETMPS; LEAVE;
1691 0           SvREFCNT_dec(req->cb);
1692             }
1693 0           Safefree(req);
1694 0           }
1695              
1696             /* ================================================================
1697             * Write queue (for buffering during connect/reconnect)
1698             * ================================================================ */
1699              
1700 0           static void nats_drain_waiting(nats_t *self)
1701             {
1702 0 0         while (!ngx_queue_empty(&self->wait_queue)) {
1703 0           ngx_queue_t *q = ngx_queue_head(&self->wait_queue);
1704 0           nats_pub_t *pub = ngx_queue_data(q, nats_pub_t, queue);
1705 0           ngx_queue_remove(q);
1706 0           self->waiting_count--;
1707              
1708 0           wbuf_append(self, pub->data, pub->len);
1709 0           Safefree(pub->data);
1710 0           Safefree(pub);
1711             }
1712              
1713 0 0         if (self->wbuf_len > self->wbuf_off)
1714 0           nats_try_write(self);
1715 0           }
1716              
1717 0           static void nats_queue_write(nats_t *self, const char *data, size_t len)
1718             {
1719 0 0         if (self->connected) {
1720 0           wbuf_append(self, data, len);
1721 0           nats_schedule_write(self);
1722 0 0         } else if (self->connecting ||
1723 0 0         (self->reconnect_enabled && self->reconnect_timer_active)) {
    0          
1724             nats_pub_t *pub;
1725 0           Newxz(pub, 1, nats_pub_t);
1726 0           Newx(pub->data, len, char);
1727 0           memcpy(pub->data, data, len);
1728 0           pub->len = len;
1729 0           ngx_queue_insert_tail(&self->wait_queue, &pub->queue);
1730 0           self->waiting_count++;
1731             } else {
1732 0           croak("not connected");
1733             }
1734 0           }
1735              
1736             /* ================================================================
1737             * TCP connection
1738             * ================================================================ */
1739              
1740 0           static void nats_schedule_reconnect(nats_t *self)
1741             {
1742 0 0         if (!self->intentional_disconnect && self->reconnect_enabled) {
    0          
1743 0 0         if (self->max_reconnect_attempts == 0 ||
1744 0 0         self->reconnect_attempts < self->max_reconnect_attempts) {
1745             /* Exponential backoff with jitter */
1746 0           int shift = self->reconnect_attempts > 5 ? 5 : self->reconnect_attempts;
1747 0           int delay = self->reconnect_delay_ms * (1 << shift);
1748 0 0         if (self->max_reconnect_delay_ms > 0 && delay > self->max_reconnect_delay_ms)
    0          
1749 0           delay = self->max_reconnect_delay_ms;
1750 0           double base = delay / 1000.0;
1751 0           double jitter = base * (0.5 + (double)(rand() % 1000) / 2000.0);
1752 0           ev_timer_set(&self->reconnect_timer, jitter, 0.0);
1753 0           ev_timer_start(self->loop, &self->reconnect_timer);
1754 0           self->reconnect_timer_active = 1;
1755             } else {
1756 0           nats_emit_error(self, "max reconnect attempts reached");
1757             }
1758             }
1759 0           }
1760              
1761 0           static void nats_free_server_pool(nats_t *self)
1762             {
1763 0 0         while (!ngx_queue_empty(&self->server_pool)) {
1764 0           ngx_queue_t *q = ngx_queue_head(&self->server_pool);
1765 0           nats_server_t *srv = ngx_queue_data(q, nats_server_t, queue);
1766 0           ngx_queue_remove(q);
1767 0           Safefree(srv->host);
1768 0           Safefree(srv);
1769 0           self->server_pool_count--;
1770             }
1771 0           }
1772              
1773 0           static void nats_parse_connect_urls(nats_t *self, const char *json, size_t len)
1774             {
1775 0           const char *key = "\"connect_urls\":[";
1776 0           const char *p = strstr(json, key);
1777 0 0         if (!p || p >= json + len) return;
    0          
1778              
1779 0           p += strlen(key);
1780 0           const char *end = json + len;
1781              
1782 0           nats_free_server_pool(self);
1783              
1784 0 0         while (p < end && *p != ']') {
    0          
1785 0 0         while (p < end && (*p == ' ' || *p == ',' || *p == '"')) p++;
    0          
    0          
    0          
1786 0 0         if (p >= end || *p == ']') break;
    0          
1787              
1788 0           const char *start = p;
1789 0 0         while (p < end && *p != '"' && *p != ',' && *p != ']') p++;
    0          
    0          
    0          
1790              
1791 0           size_t url_len = p - start;
1792 0 0         if (url_len > 0) {
1793             /* Parse host:port */
1794 0           const char *colon = NULL;
1795             const char *s;
1796 0 0         for (s = start + url_len - 1; s >= start; s--) {
1797 0 0         if (*s == ':') { colon = s; break; }
1798             }
1799              
1800             nats_server_t *srv;
1801 0           Newxz(srv, 1, nats_server_t);
1802 0 0         if (colon && colon > start) {
    0          
1803 0           size_t hlen = colon - start;
1804 0           Newx(srv->host, hlen + 1, char);
1805 0           memcpy(srv->host, start, hlen);
1806 0           srv->host[hlen] = '\0';
1807 0           srv->port = 0;
1808 0           const char *dp = colon + 1;
1809 0 0         while (dp < start + url_len && *dp >= '0' && *dp <= '9') {
    0          
    0          
1810 0           srv->port = srv->port * 10 + (*dp - '0');
1811 0           dp++;
1812             }
1813             } else {
1814 0           Newx(srv->host, url_len + 1, char);
1815 0           memcpy(srv->host, start, url_len);
1816 0           srv->host[url_len] = '\0';
1817 0           srv->port = 4222;
1818             }
1819 0           ngx_queue_insert_tail(&self->server_pool, &srv->queue);
1820 0           self->server_pool_count++;
1821             }
1822              
1823 0 0         while (p < end && *p == '"') p++;
    0          
1824             }
1825             }
1826              
1827 0           static void nats_next_server(nats_t *self)
1828             {
1829 0 0         if (ngx_queue_empty(&self->server_pool)) return;
1830              
1831             /* Rotate: move head to tail, use new head */
1832 0           ngx_queue_t *q = ngx_queue_head(&self->server_pool);
1833 0           nats_server_t *srv = ngx_queue_data(q, nats_server_t, queue);
1834              
1835 0 0         if (self->host) Safefree(self->host);
1836 0           size_t hlen = strlen(srv->host);
1837 0           Newx(self->host, hlen + 1, char);
1838 0           memcpy(self->host, srv->host, hlen + 1);
1839 0           self->port = srv->port;
1840              
1841             /* Rotate this server to end of pool */
1842 0           ngx_queue_remove(q);
1843 0           ngx_queue_insert_tail(&self->server_pool, q);
1844             }
1845              
1846 0           static void nats_connect_tcp(nats_t *self)
1847             {
1848             struct addrinfo hints, *res, *rp;
1849             char port_str[8];
1850 0           int fd = -1;
1851              
1852 0           memset(&hints, 0, sizeof(hints));
1853 0           hints.ai_family = AF_UNSPEC;
1854 0           hints.ai_socktype = SOCK_STREAM;
1855              
1856 0           snprintf(port_str, sizeof(port_str), "%d", self->port);
1857              
1858 0           int rv = getaddrinfo(self->host, port_str, &hints, &res);
1859 0 0         if (rv != 0) {
1860 0           nats_emit_error(self, gai_strerror(rv));
1861 0           nats_schedule_reconnect(self);
1862 0           return;
1863             }
1864              
1865 0 0         for (rp = res; rp != NULL; rp = rp->ai_next) {
1866 0           fd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
1867 0 0         if (fd < 0) continue;
1868              
1869 0           fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | O_NONBLOCK);
1870              
1871 0           int flag = 1;
1872 0           setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag));
1873              
1874 0 0         if (self->keepalive > 0) {
1875 0           setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &flag, sizeof(flag));
1876             #ifdef TCP_KEEPIDLE
1877 0           setsockopt(fd, IPPROTO_TCP, TCP_KEEPIDLE, &self->keepalive, sizeof(self->keepalive));
1878             #endif
1879             }
1880              
1881 0           rv = connect(fd, rp->ai_addr, rp->ai_addrlen);
1882 0 0         if (rv == 0 || errno == EINPROGRESS) break;
    0          
1883              
1884 0           close(fd);
1885 0           fd = -1;
1886             }
1887              
1888 0           freeaddrinfo(res);
1889              
1890 0 0         if (fd < 0) {
1891 0           nats_emit_error(self, "connection failed");
1892 0           nats_schedule_reconnect(self);
1893 0           return;
1894             }
1895              
1896 0           self->fd = fd;
1897 0           self->connecting = 1;
1898 0           self->connected = 0;
1899              
1900 0           ev_io_init(&self->rio, nats_on_read, fd, EV_READ);
1901 0           ev_io_init(&self->wio, nats_on_write, fd, EV_WRITE);
1902              
1903 0 0         if (self->priority) {
1904 0           ev_set_priority(&self->rio, self->priority);
1905 0           ev_set_priority(&self->wio, self->priority);
1906             }
1907              
1908 0           ev_io_start(self->loop, &self->rio);
1909 0           self->reading = 1;
1910              
1911 0 0         if (rv != 0) {
1912 0           ev_io_start(self->loop, &self->wio);
1913 0           self->writing = 1;
1914             }
1915              
1916 0 0         if (self->connect_timeout_ms > 0) {
1917 0           ev_timer_set(&self->connect_timer, self->connect_timeout_ms / 1000.0, 0.0);
1918 0           ev_timer_start(self->loop, &self->connect_timer);
1919 0           self->connect_timer_active = 1;
1920             }
1921             }
1922              
1923 0           static void nats_connect_unix(nats_t *self)
1924             {
1925             struct sockaddr_un addr;
1926             int fd;
1927              
1928 0 0         if (!self->path || strlen(self->path) >= sizeof(addr.sun_path)) {
    0          
1929 0           nats_emit_error(self, "invalid unix socket path");
1930 0           return;
1931             }
1932              
1933 0           fd = socket(AF_UNIX, SOCK_STREAM, 0);
1934 0 0         if (fd < 0) {
1935 0           nats_schedule_reconnect(self);
1936 0           nats_emit_error(self, strerror(errno));
1937 0           return;
1938             }
1939              
1940 0           fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | O_NONBLOCK);
1941              
1942 0           memset(&addr, 0, sizeof(addr));
1943 0           addr.sun_family = AF_UNIX;
1944 0           strncpy(addr.sun_path, self->path, sizeof(addr.sun_path) - 1);
1945              
1946 0           int rv = connect(fd, (struct sockaddr *)&addr, sizeof(addr));
1947 0 0         if (rv != 0 && errno != EINPROGRESS) {
    0          
1948 0           close(fd);
1949 0           nats_schedule_reconnect(self);
1950 0           nats_emit_error(self, strerror(errno));
1951 0           return;
1952             }
1953              
1954 0           self->fd = fd;
1955 0           self->connecting = 1;
1956 0           self->connected = 0;
1957              
1958 0           ev_io_init(&self->rio, nats_on_read, fd, EV_READ);
1959 0           ev_io_init(&self->wio, nats_on_write, fd, EV_WRITE);
1960              
1961 0 0         if (self->priority) {
1962 0           ev_set_priority(&self->rio, self->priority);
1963 0           ev_set_priority(&self->wio, self->priority);
1964             }
1965              
1966 0           ev_io_start(self->loop, &self->rio);
1967 0           self->reading = 1;
1968              
1969 0 0         if (rv != 0) {
1970 0           ev_io_start(self->loop, &self->wio);
1971 0           self->writing = 1;
1972             }
1973              
1974 0 0         if (self->connect_timeout_ms > 0) {
1975 0           ev_timer_set(&self->connect_timer, self->connect_timeout_ms / 1000.0, 0.0);
1976 0           ev_timer_start(self->loop, &self->connect_timer);
1977 0           self->connect_timer_active = 1;
1978             }
1979             }
1980              
1981             /* Helper: connect via path or host */
1982 0           static void nats_do_connect(nats_t *self)
1983             {
1984 0 0         if (self->path)
1985 0           nats_connect_unix(self);
1986             else
1987 0           nats_connect_tcp(self);
1988 0           }
1989              
1990             /* ================================================================
1991             * XS interface
1992             * ================================================================ */
1993              
1994             MODULE = EV::Nats PACKAGE = EV::Nats
1995              
1996             PROTOTYPES: DISABLE
1997              
1998             BOOT:
1999             {
2000 14 50         I_EV_API("EV::Nats");
    50          
    50          
2001             }
2002              
2003             EV::Nats
2004             new(class, ...)
2005             char *class
2006             PREINIT:
2007             nats_t *self;
2008             int i;
2009             CODE:
2010 0           Newxz(self, 1, nats_t);
2011 0           self->magic = NATS_MAGIC_ALIVE;
2012 0           self->fd = -1;
2013 0           self->loop = EV_DEFAULT;
2014 0           self->max_payload = DEFAULT_MAX_PAYLOAD;
2015 0           self->port = 4222;
2016 0           self->echo = 1;
2017 0           self->ping_interval_ms = 120000;
2018 0           self->max_pings_outstanding = 2;
2019 0           self->reconnect_delay_ms = 2000;
2020 0           self->max_reconnect_delay_ms = 30000;
2021 0           self->max_reconnect_attempts = 60;
2022              
2023 0           ngx_queue_init(&self->subs);
2024 0           ngx_queue_init(&self->wait_queue);
2025 0           ngx_queue_init(&self->req_queue);
2026              
2027 0           self->next_sid = 1;
2028              
2029 0           ev_timer_init(&self->connect_timer, nats_on_connect_timeout, 0., 0.);
2030 0           ev_timer_init(&self->reconnect_timer, nats_on_reconnect_timer, 0., 0.);
2031 0           ev_timer_init(&self->ping_timer, nats_on_ping_timer, 0., 0.);
2032 0           ev_prepare_init(&self->prepare_watcher, nats_on_prepare);
2033              
2034 0           ngx_queue_init(&self->server_pool);
2035 0           ngx_queue_init(&self->pong_cbs);
2036 0           self->sub_map = newHV();
2037              
2038 0           nats_setup_inbox(self);
2039              
2040 0 0         if (items > 1 && (items - 1) % 2 == 0) {
    0          
2041 0 0         for (i = 1; i < items; i += 2) {
2042 0           const char *key = SvPV_nolen(ST(i));
2043 0           SV *val = ST(i + 1);
2044              
2045 0 0         if (strcmp(key, "host") == 0) {
2046             STRLEN l;
2047 0           const char *s = SvPV(val, l);
2048 0           Newx(self->host, l + 1, char);
2049 0           Copy(s, self->host, l + 1, char);
2050             }
2051 0 0         else if (strcmp(key, "port") == 0)
2052 0           self->port = SvIV(val);
2053 0 0         else if (strcmp(key, "path") == 0) {
2054             STRLEN l;
2055 0           const char *s = SvPV(val, l);
2056 0           Newx(self->path, l + 1, char);
2057 0           Copy(s, self->path, l + 1, char);
2058             }
2059 0 0         else if (strcmp(key, "on_error") == 0)
2060 0           self->on_error = newSVsv(val);
2061 0 0         else if (strcmp(key, "on_connect") == 0)
2062 0           self->on_connect = newSVsv(val);
2063 0 0         else if (strcmp(key, "on_disconnect") == 0)
2064 0           self->on_disconnect = newSVsv(val);
2065 0 0         else if (strcmp(key, "user") == 0) {
2066 0           STRLEN l; const char *s = SvPV(val, l);
2067 0           Newx(self->user, l + 1, char);
2068 0           Copy(s, self->user, l + 1, char);
2069             }
2070 0 0         else if (strcmp(key, "pass") == 0) {
2071 0           STRLEN l; const char *s = SvPV(val, l);
2072 0           Newx(self->pass, l + 1, char);
2073 0           Copy(s, self->pass, l + 1, char);
2074             }
2075 0 0         else if (strcmp(key, "token") == 0) {
2076 0           STRLEN l; const char *s = SvPV(val, l);
2077 0           Newx(self->token, l + 1, char);
2078 0           Copy(s, self->token, l + 1, char);
2079             }
2080 0 0         else if (strcmp(key, "name") == 0) {
2081 0           STRLEN l; const char *s = SvPV(val, l);
2082 0           Newx(self->name, l + 1, char);
2083 0           Copy(s, self->name, l + 1, char);
2084             }
2085 0 0         else if (strcmp(key, "verbose") == 0)
2086 0           self->verbose = SvTRUE(val) ? 1 : 0;
2087 0 0         else if (strcmp(key, "pedantic") == 0)
2088 0           self->pedantic = SvTRUE(val) ? 1 : 0;
2089 0 0         else if (strcmp(key, "echo") == 0)
2090 0           self->echo = SvTRUE(val) ? 1 : 0;
2091 0 0         else if (strcmp(key, "no_responders") == 0)
2092 0           self->no_responders = SvTRUE(val) ? 1 : 0;
2093 0 0         else if (strcmp(key, "reconnect") == 0)
2094 0           self->reconnect_enabled = SvTRUE(val) ? 1 : 0;
2095 0 0         else if (strcmp(key, "reconnect_delay") == 0)
2096 0           self->reconnect_delay_ms = SvIV(val);
2097 0 0         else if (strcmp(key, "max_reconnect_attempts") == 0)
2098 0           self->max_reconnect_attempts = SvIV(val);
2099 0 0         else if (strcmp(key, "max_reconnect_delay") == 0)
2100 0           self->max_reconnect_delay_ms = SvIV(val);
2101 0 0         else if (strcmp(key, "connect_timeout") == 0)
2102 0           self->connect_timeout_ms = SvIV(val);
2103 0 0         else if (strcmp(key, "ping_interval") == 0)
2104 0           self->ping_interval_ms = SvIV(val);
2105 0 0         else if (strcmp(key, "max_pings_outstanding") == 0)
2106 0           self->max_pings_outstanding = SvIV(val);
2107 0 0         else if (strcmp(key, "priority") == 0)
2108 0           self->priority = SvIV(val);
2109 0 0         else if (strcmp(key, "keepalive") == 0)
2110 0           self->keepalive = SvIV(val);
2111 0 0         #ifdef HAVE_OPENSSL
2112 0           else if (strcmp(key, "tls") == 0)
2113 0 0         self->tls = SvTRUE(val) ? 1 : 0;
2114 0           else if (strcmp(key, "tls_ca_file") == 0) {
2115 0           STRLEN l; const char *s = SvPV(val, l);
2116 0           Newx(self->tls_ca_file, l + 1, char);
2117             Copy(s, self->tls_ca_file, l + 1, char);
2118 0 0         }
2119 0           else if (strcmp(key, "tls_skip_verify") == 0)
2120 0 0         self->tls_skip_verify = SvTRUE(val) ? 1 : 0;
2121 0           else if (strcmp(key, "nkey_seed") == 0) {
2122 0           STRLEN l; const char *s = SvPV(val, l);
2123 0           Newx(self->nkey_seed, l + 1, char);
2124             Copy(s, self->nkey_seed, l + 1, char);
2125 0 0         }
2126 0           #endif
2127 0           else if (strcmp(key, "jwt") == 0) {
2128 0           STRLEN l; const char *s = SvPV(val, l);
2129             Newx(self->jwt, l + 1, char);
2130 0 0         Copy(s, self->jwt, l + 1, char);
2131 0           }
2132 0 0         else if (strcmp(key, "slow_consumer_bytes") == 0)
2133 0           self->slow_consumer_bytes = (size_t)SvUV(val);
2134 0 0         else if (strcmp(key, "on_slow_consumer") == 0)
2135 0           self->on_slow_consumer = newSVsv(val);
2136 0 0         else if (strcmp(key, "on_lame_duck") == 0)
2137 0           self->on_ldm = newSVsv(val);
2138             else if (strcmp(key, "loop") == 0)
2139 0           self->loop = (struct ev_loop *)SvIVx(SvRV(val));
2140             else
2141             warn("EV::Nats::new: unknown option '%s'", key);
2142             }
2143 0 0         }
    0          
2144 0            
2145             if (self->host || self->path)
2146 0           nats_do_connect(self);
2147              
2148             RETVAL = self;
2149             OUTPUT:
2150             RETVAL
2151              
2152             void
2153             connect(self, host, port = 4222)
2154             EV::Nats self
2155             char *host
2156             int port
2157             CODE:
2158 0 0         if (self->connected || self->connecting)
    0          
2159 0           croak("already connected");
2160              
2161 0 0         if (self->reconnect_timer_active) {
2162 0           ev_timer_stop(self->loop, &self->reconnect_timer);
2163 0           self->reconnect_timer_active = 0;
2164             }
2165              
2166 0 0         if (self->host) { Safefree(self->host); self->host = NULL; }
2167 0 0         if (self->path) { Safefree(self->path); self->path = NULL; }
2168 0           STRLEN l = strlen(host);
2169 0           Newx(self->host, l + 1, char);
2170 0           Copy(host, self->host, l + 1, char);
2171 0           self->port = port;
2172 0           self->intentional_disconnect = 0;
2173              
2174 0           nats_connect_tcp(self);
2175              
2176             void
2177             connect_unix(self, path)
2178             EV::Nats self
2179             char *path
2180             CODE:
2181 0 0         if (self->connected || self->connecting)
    0          
2182 0           croak("already connected");
2183              
2184 0 0         if (self->reconnect_timer_active) {
2185 0           ev_timer_stop(self->loop, &self->reconnect_timer);
2186 0           self->reconnect_timer_active = 0;
2187             }
2188              
2189 0 0         if (self->host) { Safefree(self->host); self->host = NULL; }
2190 0 0         if (self->path) Safefree(self->path);
2191 0           STRLEN l = strlen(path);
2192 0           Newx(self->path, l + 1, char);
2193 0           Copy(path, self->path, l + 1, char);
2194 0           self->intentional_disconnect = 0;
2195              
2196 0           nats_connect_unix(self);
2197              
2198             void
2199             disconnect(self)
2200             EV::Nats self
2201             CODE:
2202 0           self->intentional_disconnect = 1;
2203 0 0         if (self->reconnect_timer_active) {
2204 0           ev_timer_stop(self->loop, &self->reconnect_timer);
2205 0           self->reconnect_timer_active = 0;
2206             }
2207 0           nats_skip_waiting(self);
2208 0           nats_cleanup(self);
2209              
2210             int
2211             is_connected(self)
2212             EV::Nats self
2213             CODE:
2214 0 0         RETVAL = self->connected;
2215             OUTPUT:
2216             RETVAL
2217              
2218             void
2219             publish(self, subject, payload = &PL_sv_undef, reply = NULL)
2220             EV::Nats self
2221             SV *subject
2222             SV *payload
2223             const char *reply
2224             PREINIT:
2225             char hdr[MAX_CONTROL_LINE];
2226             STRLEN subj_len, pay_len;
2227             const char *subj_pv, *pay_pv;
2228             CODE:
2229 0 0         NATS_CROAK_UNLESS_CONNECTED(self);
    0          
    0          
    0          
2230              
2231 0           subj_pv = SvPV(subject, subj_len);
2232              
2233 0 0         if (SvOK(payload)) {
2234 0           pay_pv = SvPV(payload, pay_len);
2235             } else {
2236 0           pay_pv = "";
2237 0           pay_len = 0;
2238             }
2239              
2240 0 0         if ((int)pay_len > self->max_payload)
2241 0           croak("payload exceeds max_payload (%d)", self->max_payload);
2242              
2243             int hdr_len;
2244 0 0         if (reply && *reply) {
    0          
2245 0           hdr_len = snprintf(hdr, sizeof(hdr), "PUB %.*s %s %lu\r\n",
2246             (int)subj_len, subj_pv, reply, (unsigned long)pay_len);
2247             } else {
2248 0           hdr_len = snprintf(hdr, sizeof(hdr), "PUB %.*s %lu\r\n",
2249             (int)subj_len, subj_pv, (unsigned long)pay_len);
2250             }
2251              
2252 0           self->msgs_out++;
2253 0           self->bytes_out += pay_len;
2254              
2255 0 0         if (self->connected) {
2256 0           wbuf_append(self, hdr, hdr_len);
2257 0 0         if (pay_len > 0) wbuf_append(self, pay_pv, pay_len);
2258 0           wbuf_append(self, "\r\n", 2);
2259 0           nats_schedule_write(self);
2260             } else {
2261 0           size_t total = hdr_len + pay_len + 2;
2262             char *buf;
2263 0           Newx(buf, total, char);
2264 0           memcpy(buf, hdr, hdr_len);
2265 0 0         if (pay_len > 0) memcpy(buf + hdr_len, pay_pv, pay_len);
2266 0           buf[hdr_len + pay_len] = '\r';
2267 0           buf[hdr_len + pay_len + 1] = '\n';
2268 0           nats_queue_write(self, buf, total);
2269 0           Safefree(buf);
2270             }
2271              
2272             void
2273             hpublish(self, subject, headers, payload = &PL_sv_undef, reply = NULL)
2274             EV::Nats self
2275             SV *subject
2276             SV *headers
2277             SV *payload
2278             const char *reply
2279             PREINIT:
2280             char hdr[MAX_CONTROL_LINE];
2281             STRLEN subj_len, hdr_data_len, pay_len;
2282             const char *subj_pv, *hdr_data_pv, *pay_pv;
2283             CODE:
2284 0 0         NATS_CROAK_UNLESS_CONNECTED(self);
    0          
    0          
    0          
2285              
2286 0           subj_pv = SvPV(subject, subj_len);
2287 0           hdr_data_pv = SvPV(headers, hdr_data_len);
2288              
2289 0 0         if (SvOK(payload)) {
2290 0           pay_pv = SvPV(payload, pay_len);
2291             } else {
2292 0           pay_pv = "";
2293 0           pay_len = 0;
2294             }
2295              
2296 0           size_t total_size = hdr_data_len + pay_len;
2297 0 0         if ((int)total_size > self->max_payload)
2298 0           croak("message exceeds max_payload (%d)", self->max_payload);
2299              
2300 0           self->msgs_out++;
2301 0           self->bytes_out += total_size;
2302              
2303             int cmd_len;
2304 0 0         if (reply && *reply) {
    0          
2305 0           cmd_len = snprintf(hdr, sizeof(hdr), "HPUB %.*s %s %lu %lu\r\n",
2306             (int)subj_len, subj_pv, reply,
2307             (unsigned long)hdr_data_len, (unsigned long)total_size);
2308             } else {
2309 0           cmd_len = snprintf(hdr, sizeof(hdr), "HPUB %.*s %lu %lu\r\n",
2310             (int)subj_len, subj_pv,
2311             (unsigned long)hdr_data_len, (unsigned long)total_size);
2312             }
2313              
2314 0 0         if (self->connected) {
2315 0           wbuf_append(self, hdr, cmd_len);
2316 0           wbuf_append(self, hdr_data_pv, hdr_data_len);
2317 0 0         if (pay_len > 0) wbuf_append(self, pay_pv, pay_len);
2318 0           wbuf_append(self, "\r\n", 2);
2319 0           nats_schedule_write(self);
2320             } else {
2321 0           size_t total = cmd_len + total_size + 2;
2322             char *buf;
2323 0           Newx(buf, total, char);
2324 0           memcpy(buf, hdr, cmd_len);
2325 0           memcpy(buf + cmd_len, hdr_data_pv, hdr_data_len);
2326 0 0         if (pay_len > 0) memcpy(buf + cmd_len + hdr_data_len, pay_pv, pay_len);
2327 0           buf[cmd_len + total_size] = '\r';
2328 0           buf[cmd_len + total_size + 1] = '\n';
2329 0           nats_queue_write(self, buf, total);
2330 0           Safefree(buf);
2331             }
2332              
2333             UV
2334             subscribe(self, subject, cb, queue_group = NULL)
2335             EV::Nats self
2336             SV *subject
2337             SV *cb
2338             const char *queue_group
2339             PREINIT:
2340             nats_sub_t *sub;
2341             char buf[MAX_CONTROL_LINE];
2342             STRLEN subj_len;
2343             const char *subj_pv;
2344             CODE:
2345 0           subj_pv = SvPV(subject, subj_len);
2346              
2347 0           Newxz(sub, 1, nats_sub_t);
2348 0           sub->sid = self->next_sid++;
2349 0           sub->subject = newSVpvn(subj_pv, subj_len);
2350 0           sub->cb = newSVsv(cb);
2351 0 0         sub->queue_group = (queue_group && *queue_group) ? newSVpv(queue_group, 0) : NULL;
    0          
2352 0           sub->max_msgs = 0;
2353 0           sub->received = 0;
2354 0           nats_register_sub(self, sub);
2355              
2356             int n;
2357 0 0         if (queue_group && *queue_group) {
    0          
2358 0           n = snprintf(buf, sizeof(buf), "SUB %.*s %s %" UVuf "\r\n",
2359 0           (int)subj_len, subj_pv, queue_group, (UV)sub->sid);
2360             } else {
2361 0           n = snprintf(buf, sizeof(buf), "SUB %.*s %" UVuf "\r\n",
2362 0           (int)subj_len, subj_pv, (UV)sub->sid);
2363             }
2364              
2365 0 0         if (self->connected || self->connecting)
    0          
2366 0           nats_queue_write(self, buf, n);
2367              
2368 0 0         RETVAL = (UV)sub->sid;
2369             OUTPUT:
2370             RETVAL
2371              
2372             void
2373             unsubscribe(self, sid, max_msgs = 0)
2374             EV::Nats self
2375             UV sid
2376             int max_msgs
2377             PREINIT:
2378             char buf[64];
2379             CODE:
2380 0 0         if (max_msgs > 0) {
2381 0           nats_sub_t *sub = nats_find_sub(self, (uint64_t)sid);
2382 0 0         if (sub)
2383 0           sub->max_msgs = max_msgs;
2384              
2385 0           int n = snprintf(buf, sizeof(buf), "UNSUB %" UVuf " %d\r\n", sid, max_msgs);
2386 0 0         if (self->connected)
2387 0           nats_queue_write(self, buf, n);
2388             } else {
2389 0           nats_sub_t *sub = nats_find_sub(self, (uint64_t)sid);
2390 0           int n = snprintf(buf, sizeof(buf), "UNSUB %" UVuf "\r\n", sid);
2391 0 0         if (self->connected)
2392 0           nats_queue_write(self, buf, n);
2393 0 0         if (sub)
2394 0           nats_remove_sub(self, sub);
2395             }
2396              
2397             void
2398             request(self, subject, payload, cb, timeout_ms = 5000)
2399             EV::Nats self
2400             SV *subject
2401             SV *payload
2402             SV *cb
2403             int timeout_ms
2404             PREINIT:
2405             char reply_subj[80];
2406             char hdr[MAX_CONTROL_LINE];
2407             STRLEN subj_len, pay_len;
2408             const char *subj_pv, *pay_pv;
2409             CODE:
2410 0 0         NATS_CROAK_UNLESS_CONNECTED(self);
    0          
    0          
    0          
2411              
2412 0           subj_pv = SvPV(subject, subj_len);
2413 0 0         if (SvOK(payload)) {
2414 0           pay_pv = SvPV(payload, pay_len);
2415             } else {
2416 0           pay_pv = "";
2417 0           pay_len = 0;
2418             }
2419              
2420 0 0         if (!self->inbox_sub_sid) {
2421             char inbox_wild[48];
2422 0           snprintf(inbox_wild, sizeof(inbox_wild), "%s*", self->inbox_prefix);
2423              
2424             nats_sub_t *sub;
2425 0           Newxz(sub, 1, nats_sub_t);
2426 0           sub->sid = self->next_sid++;
2427 0           sub->subject = newSVpv(inbox_wild, 0);
2428 0           sub->cb = NULL;
2429 0           sub->queue_group = NULL;
2430 0           nats_register_sub(self, sub);
2431 0           self->inbox_sub_sid = sub->sid;
2432              
2433             char sbuf[MAX_CONTROL_LINE];
2434 0           int sn = snprintf(sbuf, sizeof(sbuf), "SUB %s %" UVuf "\r\n",
2435 0           inbox_wild, (UV)sub->sid);
2436 0           nats_queue_write(self, sbuf, sn);
2437             }
2438              
2439 0           uint64_t req_id = self->next_req_id++;
2440 0           snprintf(reply_subj, sizeof(reply_subj), "%s%" UVuf, self->inbox_prefix, (UV)req_id);
2441              
2442             nats_req_t *req;
2443 0           Newxz(req, 1, nats_req_t);
2444 0           req->req_id = req_id;
2445 0           req->cb = newSVsv(cb);
2446 0           req->self = self;
2447 0           ngx_queue_insert_tail(&self->req_queue, &req->queue);
2448              
2449 0 0         if (timeout_ms > 0) {
2450 0           ev_timer_init(&req->timer, nats_on_request_timeout, timeout_ms / 1000.0, 0.0);
2451 0           ev_timer_start(self->loop, &req->timer);
2452 0           req->timer_active = 1;
2453             }
2454              
2455 0           int hdr_len = snprintf(hdr, sizeof(hdr), "PUB %.*s %s %lu\r\n",
2456             (int)subj_len, subj_pv, reply_subj, (unsigned long)pay_len);
2457              
2458 0           size_t total = hdr_len + pay_len + 2;
2459             char *buf;
2460 0           Newx(buf, total, char);
2461 0           memcpy(buf, hdr, hdr_len);
2462 0 0         if (pay_len > 0) memcpy(buf + hdr_len, pay_pv, pay_len);
2463 0           buf[hdr_len + pay_len] = '\r';
2464 0           buf[hdr_len + pay_len + 1] = '\n';
2465              
2466 0           nats_queue_write(self, buf, total);
2467 0           Safefree(buf);
2468              
2469             void
2470             ping(self)
2471             EV::Nats self
2472             CODE:
2473 0 0         NATS_CROAK_UNLESS_CONNECTED(self);
    0          
    0          
    0          
2474 0           wbuf_append(self, "PING\r\n", 6);
2475 0           nats_try_write(self);
2476              
2477             void
2478             flush(self, cb = NULL)
2479             EV::Nats self
2480             SV *cb
2481             CODE:
2482 0 0         if (!self->connected)
2483 0           croak("not connected");
2484 0           wbuf_append(self, "PING\r\n", 6);
2485 0           nats_try_write(self);
2486 0 0         if (cb && SvOK(cb)) {
    0          
2487             nats_pong_cb_t *pcb;
2488 0           Newxz(pcb, 1, nats_pong_cb_t);
2489 0           pcb->cb = newSVsv(cb);
2490 0           ngx_queue_insert_tail(&self->pong_cbs, &pcb->queue);
2491             }
2492              
2493             SV *
2494             server_info(self)
2495             EV::Nats self
2496             CODE:
2497 0 0         if (self->server_info_json)
2498 0           RETVAL = newSVsv(self->server_info_json);
2499             else
2500 0           RETVAL = &PL_sv_undef;
2501             OUTPUT:
2502             RETVAL
2503              
2504             int
2505             max_payload(self, ...)
2506             EV::Nats self
2507             CODE:
2508 0 0         if (items > 1)
2509 0           self->max_payload = SvIV(ST(1));
2510 0 0         RETVAL = self->max_payload;
2511             OUTPUT:
2512             RETVAL
2513              
2514             int
2515             waiting_count(self)
2516             EV::Nats self
2517             CODE:
2518 0 0         RETVAL = self->waiting_count;
2519             OUTPUT:
2520             RETVAL
2521              
2522             void
2523             skip_waiting(self)
2524             EV::Nats self
2525             CODE:
2526 0           nats_skip_waiting(self);
2527              
2528             void
2529             reconnect(self, enable, delay_ms = 2000, max_attempts = 60)
2530             EV::Nats self
2531             int enable
2532             int delay_ms
2533             int max_attempts
2534             CODE:
2535 0           self->reconnect_enabled = enable;
2536 0           self->reconnect_delay_ms = delay_ms;
2537 0           self->max_reconnect_attempts = max_attempts;
2538              
2539             int
2540             reconnect_enabled(self)
2541             EV::Nats self
2542             CODE:
2543 0 0         RETVAL = self->reconnect_enabled;
2544             OUTPUT:
2545             RETVAL
2546              
2547             int
2548             connect_timeout(self, ...)
2549             EV::Nats self
2550             CODE:
2551 0 0         if (items > 1)
2552 0           self->connect_timeout_ms = SvIV(ST(1));
2553 0 0         RETVAL = self->connect_timeout_ms;
2554             OUTPUT:
2555             RETVAL
2556              
2557             int
2558             ping_interval(self, ...)
2559             EV::Nats self
2560             CODE:
2561 0 0         if (items > 1)
2562 0           self->ping_interval_ms = SvIV(ST(1));
2563 0 0         RETVAL = self->ping_interval_ms;
2564             OUTPUT:
2565             RETVAL
2566              
2567             int
2568             max_pings_outstanding(self, ...)
2569             EV::Nats self
2570             CODE:
2571 0 0         if (items > 1)
2572 0           self->max_pings_outstanding = SvIV(ST(1));
2573 0 0         RETVAL = self->max_pings_outstanding;
2574             OUTPUT:
2575             RETVAL
2576              
2577             int
2578             priority(self, ...)
2579             EV::Nats self
2580             CODE:
2581 0 0         if (items > 1)
2582 0           self->priority = SvIV(ST(1));
2583 0 0         RETVAL = self->priority;
2584             OUTPUT:
2585             RETVAL
2586              
2587             int
2588             keepalive(self, ...)
2589             EV::Nats self
2590             CODE:
2591 0 0         if (items > 1)
2592 0           self->keepalive = SvIV(ST(1));
2593 0 0         RETVAL = self->keepalive;
2594             OUTPUT:
2595             RETVAL
2596              
2597             void
2598             on_error(self, ...)
2599             EV::Nats self
2600             PPCODE:
2601 0 0         if (items > 1) {
2602 0 0         CLEAR_HANDLER(self->on_error);
2603 0 0         if (SvOK(ST(1)))
2604 0           self->on_error = newSVsv(ST(1));
2605             }
2606 0 0         if (GIMME_V != G_VOID && self->on_error)
    0          
2607 0           PUSHs(sv_2mortal(newSVsv(self->on_error)));
2608              
2609             void
2610             on_connect(self, ...)
2611             EV::Nats self
2612             PPCODE:
2613 0 0         if (items > 1) {
2614 0 0         CLEAR_HANDLER(self->on_connect);
2615 0 0         if (SvOK(ST(1)))
2616 0           self->on_connect = newSVsv(ST(1));
2617             }
2618 0 0         if (GIMME_V != G_VOID && self->on_connect)
    0          
2619 0           PUSHs(sv_2mortal(newSVsv(self->on_connect)));
2620              
2621             void
2622             on_disconnect(self, ...)
2623             EV::Nats self
2624             PPCODE:
2625 0 0         if (items > 1) {
2626 0 0         CLEAR_HANDLER(self->on_disconnect);
2627 0 0         if (SvOK(ST(1)))
2628 0           self->on_disconnect = newSVsv(ST(1));
2629             }
2630 0 0         if (GIMME_V != G_VOID && self->on_disconnect)
    0          
2631 0           PUSHs(sv_2mortal(newSVsv(self->on_disconnect)));
2632              
2633             #ifdef HAVE_OPENSSL
2634              
2635             void
2636             tls(self, enable, ca_file = NULL, skip_verify = 0)
2637             EV::Nats self
2638             int enable
2639             const char *ca_file
2640             int skip_verify
2641             CODE:
2642 0           self->tls = enable;
2643 0           self->tls_skip_verify = skip_verify;
2644 0 0         if (self->tls_ca_file) { Safefree(self->tls_ca_file); self->tls_ca_file = NULL; }
2645 0 0         if (ca_file && *ca_file) {
    0          
2646 0           STRLEN l = strlen(ca_file);
2647 0           Newx(self->tls_ca_file, l + 1, char);
2648 0           Copy(ca_file, self->tls_ca_file, l + 1, char);
2649             }
2650              
2651             #endif
2652              
2653             void
2654             stats(self)
2655             EV::Nats self
2656             PPCODE:
2657 0 0         EXTEND(SP, 8);
2658 0           PUSHs(sv_2mortal(newSVpvs("msgs_in")));
2659 0           PUSHs(sv_2mortal(newSVuv(self->msgs_in)));
2660 0           PUSHs(sv_2mortal(newSVpvs("msgs_out")));
2661 0           PUSHs(sv_2mortal(newSVuv(self->msgs_out)));
2662 0           PUSHs(sv_2mortal(newSVpvs("bytes_in")));
2663 0           PUSHs(sv_2mortal(newSVuv(self->bytes_in)));
2664 0           PUSHs(sv_2mortal(newSVpvs("bytes_out")));
2665 0           PUSHs(sv_2mortal(newSVuv(self->bytes_out)));
2666              
2667             void
2668             reset_stats(self)
2669             EV::Nats self
2670             CODE:
2671 0           self->msgs_in = 0;
2672 0           self->msgs_out = 0;
2673 0           self->bytes_in = 0;
2674 0           self->bytes_out = 0;
2675              
2676             SV *
2677             new_inbox(self)
2678             EV::Nats self
2679             CODE:
2680             {
2681             char inbox[80];
2682 0           int len = snprintf(inbox, sizeof(inbox), "%s%" UVuf, self->inbox_prefix, (UV)self->next_req_id++);
2683 0           RETVAL = newSVpvn(inbox, len);
2684             }
2685             OUTPUT:
2686             RETVAL
2687              
2688             int
2689             subscription_count(self)
2690             EV::Nats self
2691             CODE:
2692 0 0         RETVAL = HvKEYS(self->sub_map);
    0          
2693             OUTPUT:
2694             RETVAL
2695              
2696             void
2697             batch(self, code)
2698             EV::Nats self
2699             SV *code
2700             CODE:
2701 0           self->batch_mode = 1;
2702             {
2703 0           dSP;
2704 0           ENTER; SAVETMPS;
2705 0 0         PUSHMARK(SP);
2706 0           PUTBACK;
2707 0           call_sv(code, G_DISCARD);
2708 0 0         FREETMPS; LEAVE;
2709             }
2710 0           self->batch_mode = 0;
2711 0 0         if (self->wbuf_len > self->wbuf_off) {
2712 0           self->wbuf_dirty = 1;
2713 0 0         if (!self->prepare_active) {
2714 0           ev_prepare_start(self->loop, &self->prepare_watcher);
2715 0           self->prepare_active = 1;
2716             }
2717             /* Check slow consumer after batch */
2718 0 0         if (self->slow_consumer_bytes > 0 &&
2719 0 0         (self->wbuf_len - self->wbuf_off) > self->slow_consumer_bytes &&
2720 0 0         self->on_slow_consumer) {
2721 0           dSP;
2722 0           ENTER; SAVETMPS;
2723 0 0         PUSHMARK(SP);
2724 0 0         EXTEND(SP, 1);
2725 0           PUSHs(sv_2mortal(newSVuv(self->wbuf_len - self->wbuf_off)));
2726 0           PUTBACK;
2727 0           call_sv(self->on_slow_consumer, G_DISCARD);
2728 0 0         FREETMPS; LEAVE;
2729             }
2730             }
2731              
2732             void
2733             slow_consumer(self, bytes_threshold, cb = NULL)
2734             EV::Nats self
2735             UV bytes_threshold
2736             SV *cb
2737             CODE:
2738 0           self->slow_consumer_bytes = (size_t)bytes_threshold;
2739 0 0         CLEAR_HANDLER(self->on_slow_consumer);
2740 0 0         if (cb && SvOK(cb))
    0          
2741 0           self->on_slow_consumer = newSVsv(cb);
2742              
2743             void
2744             on_lame_duck(self, ...)
2745             EV::Nats self
2746             PPCODE:
2747 0 0         if (items > 1) {
2748 0 0         CLEAR_HANDLER(self->on_ldm);
2749 0 0         if (SvOK(ST(1)))
2750 0           self->on_ldm = newSVsv(ST(1));
2751             }
2752 0 0         if (GIMME_V != G_VOID && self->on_ldm)
    0          
2753 0           PUSHs(sv_2mortal(newSVsv(self->on_ldm)));
2754              
2755             #ifdef HAVE_OPENSSL
2756              
2757             void
2758             nkey_seed(self, seed)
2759             EV::Nats self
2760             const char *seed
2761             CODE:
2762 0 0         if (self->nkey_seed) Safefree(self->nkey_seed);
2763 0           STRLEN l = strlen(seed);
2764 0           Newx(self->nkey_seed, l + 1, char);
2765 0           Copy(seed, self->nkey_seed, l + 1, char);
2766              
2767             #endif
2768              
2769             void
2770             jwt(self, jwt_token)
2771             EV::Nats self
2772             const char *jwt_token
2773             CODE:
2774 0 0         if (self->jwt) Safefree(self->jwt);
2775 0           STRLEN l = strlen(jwt_token);
2776 0           Newx(self->jwt, l + 1, char);
2777 0           Copy(jwt_token, self->jwt, l + 1, char);
2778              
2779             void
2780             drain(self, cb = NULL)
2781             EV::Nats self
2782             SV *cb
2783             CODE:
2784 0 0         if (!self->connected || self->draining)
    0          
2785 0           return;
2786 0           self->draining = 1;
2787 0 0         CLEAR_HANDLER(self->drain_cb);
2788 0 0         if (cb && SvOK(cb))
    0          
2789 0           self->drain_cb = newSVsv(cb);
2790              
2791             /* Send UNSUB for all subscriptions */
2792             {
2793             ngx_queue_t *q;
2794             char buf[64];
2795 0 0         ngx_queue_foreach(q, &self->subs) {
    0          
2796 0           nats_sub_t *sub = ngx_queue_data(q, nats_sub_t, queue);
2797 0           int n = snprintf(buf, sizeof(buf), "UNSUB %" UVuf "\r\n", (UV)sub->sid);
2798 0           wbuf_append(self, buf, n);
2799             }
2800             }
2801              
2802             /* Enqueue PING fence via pong_cbs — naturally ordered after any pending flush cbs.
2803             Use NULL cb; the PONG handler checks draining after firing pong_cb. */
2804 0           wbuf_append(self, "PING\r\n", 6);
2805 0           nats_try_write(self);
2806             {
2807             nats_pong_cb_t *pcb;
2808 0           Newxz(pcb, 1, nats_pong_cb_t);
2809 0           pcb->cb = NULL; /* drain completion marker */
2810 0           ngx_queue_insert_tail(&self->pong_cbs, &pcb->queue);
2811             }
2812              
2813             void
2814             DESTROY(self)
2815             EV::Nats self
2816             CODE:
2817 0 0         if (self->magic != NATS_MAGIC_ALIVE)
2818 0           return;
2819              
2820 0           self->magic = NATS_MAGIC_FREED;
2821 0           self->intentional_disconnect = 1;
2822              
2823 0 0         if (PL_dirty) {
2824 0 0         if (self->fd >= 0)
2825 0           close(self->fd);
2826 0           return;
2827             }
2828              
2829 0 0         CLEAR_HANDLER(self->on_error);
2830 0 0         CLEAR_HANDLER(self->on_connect);
2831 0 0         CLEAR_HANDLER(self->on_disconnect);
2832              
2833 0           nats_stop_watchers(self);
2834 0           nats_stop_timers(self);
2835              
2836 0 0         while (!ngx_queue_empty(&self->req_queue)) {
2837 0           ngx_queue_t *q = ngx_queue_head(&self->req_queue);
2838 0           nats_req_t *req = ngx_queue_data(q, nats_req_t, queue);
2839 0           ngx_queue_remove(q);
2840 0 0         if (req->timer_active)
2841 0           ev_timer_stop(self->loop, &req->timer);
2842 0 0         CLEAR_HANDLER(req->cb);
2843 0           Safefree(req);
2844             }
2845 0           nats_skip_waiting(self);
2846              
2847 0 0         while (!ngx_queue_empty(&self->pong_cbs)) {
2848 0           ngx_queue_t *pq = ngx_queue_head(&self->pong_cbs);
2849 0           nats_pong_cb_t *pcb = ngx_queue_data(pq, nats_pong_cb_t, queue);
2850 0           ngx_queue_remove(pq);
2851 0 0         CLEAR_HANDLER(pcb->cb);
2852 0           Safefree(pcb);
2853             }
2854              
2855 0 0         while (!ngx_queue_empty(&self->subs)) {
2856 0           ngx_queue_t *q = ngx_queue_head(&self->subs);
2857 0           nats_sub_t *sub = ngx_queue_data(q, nats_sub_t, queue);
2858 0           nats_remove_sub(self, sub);
2859             }
2860              
2861 0 0         if (self->fd >= 0) {
2862 0           close(self->fd);
2863 0           self->fd = -1;
2864             }
2865              
2866 0 0         CLEAR_HANDLER(self->server_info_json);
2867 0 0         CLEAR_HANDLER(self->drain_cb);
2868              
2869 0           nats_free_server_pool(self);
2870 0 0         if (self->sub_map) {
2871 0           SvREFCNT_dec((SV *)self->sub_map);
2872 0           self->sub_map = NULL;
2873             }
2874              
2875 0           #ifdef HAVE_OPENSSL
2876 0 0         nats_ssl_cleanup(self);
2877 0 0         if (self->ssl_ctx) { SSL_CTX_free(self->ssl_ctx); self->ssl_ctx = NULL; }
2878             if (self->tls_ca_file) Safefree(self->tls_ca_file);
2879 0 0         #endif
2880 0 0          
2881 0 0         if (self->rbuf) Safefree(self->rbuf);
2882 0 0         if (self->wbuf) Safefree(self->wbuf);
2883 0 0         if (self->host) Safefree(self->host);
2884 0 0         if (self->path) Safefree(self->path);
2885 0 0         if (self->user) Safefree(self->user);
2886 0 0         if (self->pass) Safefree(self->pass);
2887 0 0         if (self->token) Safefree(self->token);
2888 0 0         if (self->name) Safefree(self->name);
2889 0 0         if (self->nkey_seed) Safefree(self->nkey_seed);
2890 0 0         if (self->jwt) Safefree(self->jwt);
2891 0 0         if (self->server_nonce) Safefree(self->server_nonce);
2892             CLEAR_HANDLER(self->on_slow_consumer);
2893 0           CLEAR_HANDLER(self->on_ldm);
2894              
2895             Safefree(self);