File Coverage

Nats.xs
Criterion Covered Total %
statement 120 1563 7.6
branch 41 1382 2.9
condition n/a
subroutine n/a
pod n/a
total 161 2945 5.4


line stmt bran cond sub pod time code
1             #include "EXTERN.h"
2             #include "perl.h"
3             #include "XSUB.h"
4             #include "ppport.h"
5              
6             #include "EVAPI.h"
7             #include "ngx-queue.h"
8              
9             #include
10             #include
11             #include
12             #include
13             #include
14             #include
15             #include
16             #include
17             #include
18             #include
19             #include
20             #include
21              
22             #ifdef HAVE_OPENSSL
23             #include
24             #include
25             #include
26             #endif
27              
28             /* ================================================================
29             * Constants
30             * ================================================================ */
31              
32             #define NATS_MAGIC_ALIVE 0xCA5E4A75
33              
34             #define BUF_INIT_SIZE 16384
35             #define MAX_CONTROL_LINE 4096
36              
37             #define DEFAULT_MAX_PAYLOAD (1024 * 1024)
38              
39             #define PARSE_OP 0
40             #define PARSE_MSG_BODY 1
41              
42             #define MSG_TYPE_MSG 0
43             #define MSG_TYPE_HMSG 1
44              
45             #define CLEAR_HANDLER(field) \
46             do { if (NULL != (field)) { SvREFCNT_dec(field); (field) = NULL; } } while(0)
47              
48             #define NATS_CROAK_UNLESS_CONNECTED(self) \
49             do { \
50             if (!(self)->connected && !(self)->connecting && \
51             !((self)->reconnect_enabled && (self)->reconnect_timer_active)) \
52             croak("not connected"); \
53             } while(0)
54              
55             /* ================================================================
56             * Type declarations
57             * ================================================================ */
58              
59             typedef struct nats_s nats_t;
60             typedef struct nats_sub_s nats_sub_t;
61             typedef struct nats_pub_s nats_pub_t;
62              
63             typedef nats_t* EV__Nats;
64             typedef struct ev_loop* EV__Loop;
65              
66             /* ================================================================
67             * Data structures
68             * ================================================================ */
69              
70             struct nats_sub_s {
71             uint64_t sid;
72             SV *subject;
73             SV *queue_group;
74             SV *cb;
75             int max_msgs;
76             int received;
77             ngx_queue_t queue;
78             };
79              
80             /* PONG callback entry (for flush) */
81             typedef struct nats_pong_cb_s {
82             SV *cb;
83             ngx_queue_t queue;
84             } nats_pong_cb_t;
85              
86             /* Server pool entry (parsed from INFO connect_urls) */
87             typedef struct nats_server_s {
88             char *host;
89             int port;
90             ngx_queue_t queue;
91             } nats_server_t;
92              
93             struct nats_pub_s {
94             char *data;
95             size_t len;
96             ngx_queue_t queue;
97             };
98              
99             struct nats_s {
100             unsigned int magic;
101             struct ev_loop *loop;
102             int fd;
103             int connected;
104             int connecting;
105              
106             ev_io rio, wio;
107             int reading, writing;
108              
109             char *rbuf;
110             size_t rbuf_len, rbuf_cap;
111              
112             char *wbuf;
113             size_t wbuf_len, wbuf_off, wbuf_cap;
114              
115             SV *on_error;
116             SV *on_connect;
117             SV *on_disconnect;
118              
119             ngx_queue_t subs;
120             HV *sub_map; /* SID -> nats_sub_t* hash for O(1) lookup */
121             uint64_t next_sid;
122              
123             ngx_queue_t wait_queue;
124             int waiting_count;
125              
126             /* Write coalescing */
127             ev_prepare prepare_watcher;
128             int prepare_active;
129             int wbuf_dirty;
130              
131             char *host;
132             int port;
133             char *path; /* Unix socket path */
134              
135             int reconnect_enabled;
136             int reconnect_delay_ms;
137             int max_reconnect_delay_ms;
138             int max_reconnect_attempts;
139             int reconnect_attempts;
140             ev_timer reconnect_timer;
141             int reconnect_timer_active;
142             int intentional_disconnect;
143              
144             int connect_timeout_ms;
145             ev_timer connect_timer;
146             int connect_timer_active;
147              
148             int ping_interval_ms;
149             ev_timer ping_timer;
150             int ping_timer_active;
151             int pings_outstanding;
152             int max_pings_outstanding;
153              
154             int max_payload;
155             SV *server_info_json;
156              
157             char *user;
158             char *pass;
159             char *token;
160             char *name;
161             int verbose;
162             int pedantic;
163             int echo;
164             int no_responders;
165              
166             char inbox_prefix[32];
167             uint64_t next_req_id;
168             ngx_queue_t req_queue;
169             uint64_t inbox_sub_sid;
170              
171             int parse_state;
172             int msg_type;
173             /* MSG/HMSG fields: absolute offsets into rbuf (safe across Renew) */
174             size_t msg_subject_off;
175             size_t msg_subject_len;
176             size_t msg_reply_off; /* msg_reply_len == 0 means no reply */
177             size_t msg_reply_len;
178             uint64_t msg_sid;
179             size_t msg_hdr_len;
180             size_t msg_total_len;
181              
182             int priority;
183             int keepalive;
184              
185             /* Stats */
186             UV msgs_in;
187             UV msgs_out;
188             UV bytes_in;
189             UV bytes_out;
190              
191             /* Server pool for cluster failover */
192             ngx_queue_t server_pool;
193             int server_pool_count;
194              
195             /* Drain state */
196             int draining;
197             SV *drain_cb;
198              
199             /* Slow consumer detection */
200             size_t slow_consumer_bytes; /* wbuf threshold, 0 = disabled */
201             SV *on_slow_consumer;
202              
203             /* Batch mode */
204             int batch_mode;
205              
206             /* PONG callback queue (for flush) */
207             ngx_queue_t pong_cbs;
208              
209             /* NKey auth */
210             char *nkey_seed; /* Ed25519 seed (base32-encoded) */
211             char *jwt;
212             char *server_nonce; /* nonce from INFO for signing */
213              
214             /* Lame duck mode (leaf node graceful shutdown) */
215             int ldm;
216             SV *on_ldm;
217              
218             /* TLS */
219             #ifdef HAVE_OPENSSL
220             SSL_CTX *ssl_ctx;
221             SSL *ssl;
222             int tls;
223             int tls_skip_verify;
224             char *tls_ca_file;
225             int ssl_handshaking;
226             #endif
227             };
228              
229             typedef struct nats_req_s {
230             uint64_t req_id;
231             SV *cb;
232             ev_timer timer;
233             int timer_active;
234             nats_t *self;
235             ngx_queue_t queue;
236             } nats_req_t;
237              
238             /* ================================================================
239             * Forward declarations
240             * ================================================================ */
241              
242             static void nats_connect_tcp(nats_t *self);
243             static void nats_connect_unix(nats_t *self);
244             static void nats_do_connect(nats_t *self);
245             static void nats_schedule_reconnect(nats_t *self);
246             static void nats_next_server(nats_t *self);
247             static void nats_free_server_pool(nats_t *self);
248             static void nats_on_read(struct ev_loop *loop, ev_io *w, int revents);
249             static void nats_on_prepare(struct ev_loop *loop, ev_prepare *w, int revents);
250             static void nats_parse_connect_urls(nats_t *self, const char *json, size_t len);
251             static void nats_on_write(struct ev_loop *loop, ev_io *w, int revents);
252             static void nats_on_connect_timeout(struct ev_loop *loop, ev_timer *w, int revents);
253             static void nats_on_reconnect_timer(struct ev_loop *loop, ev_timer *w, int revents);
254             static void nats_on_ping_timer(struct ev_loop *loop, ev_timer *w, int revents);
255             static void nats_send_connect(nats_t *self);
256             static void nats_try_write(nats_t *self);
257             static void nats_process_line(nats_t *self, char *line, size_t len);
258             static void nats_process_msg(nats_t *self, char *payload, size_t len);
259             static void nats_emit_error(nats_t *self, const char *err);
260             static void nats_cleanup(nats_t *self);
261             static void nats_start_ping_timer(nats_t *self);
262             static void nats_drain_waiting(nats_t *self);
263             static void nats_setup_inbox(nats_t *self);
264             static nats_sub_t *nats_find_sub(nats_t *self, uint64_t sid);
265             static void nats_remove_sub(nats_t *self, nats_sub_t *sub);
266             static void nats_cancel_all_requests(nats_t *self, const char *err);
267             static void nats_resub_all(nats_t *self);
268              
269             /* ================================================================
270             * String helpers
271             * ================================================================ */
272              
273             /* Replace *dst with a fresh copy of NUL-terminated src; frees existing. */
274 4           static void nats_set_str(char **dst, const char *src)
275             {
276 4 50         if (*dst) { Safefree(*dst); *dst = NULL; }
277 4 50         if (src) {
278 4           size_t l = strlen(src);
279 4           Newx(*dst, l + 1, char);
280 4           memcpy(*dst, src, l + 1);
281             }
282 4           }
283              
284             /* Replace *dst with a copy of an SV's bytes (handles non-NUL-terminated PV). */
285 0           static void nats_set_str_sv(char **dst, SV *val)
286             {
287             STRLEN l;
288 0           const char *s = SvPV(val, l);
289 0 0         if (*dst) { Safefree(*dst); *dst = NULL; }
290 0           Newx(*dst, l + 1, char);
291 0           memcpy(*dst, s, l);
292 0           (*dst)[l] = '\0';
293 0           }
294              
295             /* ================================================================
296             * Buffer helpers
297             * ================================================================ */
298              
299 0           static void buf_ensure(char **buf, size_t *cap, size_t needed)
300             {
301 0 0         if (needed <= *cap)
302 0           return;
303 0 0         size_t newcap = *cap ? *cap : BUF_INIT_SIZE;
304 0 0         while (newcap < needed)
305 0           newcap *= 2;
306 0           Renew(*buf, newcap, char);
307 0           *cap = newcap;
308             }
309              
310 0           static void wbuf_append(nats_t *self, const char *data, size_t len)
311             {
312 0           size_t used = self->wbuf_len - self->wbuf_off;
313 0 0         if (self->wbuf_off > 0 && self->wbuf_off > self->wbuf_len / 2) {
    0          
314 0 0         if (used > 0)
315 0           memmove(self->wbuf, self->wbuf + self->wbuf_off, used);
316 0           self->wbuf_len = used;
317 0           self->wbuf_off = 0;
318             }
319 0           buf_ensure(&self->wbuf, &self->wbuf_cap, self->wbuf_len + len);
320 0           memcpy(self->wbuf + self->wbuf_len, data, len);
321 0           self->wbuf_len += len;
322 0           }
323              
324             /* ================================================================
325             * NKey helpers (base32 + Ed25519)
326             * ================================================================ */
327              
328             #ifdef HAVE_OPENSSL
329             #include
330              
331             /* NATS base32 decode (RFC 4648, no padding) */
332 0           static int nats_base32_decode(const char *src, size_t src_len, unsigned char *dst, size_t *dst_len)
333             {
334             static const int8_t b32_tab[128] = {
335             -1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,
336             -1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,
337             -1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,
338             -1,-1,26,27,28,29,30,31,-1,-1,-1,-1,-1,-1,-1,-1, /* 2-7 */
339             -1, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9,10,11,12,13,14, /* A-O */
340             15,16,17,18,19,20,21,22,23,24,25,-1,-1,-1,-1,-1, /* P-Z */
341             -1, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9,10,11,12,13,14, /* a-o */
342             15,16,17,18,19,20,21,22,23,24,25,-1,-1,-1,-1,-1, /* p-z */
343             };
344 0           uint64_t buf = 0;
345 0           int bits = 0;
346 0           size_t di = 0;
347             size_t i;
348              
349 0 0         for (i = 0; i < src_len; i++) {
350 0           unsigned char c = (unsigned char)src[i];
351 0 0         if (c == '=' || c == ' ') continue;
    0          
352 0 0         if (c >= 128 || b32_tab[c] < 0) return -1;
    0          
353 0           buf = (buf << 5) | b32_tab[c];
354 0           bits += 5;
355 0 0         if (bits >= 8) {
356 0           bits -= 8;
357 0           dst[di++] = (unsigned char)(buf >> bits);
358             }
359             }
360 0           *dst_len = di;
361 0           return 0;
362             }
363              
364             /* NATS base32 encode (RFC 4648, no padding). Streams 8-bit input through
365             a 5-bit accumulator. Returns number of chars written, or -1 if dst is
366             too small. */
367 0           static int nats_base32_encode(const unsigned char *src, size_t src_len,
368             char *dst, size_t dst_size)
369             {
370             static const char b32[] = "ABCDEFGHIJKLMNOPQRSTUVWXYZ234567";
371 0           size_t di = 0;
372 0           uint64_t buf = 0;
373 0           int bits = 0;
374             size_t i;
375 0 0         for (i = 0; i < src_len; i++) {
376 0           buf = (buf << 8) | src[i];
377 0           bits += 8;
378 0 0         while (bits >= 5) {
379 0 0         if (di >= dst_size) return -1;
380 0           bits -= 5;
381 0           dst[di++] = b32[(buf >> bits) & 0x1F];
382             }
383             }
384 0 0         if (bits > 0) {
385 0 0         if (di >= dst_size) return -1;
386 0           dst[di++] = b32[(buf << (5 - bits)) & 0x1F];
387             }
388 0           return (int)di;
389             }
390              
391             /* NATS base64url encode (no padding) */
392 0           static int nats_base64url_encode(const unsigned char *src, size_t src_len, char *dst, size_t dst_size)
393             {
394             static const char b64[] = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_";
395 0           size_t di = 0;
396             size_t i;
397 0 0         for (i = 0; i + 2 < src_len; i += 3) {
398 0 0         if (di + 4 > dst_size) return -1;
399 0           uint32_t v = ((uint32_t)src[i] << 16) | ((uint32_t)src[i+1] << 8) | src[i+2];
400 0           dst[di++] = b64[(v >> 18) & 0x3F];
401 0           dst[di++] = b64[(v >> 12) & 0x3F];
402 0           dst[di++] = b64[(v >> 6) & 0x3F];
403 0           dst[di++] = b64[v & 0x3F];
404             }
405 0 0         if (i < src_len) {
406 0 0         if (di + 4 > dst_size) return -1;
407 0           uint32_t v = (uint32_t)src[i] << 16;
408 0 0         if (i + 1 < src_len) v |= (uint32_t)src[i+1] << 8;
409 0           dst[di++] = b64[(v >> 18) & 0x3F];
410 0           dst[di++] = b64[(v >> 12) & 0x3F];
411 0 0         if (i + 1 < src_len) dst[di++] = b64[(v >> 6) & 0x3F];
412             }
413 0 0         if (di < dst_size) dst[di] = '\0';
414 0           return (int)di;
415             }
416              
417             /* CRC-16/XMODEM (CRC-CCITT, poly 0x1021, init 0x0000) — what NATS NKeys
418             use. NOT CRC-16/IBM. Used for seed/pubkey integrity. */
419 0           static uint16_t nats_crc16(const unsigned char *data, size_t len)
420             {
421 0           uint16_t crc = 0;
422             size_t i;
423             int j;
424 0 0         for (i = 0; i < len; i++) {
425 0           crc ^= ((uint16_t)data[i]) << 8;
426 0 0         for (j = 0; j < 8; j++) {
427 0 0         if (crc & 0x8000) crc = (uint16_t)((crc << 1) ^ 0x1021);
428 0           else crc = (uint16_t)(crc << 1);
429             }
430             }
431 0           return crc;
432             }
433              
434 0           static int nats_nkey_sign(const char *seed_encoded, const char *nonce, size_t nonce_len,
435             char *sig_out, size_t sig_out_size)
436             {
437             unsigned char raw[64];
438 0           size_t raw_len = 0;
439              
440 0 0         if (nats_base32_decode(seed_encoded, strlen(seed_encoded), raw, &raw_len) != 0)
441 0           return -1;
442 0 0         if (raw_len < 36) return -1; /* prefix(2) + seed(32) + CRC(2) */
443              
444             /* Validate CRC16 */
445 0           uint16_t expected_crc = (uint16_t)raw[raw_len - 2] | ((uint16_t)raw[raw_len - 1] << 8);
446 0           uint16_t actual_crc = nats_crc16(raw, raw_len - 2);
447 0 0         if (expected_crc != actual_crc) return -1;
448              
449 0           unsigned char *seed = raw + 2;
450              
451 0           EVP_PKEY *pkey = EVP_PKEY_new_raw_private_key(EVP_PKEY_ED25519, NULL, seed, 32);
452 0 0         if (!pkey) return -1;
453              
454 0           EVP_MD_CTX *ctx = EVP_MD_CTX_new();
455 0 0         if (!ctx) { EVP_PKEY_free(pkey); return -1; }
456              
457             unsigned char sig[64];
458 0           size_t sig_len = sizeof(sig);
459              
460 0           int ok = (EVP_DigestSignInit(ctx, NULL, NULL, NULL, pkey) == 1 &&
461 0           EVP_DigestSign(ctx, sig, &sig_len, (const unsigned char *)nonce, nonce_len) == 1);
462              
463 0           EVP_MD_CTX_free(ctx);
464 0           EVP_PKEY_free(pkey);
465              
466 0 0         if (!ok) return -1;
467              
468 0           return nats_base64url_encode(sig, sig_len, sig_out, sig_out_size);
469             }
470              
471 0           static int nats_nkey_public(const char *seed_encoded, char *pub_out, size_t pub_out_size)
472             {
473             unsigned char raw[64];
474 0           size_t raw_len = 0;
475              
476 0 0         if (nats_base32_decode(seed_encoded, strlen(seed_encoded), raw, &raw_len) != 0)
477 0           return -1;
478 0 0         if (raw_len < 36) return -1;
479              
480 0           uint16_t expected_crc = (uint16_t)raw[raw_len - 2] | ((uint16_t)raw[raw_len - 1] << 8);
481 0 0         if (nats_crc16(raw, raw_len - 2) != expected_crc) return -1;
482              
483 0           unsigned char *seed = raw + 2;
484              
485 0           EVP_PKEY *pkey = EVP_PKEY_new_raw_private_key(EVP_PKEY_ED25519, NULL, seed, 32);
486 0 0         if (!pkey) return -1;
487              
488             unsigned char pub[32];
489 0           size_t pub_len = 32;
490 0           int ok = EVP_PKEY_get_raw_public_key(pkey, pub, &pub_len) == 1;
491 0           EVP_PKEY_free(pkey);
492 0 0         if (!ok) return -1;
493              
494             /* Derive public-key prefix from the role embedded in the seed prefix.
495             Seed encoding (NATS Go nkeys EncodeSeed):
496             raw[0] = PrefixByteSeed | (role >> 5)
497             raw[1] = (role & 31) << 3
498             so role = ((raw[0] & 7) << 5) | (raw[1] >> 3). */
499 0           unsigned char role = (unsigned char)(((raw[0] & 0x07) << 5) | (raw[1] >> 3));
500             unsigned char pub_prefix;
501 0           switch (role) {
502 0           case 0xA0: pub_prefix = 0xA0; break; /* User */
503 0           case 0x70: pub_prefix = 0x70; break; /* Operator */
504 0           case 0x00: pub_prefix = 0x00; break; /* Account */
505 0           default: pub_prefix = 0xA0; break; /* default to User */
506             }
507              
508             /* Build: prefix(1) + pubkey(32) + CRC16(2) = 35 bytes */
509             unsigned char full[35];
510 0           full[0] = pub_prefix;
511 0           memcpy(full + 1, pub, 32);
512 0           uint16_t crc = nats_crc16(full, 33);
513 0           full[33] = crc & 0xFF;
514 0           full[34] = (crc >> 8) & 0xFF;
515              
516             /* Base32 encode 35 bytes -> 56 chars (35*8 = 280 bits, 280/5 = 56). */
517 0 0         int n = nats_base32_encode(full, sizeof(full), pub_out,
518             pub_out_size > 0 ? pub_out_size - 1 : 0);
519 0 0         if (n < 0) return -1;
520 0 0         if ((size_t)n < pub_out_size) pub_out[n] = '\0';
521 0           return n;
522             }
523             #endif
524              
525             /* ================================================================
526             * I/O helpers (with optional TLS)
527             * ================================================================ */
528              
529 0           static ssize_t nats_io_read(nats_t *self, void *buf, size_t len)
530             {
531             #ifdef HAVE_OPENSSL
532 0 0         if (self->ssl) {
533 0 0         int n = SSL_read(self->ssl, buf, (len > (size_t)INT_MAX) ? INT_MAX : (int)len);
534 0 0         if (n <= 0) {
535 0           int err = SSL_get_error(self->ssl, n);
536 0 0         if (err == SSL_ERROR_WANT_READ || err == SSL_ERROR_WANT_WRITE) {
    0          
537 0           errno = EAGAIN;
538 0           return -1;
539             }
540 0 0         if (err == SSL_ERROR_ZERO_RETURN) return 0;
541 0           errno = EIO;
542 0           return -1;
543             }
544 0           return n;
545             }
546             #endif
547 0           return read(self->fd, buf, len);
548             }
549              
550 0           static ssize_t nats_io_write(nats_t *self, const void *buf, size_t len)
551             {
552             #ifdef HAVE_OPENSSL
553 0 0         if (self->ssl) {
554 0 0         int n = SSL_write(self->ssl, buf, (len > (size_t)INT_MAX) ? INT_MAX : (int)len);
555 0 0         if (n <= 0) {
556 0           int err = SSL_get_error(self->ssl, n);
557 0 0         if (err == SSL_ERROR_WANT_WRITE || err == SSL_ERROR_WANT_READ) {
    0          
558 0           errno = EAGAIN;
559 0           return -1;
560             }
561 0           errno = EIO;
562 0           return -1;
563             }
564 0           return n;
565             }
566             #endif
567 0           return write(self->fd, buf, len);
568             }
569              
570             #ifdef HAVE_OPENSSL
571 0           static int nats_ssl_setup(nats_t *self)
572             {
573 0 0         if (!self->ssl_ctx) {
574 0           self->ssl_ctx = SSL_CTX_new(TLS_client_method());
575 0 0         if (!self->ssl_ctx) return -1;
576              
577 0 0         if (self->tls_ca_file) {
578 0 0         if (!SSL_CTX_load_verify_locations(self->ssl_ctx, self->tls_ca_file, NULL))
579 0           return -1;
580             } else {
581 0           SSL_CTX_set_default_verify_paths(self->ssl_ctx);
582             }
583              
584 0 0         if (!self->tls_skip_verify)
585 0           SSL_CTX_set_verify(self->ssl_ctx, SSL_VERIFY_PEER, NULL);
586             }
587              
588 0           self->ssl = SSL_new(self->ssl_ctx);
589 0 0         if (!self->ssl) return -1;
590              
591 0           SSL_set_fd(self->ssl, self->fd);
592 0 0         if (self->host) {
593 0           SSL_set_tlsext_host_name(self->ssl, self->host);
594 0 0         if (!self->tls_skip_verify) {
595 0           X509_VERIFY_PARAM *vpm = SSL_get0_param(self->ssl);
596             /* set1_ip_asc parses host as a textual IP and returns 0 on
597             failure — fall back to set1_host for DNS names. */
598 0 0         if (vpm && !X509_VERIFY_PARAM_set1_ip_asc(vpm, self->host)) {
    0          
599 0           X509_VERIFY_PARAM_set_hostflags(vpm, X509_CHECK_FLAG_NO_PARTIAL_WILDCARDS);
600 0 0         if (!X509_VERIFY_PARAM_set1_host(vpm, self->host, 0))
601 0           return -1;
602             }
603             }
604             }
605              
606 0           return 0;
607             }
608              
609 6           static void nats_ssl_cleanup(nats_t *self)
610             {
611 6 50         if (self->ssl) {
612 0           SSL_free(self->ssl);
613 0           self->ssl = NULL;
614             }
615 6           self->ssl_handshaking = 0;
616 6           }
617              
618 0           static int nats_ssl_handshake(nats_t *self)
619             {
620 0           int ret = SSL_connect(self->ssl);
621 0 0         if (ret == 1) {
622 0           self->ssl_handshaking = 0;
623 0           return 1;
624             }
625 0           int err = SSL_get_error(self->ssl, ret);
626 0 0         if (err == SSL_ERROR_WANT_READ) {
627 0 0         if (!self->reading) {
628 0           ev_io_start(self->loop, &self->rio);
629 0           self->reading = 1;
630             }
631 0           return 0;
632             }
633 0 0         if (err == SSL_ERROR_WANT_WRITE) {
634 0 0         if (!self->writing) {
635 0           ev_io_start(self->loop, &self->wio);
636 0           self->writing = 1;
637             }
638 0           return 0;
639             }
640 0           return -1;
641             }
642              
643             /* Emit ": ", clean up the connection. */
644 0           static void nats_ssl_fail(nats_t *self, const char *prefix)
645             {
646             char errbuf[256];
647             char msg[320];
648 0           unsigned long e = ERR_peek_last_error();
649 0 0         if (e) {
650 0           ERR_error_string_n(e, errbuf, sizeof(errbuf));
651 0           ERR_clear_error();
652             } else {
653 0           snprintf(errbuf, sizeof(errbuf), "unknown SSL error");
654             }
655 0           snprintf(msg, sizeof(msg), "%s: %s", prefix, errbuf);
656 0           nats_emit_error(self, msg);
657 0           nats_cleanup(self);
658 0           }
659             #endif
660              
661             /* ================================================================
662             * JSON string escaper (for CONNECT command)
663             * ================================================================ */
664              
665 0           static int json_escape_string(char *dst, size_t dst_size, const char *src)
666             {
667 0           size_t di = 0;
668 0           const unsigned char *s = (const unsigned char *)src;
669              
670             /* Need room for opening quote, at least one char, closing quote, NUL */
671 0 0         if (dst_size < 3) {
672 0 0         if (dst_size > 0) dst[0] = '\0';
673 0           return 0;
674             }
675              
676 0           dst[di++] = '"';
677              
678 0 0         for (; *s; s++) {
679 0 0         size_t need = (*s == '"' || *s == '\\') ? 2 : (*s < 0x20) ? 6 : 1;
    0          
    0          
680 0 0         if (di + need + 2 > dst_size) break; /* +2 for closing quote + NUL */
681              
682 0 0         if (*s == '"') {
683 0           dst[di++] = '\\'; dst[di++] = '"';
684 0 0         } else if (*s == '\\') {
685 0           dst[di++] = '\\'; dst[di++] = '\\';
686 0 0         } else if (*s < 0x20) {
687 0           di += snprintf(dst + di, dst_size - di, "\\u%04x", *s);
688             } else {
689 0           dst[di++] = *s;
690             }
691             }
692              
693 0           dst[di++] = '"';
694 0           dst[di] = '\0';
695 0           return (int)di;
696             }
697              
698             /* ================================================================
699             * Random hex for inbox prefix
700             * ================================================================ */
701              
702             /* Read nbytes from /dev/urandom; fall back to rand() per byte. */
703 6           static void nats_random_bytes(unsigned char *out, int nbytes)
704             {
705 6           int got = 0;
706 6           int fd = open("/dev/urandom", O_RDONLY);
707 6 50         if (fd >= 0) {
708 12 100         while (got < nbytes) {
709 6           ssize_t n = read(fd, out + got, nbytes - got);
710 6 50         if (n > 0) got += n;
711 0 0         else if (n == 0 || (errno != EINTR && errno != EAGAIN)) break;
    0          
    0          
712             }
713 6           close(fd);
714             }
715             int i;
716 6 50         for (i = got; i < nbytes; i++)
717 0           out[i] = (unsigned char)(rand() & 0xFF);
718 6           }
719              
720 6           static void nats_gen_hex(char *buf, int nbytes)
721             {
722             static const char hex[] = "0123456789abcdef";
723             unsigned char rnd[16];
724 6 50         if (nbytes < 0) nbytes = 0;
725 6 50         if (nbytes > (int)sizeof(rnd)) nbytes = (int)sizeof(rnd);
726 6           nats_random_bytes(rnd, nbytes);
727             int i;
728 54 100         for (i = 0; i < nbytes; i++) {
729 48           buf[i*2] = hex[rnd[i] >> 4];
730 48           buf[i*2+1] = hex[rnd[i] & 0x0F];
731             }
732 6           }
733              
734 6           static void nats_setup_inbox(nats_t *self)
735             {
736             char hex[20];
737 6           nats_gen_hex(hex, 8);
738 6           snprintf(self->inbox_prefix, sizeof(self->inbox_prefix),
739             "_INBOX.%.16s.", hex);
740 6           self->next_req_id = 1;
741 6           }
742              
743             /* ================================================================
744             * Subscription management
745             * ================================================================ */
746              
747 0           static nats_sub_t *nats_find_sub(nats_t *self, uint64_t sid)
748             {
749             char key[24];
750 0           int klen = snprintf(key, sizeof(key), "%" UVuf, (UV)sid);
751 0           SV **svp = hv_fetch(self->sub_map, key, klen, 0);
752 0 0         if (svp && SvIOK(*svp))
    0          
753 0           return INT2PTR(nats_sub_t *, SvIVX(*svp));
754 0           return NULL;
755             }
756              
757 0           static void nats_register_sub(nats_t *self, nats_sub_t *sub)
758             {
759             char key[24];
760 0           int klen = snprintf(key, sizeof(key), "%" UVuf, (UV)sub->sid);
761 0           hv_store(self->sub_map, key, klen, newSViv(PTR2IV(sub)), 0);
762 0           ngx_queue_insert_tail(&self->subs, &sub->queue);
763 0           }
764              
765 0           static void nats_unregister_sub(nats_t *self, nats_sub_t *sub)
766             {
767             char key[24];
768 0           int klen = snprintf(key, sizeof(key), "%" UVuf, (UV)sub->sid);
769 0           hv_delete(self->sub_map, key, klen, G_DISCARD);
770 0           }
771              
772 0           static void nats_remove_sub(nats_t *self, nats_sub_t *sub)
773             {
774 0           nats_unregister_sub(self, sub);
775 0           ngx_queue_remove(&sub->queue);
776 0 0         CLEAR_HANDLER(sub->subject);
777 0 0         CLEAR_HANDLER(sub->queue_group);
778 0 0         CLEAR_HANDLER(sub->cb);
779 0           Safefree(sub);
780 0           }
781              
782 0           static void nats_resub_all(nats_t *self)
783             {
784             ngx_queue_t *q;
785             char buf[MAX_CONTROL_LINE];
786              
787 0 0         ngx_queue_foreach(q, &self->subs) {
    0          
788 0           nats_sub_t *sub = ngx_queue_data(q, nats_sub_t, queue);
789             STRLEN slen;
790 0           const char *subj = SvPV(sub->subject, slen);
791             int n;
792              
793 0 0         if (sub->queue_group) {
794             STRLEN glen;
795 0           const char *grp = SvPV(sub->queue_group, glen);
796 0           n = snprintf(buf, sizeof(buf), "SUB %.*s %.*s %" UVuf "\r\n",
797 0           (int)slen, subj, (int)glen, grp, (UV)sub->sid);
798             } else {
799 0           n = snprintf(buf, sizeof(buf), "SUB %.*s %" UVuf "\r\n",
800 0           (int)slen, subj, (UV)sub->sid);
801             }
802 0           wbuf_append(self, buf, n);
803              
804             /* Restore auto-unsub if partially consumed */
805 0 0         if (sub->max_msgs > 0) {
806 0           int remaining = sub->max_msgs - sub->received;
807 0 0         if (remaining < 1) remaining = 1;
808 0           n = snprintf(buf, sizeof(buf), "UNSUB %" UVuf " %d\r\n",
809 0           (UV)sub->sid, remaining);
810 0           wbuf_append(self, buf, n);
811             }
812             }
813 0           }
814              
815             /* ================================================================
816             * Connection management
817             * ================================================================ */
818              
819 6           static void nats_stop_watchers(nats_t *self)
820             {
821 6 50         if (self->reading) {
822 0           ev_io_stop(self->loop, &self->rio);
823 0           self->reading = 0;
824             }
825 6 50         if (self->writing) {
826 0           ev_io_stop(self->loop, &self->wio);
827 0           self->writing = 0;
828             }
829 6           }
830              
831 6           static void nats_stop_timers(nats_t *self)
832             {
833 6 50         if (self->connect_timer_active) {
834 0           ev_timer_stop(self->loop, &self->connect_timer);
835 0           self->connect_timer_active = 0;
836             }
837 6 50         if (self->reconnect_timer_active) {
838 0           ev_timer_stop(self->loop, &self->reconnect_timer);
839 0           self->reconnect_timer_active = 0;
840             }
841 6 50         if (self->ping_timer_active) {
842 0           ev_timer_stop(self->loop, &self->ping_timer);
843 0           self->ping_timer_active = 0;
844             }
845 6 50         if (self->prepare_active) {
846 0           ev_prepare_stop(self->loop, &self->prepare_watcher);
847 0           self->prepare_active = 0;
848             }
849 6           }
850              
851 0           static void nats_cancel_all_requests(nats_t *self, const char *err)
852             {
853 0           dSP;
854 0 0         while (!ngx_queue_empty(&self->req_queue)) {
855 0           ngx_queue_t *q = ngx_queue_head(&self->req_queue);
856 0           nats_req_t *req = ngx_queue_data(q, nats_req_t, queue);
857 0           ngx_queue_remove(q);
858              
859 0 0         if (req->timer_active) {
860 0           ev_timer_stop(self->loop, &req->timer);
861 0           req->timer_active = 0;
862             }
863              
864 0 0         if (req->cb) {
865 0           ENTER; SAVETMPS;
866 0 0         PUSHMARK(SP);
867 0 0         EXTEND(SP, 2);
868 0           PUSHs(&PL_sv_undef);
869 0           PUSHs(sv_2mortal(newSVpv(err, 0)));
870 0           PUTBACK;
871 0           call_sv(req->cb, G_DISCARD);
872 0 0         FREETMPS; LEAVE;
873 0           SvREFCNT_dec(req->cb);
874             }
875 0           Safefree(req);
876             }
877 0           }
878              
879 6           static void nats_skip_waiting(nats_t *self)
880             {
881 6 50         while (!ngx_queue_empty(&self->wait_queue)) {
882 0           ngx_queue_t *q = ngx_queue_head(&self->wait_queue);
883 0           nats_pub_t *pub = ngx_queue_data(q, nats_pub_t, queue);
884 0           ngx_queue_remove(q);
885 0 0         if (pub->data)
886 0           Safefree(pub->data);
887 0           Safefree(pub);
888 0           self->waiting_count--;
889             }
890 6           }
891              
892 0           static void nats_cleanup(nats_t *self)
893             {
894 0 0         int was_connected = self->connected || self->connecting;
    0          
895              
896 0           nats_stop_watchers(self);
897 0           nats_stop_timers(self);
898              
899 0           self->connected = 0;
900 0           self->connecting = 0;
901 0           self->pings_outstanding = 0;
902 0           self->parse_state = PARSE_OP;
903 0           self->ldm = 0; /* lame-duck flag is per-connection */
904              
905             #ifdef HAVE_OPENSSL
906 0           nats_ssl_cleanup(self);
907             #endif
908              
909 0 0         if (self->fd >= 0) {
910 0           close(self->fd);
911 0           self->fd = -1;
912             }
913              
914 0           self->rbuf_len = 0;
915 0           self->wbuf_len = 0;
916 0           self->wbuf_off = 0;
917              
918 0           nats_cancel_all_requests(self, "disconnected");
919              
920             /* Drain pending PONG callbacks (flush + drain markers). Fire each
921             cb with a single error arg so callers learn the flush failed
922             rather than hang forever waiting for a PONG that won't arrive. */
923 0 0         while (!ngx_queue_empty(&self->pong_cbs)) {
924 0           ngx_queue_t *pq = ngx_queue_head(&self->pong_cbs);
925 0           nats_pong_cb_t *pcb = ngx_queue_data(pq, nats_pong_cb_t, queue);
926 0           ngx_queue_remove(pq);
927 0 0         if (pcb->cb) {
928 0           dSP;
929 0           ENTER; SAVETMPS;
930 0 0         PUSHMARK(SP);
931 0 0         EXTEND(SP, 1);
932 0           PUSHs(sv_2mortal(newSVpvn("disconnected", 12)));
933 0           PUTBACK;
934 0           call_sv(pcb->cb, G_DISCARD);
935 0 0         FREETMPS; LEAVE;
936 0           SvREFCNT_dec(pcb->cb);
937             }
938 0           Safefree(pcb);
939             }
940             /* Drain marker (drain_cb) is a separate field, fired on PONG normally;
941             on cleanup it would otherwise leak. Fire it with error too. */
942 0 0         if (self->draining && self->drain_cb) {
    0          
943 0           dSP;
944 0           ENTER; SAVETMPS;
945 0 0         PUSHMARK(SP);
946 0 0         EXTEND(SP, 1);
947 0           PUSHs(sv_2mortal(newSVpvn("disconnected", 12)));
948 0           PUTBACK;
949 0           call_sv(self->drain_cb, G_DISCARD);
950 0 0         FREETMPS; LEAVE;
951 0 0         CLEAR_HANDLER(self->drain_cb);
952 0           self->draining = 0;
953             }
954              
955 0 0         if (was_connected && self->on_disconnect) {
    0          
956 0           dSP;
957 0           ENTER; SAVETMPS;
958 0 0         PUSHMARK(SP);
959 0           PUTBACK;
960 0           call_sv(self->on_disconnect, G_DISCARD);
961 0 0         FREETMPS; LEAVE;
962             }
963              
964 0           nats_schedule_reconnect(self);
965 0           }
966              
967 0           static void nats_emit_error(nats_t *self, const char *err)
968             {
969 0 0         if (self->on_error) {
970 0           dSP;
971 0           ENTER; SAVETMPS;
972 0 0         PUSHMARK(SP);
973 0 0         EXTEND(SP, 1);
974 0           PUSHs(sv_2mortal(newSVpv(err, 0)));
975 0           PUTBACK;
976 0           call_sv(self->on_error, G_DISCARD);
977 0 0         FREETMPS; LEAVE;
978             } else {
979 0           croak("EV::Nats: %s", err);
980             }
981 0           }
982              
983             /* ================================================================
984             * CONNECT command builder
985             * ================================================================ */
986              
987             #define CONNECT_BUF_SIZE 8192
988             #define CONNECT_APPEND(fmt, ...) \
989             do { \
990             if (off >= (int)sizeof(buf)) { overflow = 1; break; } \
991             int _n = snprintf(buf + off, sizeof(buf) - off, fmt, ##__VA_ARGS__); \
992             if (_n < 0 || _n >= (int)(sizeof(buf) - off)) { overflow = 1; break; } \
993             off += _n; \
994             } while(0)
995              
996 0           static void nats_send_connect(nats_t *self)
997             {
998             char buf[CONNECT_BUF_SIZE];
999 0           int off = 0;
1000 0           int overflow = 0;
1001             char escaped[1024];
1002              
1003 0 0         CONNECT_APPEND("CONNECT {");
    0          
    0          
1004 0 0         CONNECT_APPEND("\"verbose\":%s,\"pedantic\":%s,\"echo\":%s",
    0          
    0          
    0          
    0          
    0          
1005             self->verbose ? "true" : "false",
1006             self->pedantic ? "true" : "false",
1007             self->echo ? "true" : "false");
1008 0 0         CONNECT_APPEND(",\"protocol\":1");
    0          
    0          
1009 0 0         if (self->no_responders)
1010 0 0         CONNECT_APPEND(",\"no_responders\":true");
    0          
    0          
1011 0 0         CONNECT_APPEND(",\"headers\":true");
    0          
    0          
1012 0 0         CONNECT_APPEND(",\"lang\":\"perl-xs\",\"version\":\"0.02\"");
    0          
    0          
1013              
1014 0 0         if (self->user) {
1015 0           int elen = json_escape_string(escaped, sizeof(escaped), self->user);
1016 0 0         CONNECT_APPEND(",\"user\":%.*s", elen, escaped);
    0          
    0          
1017 0 0         if (self->pass) {
1018 0           elen = json_escape_string(escaped, sizeof(escaped), self->pass);
1019 0 0         CONNECT_APPEND(",\"pass\":%.*s", elen, escaped);
    0          
    0          
1020             }
1021             }
1022 0 0         if (self->token) {
1023 0           int elen = json_escape_string(escaped, sizeof(escaped), self->token);
1024 0 0         CONNECT_APPEND(",\"auth_token\":%.*s", elen, escaped);
    0          
    0          
1025             }
1026 0 0         if (self->name) {
1027 0           int elen = json_escape_string(escaped, sizeof(escaped), self->name);
1028 0 0         CONNECT_APPEND(",\"name\":%.*s", elen, escaped);
    0          
    0          
1029             }
1030 0 0         if (self->jwt) {
1031 0           int elen = json_escape_string(escaped, sizeof(escaped), self->jwt);
1032 0 0         CONNECT_APPEND(",\"jwt\":%.*s", elen, escaped);
    0          
    0          
1033             }
1034             #ifdef HAVE_OPENSSL
1035 0 0         if (self->nkey_seed && self->server_nonce) {
    0          
1036             /* Sign nonce with NKey */
1037             char sig[128];
1038             char pub[64];
1039 0 0         if (nats_nkey_sign(self->nkey_seed, self->server_nonce,
1040 0 0         strlen(self->server_nonce), sig, sizeof(sig)) > 0 &&
1041 0           nats_nkey_public(self->nkey_seed, pub, sizeof(pub)) > 0) {
1042 0 0         CONNECT_APPEND(",\"nkey\":\"%s\",\"sig\":\"%s\"", pub, sig);
    0          
    0          
1043             }
1044             }
1045             #endif
1046              
1047 0 0         CONNECT_APPEND("}\r\n");
    0          
    0          
1048              
1049 0 0         if (overflow) {
1050 0           nats_emit_error(self, "CONNECT command too long (auth/name fields)");
1051 0           nats_cleanup(self);
1052 0           return;
1053             }
1054              
1055 0           wbuf_append(self, buf, off);
1056              
1057 0           nats_resub_all(self);
1058              
1059 0           wbuf_append(self, "PING\r\n", 6);
1060             }
1061              
1062             /* ================================================================
1063             * Protocol parser
1064             * ================================================================ */
1065              
1066             /* Parse a decimal token as size_t. Returns 0 on success, -1 on
1067             non-digits, empty input, or overflow. */
1068 0           static int nats_parse_decimal(const char *tok, size_t tok_len, size_t *out)
1069             {
1070 0           size_t v = 0;
1071             size_t i;
1072 0 0         if (tok_len == 0) return -1;
1073 0 0         for (i = 0; i < tok_len; i++) {
1074 0 0         if (tok[i] < '0' || tok[i] > '9') return -1;
    0          
1075 0           size_t d = (size_t)(tok[i] - '0');
1076 0 0         if (v > (SIZE_MAX - d) / 10) return -1;
1077 0           v = v * 10 + d;
1078             }
1079 0           *out = v;
1080 0           return 0;
1081             }
1082              
1083 0           static int nats_parse_msg_args(nats_t *self, char *line, size_t len)
1084             {
1085 0           char *p = line;
1086 0           char *end = line + len;
1087             char *tok_start;
1088              
1089 0 0         if (len < 4) return -1;
1090 0           p += 4;
1091              
1092             /* subject */
1093 0           tok_start = p;
1094 0 0         while (p < end && *p != ' ' && *p != '\t') p++;
    0          
    0          
1095 0 0         if (p == tok_start) return -1;
1096 0           self->msg_subject_off = tok_start - self->rbuf;
1097 0           self->msg_subject_len = p - tok_start;
1098              
1099 0 0         while (p < end && (*p == ' ' || *p == '\t')) p++;
    0          
    0          
1100              
1101             /* sid */
1102 0           tok_start = p;
1103 0 0         while (p < end && *p != ' ' && *p != '\t') p++;
    0          
    0          
1104             {
1105             size_t sid;
1106 0 0         if (nats_parse_decimal(tok_start, p - tok_start, &sid) != 0) return -1;
1107 0           self->msg_sid = (uint64_t)sid;
1108             }
1109              
1110 0 0         while (p < end && (*p == ' ' || *p == '\t')) p++;
    0          
    0          
1111              
1112             /* Remaining tokens: [reply-to] <#bytes> — use token counting like HMSG */
1113 0           int ntokens = 0;
1114             char *tokens[3];
1115             size_t token_lens[3];
1116 0           char *tp = p;
1117 0 0         while (tp < end && ntokens < 3) {
    0          
1118 0 0         while (tp < end && (*tp == ' ' || *tp == '\t')) tp++;
    0          
    0          
1119 0 0         if (tp >= end) break;
1120 0           tokens[ntokens] = tp;
1121 0 0         while (tp < end && *tp != ' ' && *tp != '\t') tp++;
    0          
    0          
1122 0           token_lens[ntokens] = tp - tokens[ntokens];
1123 0           ntokens++;
1124             }
1125              
1126 0 0         if (ntokens == 1) {
1127             /* no reply-to, just #bytes */
1128 0           self->msg_reply_off = 0;
1129 0           self->msg_reply_len = 0;
1130 0 0         if (nats_parse_decimal(tokens[0], token_lens[0], &self->msg_total_len) != 0)
1131 0           return -1;
1132 0 0         } else if (ntokens == 2) {
1133             /* reply-to + #bytes */
1134 0           self->msg_reply_off = tokens[0] - self->rbuf;
1135 0           self->msg_reply_len = token_lens[0];
1136 0 0         if (nats_parse_decimal(tokens[1], token_lens[1], &self->msg_total_len) != 0)
1137 0           return -1;
1138             } else {
1139 0           return -1;
1140             }
1141              
1142 0           self->msg_hdr_len = 0;
1143 0           self->msg_type = MSG_TYPE_MSG;
1144 0           return 0;
1145             }
1146              
1147 0           static int nats_parse_hmsg_args(nats_t *self, char *line, size_t len)
1148             {
1149 0           char *p = line;
1150 0           char *end = line + len;
1151             char *tok_start;
1152              
1153 0 0         if (len < 5) return -1;
1154 0           p += 5;
1155              
1156             /* subject */
1157 0           tok_start = p;
1158 0 0         while (p < end && *p != ' ' && *p != '\t') p++;
    0          
    0          
1159 0 0         if (p == tok_start) return -1;
1160 0           self->msg_subject_off = tok_start - self->rbuf;
1161 0           self->msg_subject_len = p - tok_start;
1162              
1163 0 0         while (p < end && (*p == ' ' || *p == '\t')) p++;
    0          
    0          
1164              
1165             /* sid */
1166 0           tok_start = p;
1167 0 0         while (p < end && *p != ' ' && *p != '\t') p++;
    0          
    0          
1168             {
1169             size_t sid;
1170 0 0         if (nats_parse_decimal(tok_start, p - tok_start, &sid) != 0) return -1;
1171 0           self->msg_sid = (uint64_t)sid;
1172             }
1173              
1174 0 0         while (p < end && (*p == ' ' || *p == '\t')) p++;
    0          
    0          
1175              
1176 0           int ntokens = 0;
1177             char *tokens[4];
1178             size_t token_lens[4];
1179 0           char *tp = p;
1180 0 0         while (tp < end && ntokens < 4) {
    0          
1181 0 0         while (tp < end && (*tp == ' ' || *tp == '\t')) tp++;
    0          
    0          
1182 0 0         if (tp >= end) break;
1183 0           tokens[ntokens] = tp;
1184 0 0         while (tp < end && *tp != ' ' && *tp != '\t') tp++;
    0          
    0          
1185 0           token_lens[ntokens] = tp - tokens[ntokens];
1186 0           ntokens++;
1187             }
1188              
1189             int hdr_idx, len_idx;
1190 0 0         if (ntokens == 2) {
1191 0           self->msg_reply_off = 0;
1192 0           self->msg_reply_len = 0;
1193 0           hdr_idx = 0; len_idx = 1;
1194 0 0         } else if (ntokens == 3) {
1195 0           self->msg_reply_off = tokens[0] - self->rbuf;
1196 0           self->msg_reply_len = token_lens[0];
1197 0           hdr_idx = 1; len_idx = 2;
1198             } else {
1199 0           return -1;
1200             }
1201 0 0         if (nats_parse_decimal(tokens[hdr_idx], token_lens[hdr_idx], &self->msg_hdr_len) != 0
1202 0 0         || nats_parse_decimal(tokens[len_idx], token_lens[len_idx], &self->msg_total_len) != 0) {
1203 0           return -1;
1204             }
1205              
1206 0           self->msg_type = MSG_TYPE_HMSG;
1207 0           return 0;
1208             }
1209              
1210 0           static void nats_process_msg(nats_t *self, char *payload, size_t len)
1211             {
1212 0           nats_sub_t *sub = nats_find_sub(self, self->msg_sid);
1213 0 0         if (!sub) return;
1214              
1215 0           self->msgs_in++;
1216 0           self->bytes_in += len;
1217 0           sub->received++;
1218              
1219 0           int max_msgs = sub->max_msgs;
1220 0           int received = sub->received;
1221 0           uint64_t sid = sub->sid;
1222              
1223 0 0         if (sub->cb) {
1224 0           dSP;
1225 0           ENTER; SAVETMPS;
1226 0 0         PUSHMARK(SP);
1227 0 0         EXTEND(SP, 4);
1228              
1229 0           PUSHs(sv_2mortal(newSVpvn(self->rbuf + self->msg_subject_off, self->msg_subject_len)));
1230              
1231 0 0         if (self->msg_type == MSG_TYPE_HMSG && self->msg_hdr_len > 0 && self->msg_hdr_len <= len) {
    0          
    0          
1232 0           PUSHs(sv_2mortal(newSVpvn(payload + self->msg_hdr_len, len - self->msg_hdr_len)));
1233             } else {
1234 0           PUSHs(sv_2mortal(newSVpvn(payload, len)));
1235             }
1236              
1237 0 0         if (self->msg_reply_len > 0) {
1238 0           PUSHs(sv_2mortal(newSVpvn(self->rbuf + self->msg_reply_off, self->msg_reply_len)));
1239             } else {
1240 0           PUSHs(&PL_sv_undef);
1241             }
1242              
1243 0 0         if (self->msg_type == MSG_TYPE_HMSG && self->msg_hdr_len > 0) {
    0          
1244 0           PUSHs(sv_2mortal(newSVpvn(payload, self->msg_hdr_len)));
1245             }
1246              
1247 0           PUTBACK;
1248 0           call_sv(sub->cb, G_DISCARD);
1249 0 0         FREETMPS; LEAVE;
1250             }
1251              
1252 0 0         if (max_msgs > 0 && received >= max_msgs) {
    0          
1253 0           sub = nats_find_sub(self, sid);
1254 0 0         if (sub)
1255 0           nats_remove_sub(self, sub);
1256             }
1257             }
1258              
1259 0           static void nats_check_inbox_response(nats_t *self, const char *subject, size_t subject_len,
1260             const char *payload, size_t payload_len,
1261             const char *headers, size_t headers_len)
1262             {
1263 0           size_t pfx_len = strlen(self->inbox_prefix);
1264 0 0         if (subject_len <= pfx_len || memcmp(subject, self->inbox_prefix, pfx_len) != 0)
    0          
1265 0           return;
1266              
1267 0           const char *id_str = subject + pfx_len;
1268 0           size_t id_len = subject_len - pfx_len;
1269 0           uint64_t req_id = 0;
1270             size_t i;
1271 0 0         for (i = 0; i < id_len; i++) {
1272 0 0         if (id_str[i] < '0' || id_str[i] > '9') return;
    0          
1273 0           req_id = req_id * 10 + (id_str[i] - '0');
1274             }
1275              
1276             ngx_queue_t *q;
1277 0 0         ngx_queue_foreach(q, &self->req_queue) {
    0          
1278 0           nats_req_t *req = ngx_queue_data(q, nats_req_t, queue);
1279 0 0         if (req->req_id == req_id) {
1280 0           ngx_queue_remove(q);
1281 0 0         if (req->timer_active) {
1282 0           ev_timer_stop(self->loop, &req->timer);
1283 0           req->timer_active = 0;
1284             }
1285 0 0         if (req->cb) {
1286 0           dSP;
1287 0 0         int is_no_responders = (headers && headers_len >= 12 &&
    0          
1288 0 0         memcmp(headers, "NATS/1.0 503", 12) == 0);
1289 0           ENTER; SAVETMPS;
1290 0 0         PUSHMARK(SP);
1291 0 0         EXTEND(SP, 3);
1292 0 0         if (is_no_responders) {
1293 0           PUSHs(&PL_sv_undef);
1294 0           PUSHs(sv_2mortal(newSVpvn("no responders", 13)));
1295             } else {
1296 0           PUSHs(sv_2mortal(newSVpvn(payload, payload_len)));
1297 0           PUSHs(&PL_sv_undef);
1298 0 0         if (headers && headers_len > 0)
    0          
1299 0           PUSHs(sv_2mortal(newSVpvn(headers, headers_len)));
1300             }
1301 0           PUTBACK;
1302 0           call_sv(req->cb, G_DISCARD);
1303 0 0         FREETMPS; LEAVE;
1304 0           SvREFCNT_dec(req->cb);
1305             }
1306 0           Safefree(req);
1307 0           return;
1308             }
1309             }
1310             }
1311              
1312 0           static void nats_process_line(nats_t *self, char *line, size_t len)
1313             {
1314 0 0         if (len > 0 && line[len-1] == '\r')
    0          
1315 0           len--;
1316              
1317 0 0         if (len == 0) return;
1318              
1319             /* INFO */
1320 0 0         if (len > 5 && (line[0] == 'I' || line[0] == 'i') &&
    0          
    0          
1321 0 0         (line[1] == 'N' || line[1] == 'n') &&
    0          
1322 0 0         (line[2] == 'F' || line[2] == 'f') &&
    0          
1323 0 0         (line[3] == 'O' || line[3] == 'o') &&
    0          
1324 0 0         line[4] == ' ') {
1325              
1326 0 0         CLEAR_HANDLER(self->server_info_json);
1327 0           self->server_info_json = newSVpvn(line + 5, len - 5);
1328              
1329             {
1330 0           char *p = line + 5;
1331 0           char *e = line + len;
1332             char *found;
1333              
1334 0           found = strstr(p, "\"max_payload\":");
1335 0 0         if (found && found < e) {
    0          
1336 0           found += 14;
1337 0           self->max_payload = 0;
1338 0 0         while (found < e && *found >= '0' && *found <= '9') {
    0          
    0          
1339 0           self->max_payload = self->max_payload * 10 + (*found - '0');
1340 0           found++;
1341             }
1342             }
1343              
1344 0           nats_parse_connect_urls(self, p, e - p);
1345              
1346             /* Parse nonce for NKey auth */
1347 0           found = strstr(p, "\"nonce\":\"");
1348 0 0         if (found && found < e) {
    0          
1349 0           found += 9;
1350 0           const char *nend = memchr(found, '"', e - found);
1351 0 0         if (nend) {
1352 0           size_t nlen = nend - found;
1353 0 0         if (self->server_nonce) Safefree(self->server_nonce);
1354 0           Newx(self->server_nonce, nlen + 1, char);
1355 0           memcpy(self->server_nonce, found, nlen);
1356 0           self->server_nonce[nlen] = '\0';
1357             }
1358             }
1359              
1360             /* Parse ldm (lame duck mode) */
1361 0           found = strstr(p, "\"ldm\":");
1362 0 0         if (found && found < e) {
    0          
1363 0           found += 6;
1364 0 0         while (found < e && (*found == ' ' || *found == '\t')) found++;
    0          
    0          
1365 0 0         if (found < e && *found == 't') {
    0          
1366 0 0         if (!self->ldm) {
1367 0           self->ldm = 1;
1368 0 0         if (self->on_ldm) {
1369 0           dSP;
1370 0           ENTER; SAVETMPS;
1371 0 0         PUSHMARK(SP);
1372 0           PUTBACK;
1373 0           call_sv(self->on_ldm, G_DISCARD);
1374 0 0         FREETMPS; LEAVE;
1375             }
1376             }
1377             }
1378             }
1379             }
1380              
1381 0 0         if (self->connecting) {
1382             #ifdef HAVE_OPENSSL
1383 0 0         if (self->tls && !self->ssl) {
    0          
1384 0 0         if (nats_ssl_setup(self) != 0) {
1385 0           nats_ssl_fail(self, "SSL setup failed");
1386 0           return;
1387             }
1388 0           self->ssl_handshaking = 1;
1389 0           int hret = nats_ssl_handshake(self);
1390 0 0         if (hret < 0) {
1391 0           nats_ssl_fail(self, "SSL handshake failed");
1392 0           return;
1393             }
1394 0 0         if (hret == 1) {
1395 0           self->ssl_handshaking = 0;
1396 0           nats_send_connect(self);
1397 0           nats_try_write(self);
1398             }
1399             /* hret == 0: handshake in progress; nats_on_read will resume. */
1400 0           return;
1401             }
1402             #endif
1403 0           nats_send_connect(self);
1404 0           nats_try_write(self);
1405             }
1406 0           return;
1407             }
1408              
1409             /* MSG */
1410 0 0         if (len >= 4 && line[0] == 'M' && line[1] == 'S' && line[2] == 'G' && line[3] == ' ') {
    0          
    0          
    0          
    0          
1411 0 0         if (nats_parse_msg_args(self, line, len) == 0) {
1412 0 0         if (self->msg_total_len > (size_t)self->max_payload) {
1413 0           nats_emit_error(self, "server sent message exceeding max_payload");
1414 0           nats_cleanup(self);
1415 0           return;
1416             }
1417 0           self->parse_state = PARSE_MSG_BODY;
1418             }
1419 0           return;
1420             }
1421              
1422             /* HMSG */
1423 0 0         if (len >= 5 && line[0] == 'H' && line[1] == 'M' && line[2] == 'S' && line[3] == 'G' && line[4] == ' ') {
    0          
    0          
    0          
    0          
    0          
1424 0 0         if (nats_parse_hmsg_args(self, line, len) == 0) {
1425 0 0         if (self->msg_total_len > (size_t)self->max_payload) {
1426 0           nats_emit_error(self, "server sent message exceeding max_payload");
1427 0           nats_cleanup(self);
1428 0           return;
1429             }
1430 0           self->parse_state = PARSE_MSG_BODY;
1431             }
1432 0           return;
1433             }
1434              
1435             /* PING */
1436 0 0         if (len == 4 && line[0] == 'P' && line[1] == 'I' && line[2] == 'N' && line[3] == 'G') {
    0          
    0          
    0          
    0          
1437 0           wbuf_append(self, "PONG\r\n", 6);
1438 0           nats_try_write(self);
1439 0           return;
1440             }
1441              
1442             /* PONG */
1443 0 0         if (len == 4 && line[0] == 'P' && line[1] == 'O' && line[2] == 'N' && line[3] == 'G') {
    0          
    0          
    0          
    0          
1444 0 0         if (self->pings_outstanding > 0)
1445 0           self->pings_outstanding--;
1446              
1447 0 0         if (self->connecting && !self->connected) {
    0          
1448 0           self->connecting = 0;
1449 0           self->connected = 1;
1450 0           self->reconnect_attempts = 0;
1451              
1452 0 0         if (self->connect_timer_active) {
1453 0           ev_timer_stop(self->loop, &self->connect_timer);
1454 0           self->connect_timer_active = 0;
1455             }
1456              
1457 0           nats_start_ping_timer(self);
1458              
1459 0 0         if (self->on_connect) {
1460 0           dSP;
1461 0           ENTER; SAVETMPS;
1462 0 0         PUSHMARK(SP);
1463 0           PUTBACK;
1464 0           call_sv(self->on_connect, G_DISCARD);
1465 0 0         FREETMPS; LEAVE;
1466             }
1467              
1468 0           nats_drain_waiting(self);
1469             }
1470              
1471             /* Fire flush/PONG callback (one per PONG, FIFO) */
1472 0 0         if (!ngx_queue_empty(&self->pong_cbs)) {
1473 0           ngx_queue_t *pq = ngx_queue_head(&self->pong_cbs);
1474 0           nats_pong_cb_t *pcb = ngx_queue_data(pq, nats_pong_cb_t, queue);
1475 0           ngx_queue_remove(pq);
1476 0 0         if (pcb->cb) {
1477 0           dSP;
1478 0           ENTER; SAVETMPS;
1479 0 0         PUSHMARK(SP);
1480 0 0         EXTEND(SP, 1);
1481 0           PUSHs(&PL_sv_undef); /* success: no error */
1482 0           PUTBACK;
1483 0           call_sv(pcb->cb, G_DISCARD);
1484 0 0         FREETMPS; LEAVE;
1485 0           SvREFCNT_dec(pcb->cb);
1486 0 0         } else if (self->draining) {
1487             /* Drain marker: fire drain callback and disconnect */
1488 0           self->draining = 0;
1489 0 0         if (self->drain_cb) {
1490 0           dSP;
1491 0           ENTER; SAVETMPS;
1492 0 0         PUSHMARK(SP);
1493 0 0         EXTEND(SP, 1);
1494 0           PUSHs(&PL_sv_undef); /* success: no error */
1495 0           PUTBACK;
1496 0           call_sv(self->drain_cb, G_DISCARD);
1497 0 0         FREETMPS; LEAVE;
1498 0 0         CLEAR_HANDLER(self->drain_cb);
1499             }
1500 0           Safefree(pcb);
1501 0           self->intentional_disconnect = 1;
1502 0           nats_cleanup(self);
1503 0           return;
1504             }
1505 0           Safefree(pcb);
1506             }
1507              
1508 0           return;
1509             }
1510              
1511             /* +OK */
1512 0 0         if (len >= 3 && line[0] == '+' && line[1] == 'O' && line[2] == 'K') {
    0          
    0          
    0          
1513 0           return;
1514             }
1515              
1516             /* -ERR */
1517 0 0         if (len >= 4 && line[0] == '-' && line[1] == 'E' && line[2] == 'R' && line[3] == 'R') {
    0          
    0          
    0          
    0          
1518 0           char *msg = line + 4;
1519 0           size_t mlen = len - 4;
1520 0 0         while (mlen > 0 && (*msg == ' ' || *msg == '\'')) { msg++; mlen--; }
    0          
    0          
1521 0 0         while (mlen > 0 && msg[mlen-1] == '\'') mlen--;
    0          
1522              
1523             char errbuf[512];
1524 0           snprintf(errbuf, sizeof(errbuf), "%.*s", (int)mlen, msg);
1525              
1526 0 0         if (strstr(errbuf, "authorization") || strstr(errbuf, "authentication")) {
    0          
1527 0           self->intentional_disconnect = 1;
1528             }
1529 0           nats_emit_error(self, errbuf);
1530 0           nats_cleanup(self);
1531 0           return;
1532             }
1533             }
1534              
1535             /* ================================================================
1536             * IO callbacks
1537             * ================================================================ */
1538              
1539 0           static void nats_on_read(struct ev_loop *loop, ev_io *w, int revents)
1540             {
1541 0           nats_t *self = (nats_t *)((char *)w - offsetof(nats_t, rio));
1542             (void)revents;
1543              
1544             #ifdef HAVE_OPENSSL
1545 0 0         if (self->ssl_handshaking) {
1546 0           int hret = nats_ssl_handshake(self);
1547 0 0         if (hret == 0) return;
1548 0 0         if (hret < 0) {
1549 0           nats_ssl_fail(self, "SSL handshake failed");
1550 0           return;
1551             }
1552 0           self->ssl_handshaking = 0;
1553 0 0         if (self->writing) {
1554 0           ev_io_stop(self->loop, &self->wio);
1555 0           self->writing = 0;
1556             }
1557             /* Handshake completed after the post-INFO upgrade. Send CONNECT
1558             over the now-encrypted channel. */
1559 0 0         if (self->connecting) {
1560 0           nats_send_connect(self);
1561 0           nats_try_write(self);
1562             }
1563             }
1564             #endif
1565              
1566 0           buf_ensure(&self->rbuf, &self->rbuf_cap, self->rbuf_len + BUF_INIT_SIZE);
1567              
1568 0           ssize_t n = nats_io_read(self, self->rbuf + self->rbuf_len,
1569 0           self->rbuf_cap - self->rbuf_len);
1570              
1571 0 0         if (n <= 0) {
1572 0 0         if (n == 0) {
1573 0           nats_cleanup(self);
1574 0 0         } else if (errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) {
    0          
    0          
1575 0           nats_emit_error(self, strerror(errno));
1576 0           nats_cleanup(self);
1577             }
1578 0           return;
1579             }
1580              
1581 0           self->rbuf_len += n;
1582              
1583 0           size_t consumed = 0;
1584              
1585 0 0         while (consumed < self->rbuf_len) {
1586 0 0         if (self->fd < 0) break;
1587             #ifdef HAVE_OPENSSL
1588             /* If processing INFO triggered a TLS upgrade, stop parsing rbuf as
1589             plaintext — subsequent bytes belong to the TLS handshake. */
1590 0 0         if (self->ssl_handshaking) break;
1591             #endif
1592              
1593 0 0         if (self->parse_state == PARSE_OP) {
1594 0           char *start = self->rbuf + consumed;
1595 0           char *nl = (char *)memchr(start, '\n', self->rbuf_len - consumed);
1596 0 0         if (!nl) break;
1597              
1598 0           size_t line_len = nl - start;
1599 0           nats_process_line(self, start, line_len);
1600 0           consumed += line_len + 1;
1601              
1602 0 0         } else if (self->parse_state == PARSE_MSG_BODY) {
1603 0           size_t need = self->msg_total_len + 2;
1604 0           size_t avail = self->rbuf_len - consumed;
1605 0 0         if (avail < need) break;
1606              
1607 0           char *payload = self->rbuf + consumed;
1608              
1609             /* Dispatch inbox responses, skip normal sub delivery for inbox */
1610 0 0         if (self->inbox_sub_sid && self->msg_sid == self->inbox_sub_sid) {
    0          
1611 0           const char *hdrs = NULL;
1612 0           size_t hdrs_len = 0;
1613 0           const char *body = payload;
1614 0           size_t body_len = self->msg_total_len;
1615              
1616 0 0         if (self->msg_type == MSG_TYPE_HMSG && self->msg_hdr_len > 0 &&
    0          
1617 0 0         self->msg_hdr_len <= self->msg_total_len) {
1618 0           hdrs = payload;
1619 0           hdrs_len = self->msg_hdr_len;
1620 0           body = payload + self->msg_hdr_len;
1621 0           body_len = self->msg_total_len - self->msg_hdr_len;
1622             }
1623              
1624 0           self->msgs_in++;
1625 0           self->bytes_in += self->msg_total_len;
1626 0           nats_check_inbox_response(self, self->rbuf + self->msg_subject_off, self->msg_subject_len,
1627             body, body_len, hdrs, hdrs_len);
1628             } else {
1629 0           nats_process_msg(self, payload, self->msg_total_len);
1630             }
1631              
1632 0           consumed += need;
1633 0           self->parse_state = PARSE_OP;
1634             }
1635             }
1636              
1637 0 0         if (self->fd >= 0 && consumed > 0 && consumed <= self->rbuf_len) {
    0          
    0          
1638 0           self->rbuf_len -= consumed;
1639 0 0         if (self->rbuf_len > 0)
1640 0           memmove(self->rbuf, self->rbuf + consumed, self->rbuf_len);
1641             }
1642             }
1643              
1644 0           static void nats_try_write(nats_t *self)
1645             {
1646 0 0         if (self->fd < 0) return;
1647              
1648 0 0         while (self->wbuf_off < self->wbuf_len) {
1649 0           ssize_t n = nats_io_write(self, self->wbuf + self->wbuf_off,
1650 0           self->wbuf_len - self->wbuf_off);
1651 0 0         if (n <= 0) {
1652 0 0         if (n == 0 || errno == EAGAIN || errno == EWOULDBLOCK) {
    0          
    0          
1653 0 0         if (!self->writing) {
1654 0           ev_io_start(self->loop, &self->wio);
1655 0           self->writing = 1;
1656             }
1657 0           return;
1658             }
1659 0 0         if (errno == EINTR)
1660 0           continue;
1661 0           nats_emit_error(self, strerror(errno));
1662 0           nats_cleanup(self);
1663 0           return;
1664             }
1665 0           self->wbuf_off += n;
1666             }
1667              
1668 0           self->wbuf_off = 0;
1669 0           self->wbuf_len = 0;
1670              
1671 0 0         if (self->writing) {
1672 0           ev_io_stop(self->loop, &self->wio);
1673 0           self->writing = 0;
1674             }
1675             }
1676              
1677 0           static void nats_on_write(struct ev_loop *loop, ev_io *w, int revents)
1678             {
1679 0           nats_t *self = (nats_t *)((char *)w - offsetof(nats_t, wio));
1680             (void)loop; (void)revents;
1681              
1682 0 0         if (self->connecting && !self->connected) {
    0          
1683 0           int err = 0;
1684 0           socklen_t errlen = sizeof(err);
1685 0           getsockopt(self->fd, SOL_SOCKET, SO_ERROR, &err, &errlen);
1686 0 0         if (err) {
1687 0           nats_emit_error(self, strerror(err));
1688 0           nats_cleanup(self);
1689 0           return;
1690             }
1691             /* TCP connect complete. NATS speaks plain text until INFO arrives;
1692             if TLS is configured, we upgrade once we've parsed INFO. So just
1693             switch to reading and wait for the server's INFO greeting. */
1694 0 0         if (self->writing) {
1695 0           ev_io_stop(self->loop, &self->wio);
1696 0           self->writing = 0;
1697             }
1698 0 0         if (!self->reading) {
1699 0           ev_io_start(self->loop, &self->rio);
1700 0           self->reading = 1;
1701             }
1702 0           return;
1703             }
1704              
1705 0           nats_try_write(self);
1706             }
1707              
1708             /* ================================================================
1709             * Timer callbacks
1710             * ================================================================ */
1711              
1712 0           static void nats_on_connect_timeout(struct ev_loop *loop, ev_timer *w, int revents)
1713             {
1714 0           nats_t *self = (nats_t *)((char *)w - offsetof(nats_t, connect_timer));
1715             (void)loop; (void)revents;
1716 0           self->connect_timer_active = 0;
1717 0           nats_emit_error(self, "connect timeout");
1718 0           nats_cleanup(self);
1719 0           }
1720              
1721 0           static void nats_on_reconnect_timer(struct ev_loop *loop, ev_timer *w, int revents)
1722             {
1723 0           nats_t *self = (nats_t *)((char *)w - offsetof(nats_t, reconnect_timer));
1724             (void)loop; (void)revents;
1725 0           self->reconnect_timer_active = 0;
1726 0           self->reconnect_attempts++;
1727 0           self->intentional_disconnect = 0;
1728 0 0         if (self->server_pool_count > 0)
1729 0           nats_next_server(self);
1730 0           nats_do_connect(self);
1731 0           }
1732              
1733 0           static void nats_start_ping_timer(nats_t *self)
1734             {
1735 0 0         if (self->ping_interval_ms <= 0) return;
1736 0           double interval = self->ping_interval_ms / 1000.0;
1737 0           ev_timer_set(&self->ping_timer, interval, interval);
1738 0           ev_timer_start(self->loop, &self->ping_timer);
1739 0           self->ping_timer_active = 1;
1740             }
1741              
1742 0           static void nats_on_ping_timer(struct ev_loop *loop, ev_timer *w, int revents)
1743             {
1744 0           nats_t *self = (nats_t *)((char *)w - offsetof(nats_t, ping_timer));
1745             (void)loop; (void)revents;
1746              
1747 0           self->pings_outstanding++;
1748 0 0         if (self->max_pings_outstanding > 0 &&
1749 0 0         self->pings_outstanding > self->max_pings_outstanding) {
1750 0           nats_emit_error(self, "stale connection");
1751 0           nats_cleanup(self);
1752 0           return;
1753             }
1754              
1755 0           wbuf_append(self, "PING\r\n", 6);
1756 0           nats_try_write(self);
1757             }
1758              
1759             /* ================================================================
1760             * Write coalescing via ev_prepare
1761             * ================================================================ */
1762              
1763 0           static void nats_on_prepare(struct ev_loop *loop, ev_prepare *w, int revents)
1764             {
1765 0           nats_t *self = (nats_t *)((char *)w - offsetof(nats_t, prepare_watcher));
1766             (void)loop; (void)revents;
1767              
1768 0 0         if (self->wbuf_dirty && self->fd >= 0) {
    0          
1769 0           self->wbuf_dirty = 0;
1770 0           nats_try_write(self);
1771             }
1772 0           }
1773              
1774 0           static void nats_schedule_write(nats_t *self)
1775             {
1776 0 0         if (self->batch_mode) return; /* writes batched, flushed on batch end */
1777              
1778 0           self->wbuf_dirty = 1;
1779 0 0         if (!self->prepare_active) {
1780 0           ev_prepare_start(self->loop, &self->prepare_watcher);
1781 0           self->prepare_active = 1;
1782             }
1783              
1784             /* Slow consumer detection */
1785 0 0         if (self->slow_consumer_bytes > 0 &&
1786 0 0         (self->wbuf_len - self->wbuf_off) > self->slow_consumer_bytes) {
1787 0 0         if (self->on_slow_consumer) {
1788 0           dSP;
1789 0           ENTER; SAVETMPS;
1790 0 0         PUSHMARK(SP);
1791 0 0         EXTEND(SP, 1);
1792 0           PUSHs(sv_2mortal(newSVuv(self->wbuf_len - self->wbuf_off)));
1793 0           PUTBACK;
1794 0           call_sv(self->on_slow_consumer, G_DISCARD);
1795 0 0         FREETMPS; LEAVE;
1796             }
1797             }
1798             }
1799              
1800             /* ================================================================
1801             * Request timeout
1802             * ================================================================ */
1803              
1804 0           static void nats_on_request_timeout(struct ev_loop *loop, ev_timer *w, int revents)
1805             {
1806 0           nats_req_t *req = (nats_req_t *)((char *)w - offsetof(nats_req_t, timer));
1807             (void)loop; (void)revents;
1808 0           req->timer_active = 0;
1809              
1810 0           ngx_queue_remove(&req->queue);
1811              
1812 0 0         if (req->cb) {
1813 0           dSP;
1814 0           ENTER; SAVETMPS;
1815 0 0         PUSHMARK(SP);
1816 0 0         EXTEND(SP, 2);
1817 0           PUSHs(&PL_sv_undef);
1818 0           PUSHs(sv_2mortal(newSVpvn("request timeout", 15)));
1819 0           PUTBACK;
1820 0           call_sv(req->cb, G_DISCARD);
1821 0 0         FREETMPS; LEAVE;
1822 0           SvREFCNT_dec(req->cb);
1823             }
1824 0           Safefree(req);
1825 0           }
1826              
1827             /* ================================================================
1828             * Write queue (for buffering during connect/reconnect)
1829             * ================================================================ */
1830              
1831 0           static void nats_drain_waiting(nats_t *self)
1832             {
1833 0 0         while (!ngx_queue_empty(&self->wait_queue)) {
1834 0           ngx_queue_t *q = ngx_queue_head(&self->wait_queue);
1835 0           nats_pub_t *pub = ngx_queue_data(q, nats_pub_t, queue);
1836 0           ngx_queue_remove(q);
1837 0           self->waiting_count--;
1838              
1839 0           wbuf_append(self, pub->data, pub->len);
1840 0           Safefree(pub->data);
1841 0           Safefree(pub);
1842             }
1843              
1844 0 0         if (self->wbuf_len > self->wbuf_off)
1845 0           nats_try_write(self);
1846 0           }
1847              
1848 0           static void nats_queue_write(nats_t *self, const char *data, size_t len)
1849             {
1850 0 0         if (self->connected) {
1851 0           wbuf_append(self, data, len);
1852 0           nats_schedule_write(self);
1853 0 0         } else if (self->connecting ||
1854 0 0         (self->reconnect_enabled && self->reconnect_timer_active)) {
    0          
1855             nats_pub_t *pub;
1856 0           Newxz(pub, 1, nats_pub_t);
1857 0           Newx(pub->data, len, char);
1858 0           memcpy(pub->data, data, len);
1859 0           pub->len = len;
1860 0           ngx_queue_insert_tail(&self->wait_queue, &pub->queue);
1861 0           self->waiting_count++;
1862             } else {
1863 0           croak("not connected");
1864             }
1865 0           }
1866              
1867             /* ================================================================
1868             * TCP connection
1869             * ================================================================ */
1870              
1871 0           static void nats_schedule_reconnect(nats_t *self)
1872             {
1873             /* If a user callback (on_disconnect/on_error) already kicked off a fresh
1874             connect, don't stomp on it. */
1875 0 0         if (self->fd >= 0 || self->connecting || self->reconnect_timer_active)
    0          
    0          
1876 0           return;
1877 0 0         if (!self->intentional_disconnect && self->reconnect_enabled) {
    0          
1878 0 0         if (self->max_reconnect_attempts == 0 ||
1879 0 0         self->reconnect_attempts < self->max_reconnect_attempts) {
1880             /* Exponential backoff with jitter */
1881 0           int shift = self->reconnect_attempts > 5 ? 5 : self->reconnect_attempts;
1882 0           int delay = self->reconnect_delay_ms * (1 << shift);
1883 0 0         if (self->max_reconnect_delay_ms > 0 && delay > self->max_reconnect_delay_ms)
    0          
1884 0           delay = self->max_reconnect_delay_ms;
1885 0           double base = delay / 1000.0;
1886             unsigned char r[2];
1887 0           nats_random_bytes(r, 2);
1888 0           unsigned int rv = ((unsigned)r[0] << 8) | r[1];
1889 0           double jitter = base * (0.5 + (double)(rv % 1000) / 2000.0);
1890 0           ev_timer_set(&self->reconnect_timer, jitter, 0.0);
1891 0           ev_timer_start(self->loop, &self->reconnect_timer);
1892 0           self->reconnect_timer_active = 1;
1893             } else {
1894 0           nats_emit_error(self, "max reconnect attempts reached");
1895             }
1896             }
1897             }
1898              
1899 6           static void nats_free_server_pool(nats_t *self)
1900             {
1901 6 50         while (!ngx_queue_empty(&self->server_pool)) {
1902 0           ngx_queue_t *q = ngx_queue_head(&self->server_pool);
1903 0           nats_server_t *srv = ngx_queue_data(q, nats_server_t, queue);
1904 0           ngx_queue_remove(q);
1905 0           Safefree(srv->host);
1906 0           Safefree(srv);
1907 0           self->server_pool_count--;
1908             }
1909 6           }
1910              
1911 0           static void nats_parse_connect_urls(nats_t *self, const char *json, size_t len)
1912             {
1913 0           const char *key = "\"connect_urls\":[";
1914 0           const char *p = strstr(json, key);
1915 0 0         if (!p || p >= json + len) return;
    0          
1916              
1917 0           p += strlen(key);
1918 0           const char *end = json + len;
1919              
1920 0           nats_free_server_pool(self);
1921              
1922 0 0         while (p < end && *p != ']') {
    0          
1923 0 0         while (p < end && (*p == ' ' || *p == ',' || *p == '"')) p++;
    0          
    0          
    0          
1924 0 0         if (p >= end || *p == ']') break;
    0          
1925              
1926 0           const char *start = p;
1927 0 0         while (p < end && *p != '"' && *p != ',' && *p != ']') p++;
    0          
    0          
    0          
1928              
1929 0           size_t url_len = p - start;
1930 0 0         if (url_len > 0) {
1931             /* Parse host:port */
1932 0           const char *colon = NULL;
1933             const char *s;
1934 0 0         for (s = start + url_len - 1; s >= start; s--) {
1935 0 0         if (*s == ':') { colon = s; break; }
1936             }
1937              
1938             nats_server_t *srv;
1939 0           Newxz(srv, 1, nats_server_t);
1940 0 0         if (colon && colon > start) {
    0          
1941 0           size_t hlen = colon - start;
1942 0           Newx(srv->host, hlen + 1, char);
1943 0           memcpy(srv->host, start, hlen);
1944 0           srv->host[hlen] = '\0';
1945 0           srv->port = 0;
1946 0           const char *dp = colon + 1;
1947 0 0         while (dp < start + url_len && *dp >= '0' && *dp <= '9') {
    0          
    0          
1948 0           srv->port = srv->port * 10 + (*dp - '0');
1949 0           dp++;
1950             }
1951             } else {
1952 0           Newx(srv->host, url_len + 1, char);
1953 0           memcpy(srv->host, start, url_len);
1954 0           srv->host[url_len] = '\0';
1955 0           srv->port = 4222;
1956             }
1957 0           ngx_queue_insert_tail(&self->server_pool, &srv->queue);
1958 0           self->server_pool_count++;
1959             }
1960              
1961 0 0         while (p < end && *p == '"') p++;
    0          
1962             }
1963             }
1964              
1965 0           static void nats_next_server(nats_t *self)
1966             {
1967 0 0         if (ngx_queue_empty(&self->server_pool)) return;
1968              
1969             /* Rotate: move head to tail, use new head */
1970 0           ngx_queue_t *q = ngx_queue_head(&self->server_pool);
1971 0           nats_server_t *srv = ngx_queue_data(q, nats_server_t, queue);
1972              
1973 0           nats_set_str(&self->host, srv->host);
1974 0           self->port = srv->port;
1975              
1976             /* Rotate this server to end of pool */
1977 0           ngx_queue_remove(q);
1978 0           ngx_queue_insert_tail(&self->server_pool, q);
1979             }
1980              
1981 0           static void nats_connect_tcp(nats_t *self)
1982             {
1983             struct addrinfo hints, *res, *rp;
1984             char port_str[8];
1985 0           int fd = -1;
1986              
1987 0           memset(&hints, 0, sizeof(hints));
1988 0           hints.ai_family = AF_UNSPEC;
1989 0           hints.ai_socktype = SOCK_STREAM;
1990              
1991 0           snprintf(port_str, sizeof(port_str), "%d", self->port);
1992              
1993 0           int rv = getaddrinfo(self->host, port_str, &hints, &res);
1994 0 0         if (rv != 0) {
1995 0           nats_emit_error(self, gai_strerror(rv));
1996 0           nats_schedule_reconnect(self);
1997 0           return;
1998             }
1999              
2000 0 0         for (rp = res; rp != NULL; rp = rp->ai_next) {
2001 0           fd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
2002 0 0         if (fd < 0) continue;
2003              
2004 0           fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | O_NONBLOCK);
2005              
2006 0           int flag = 1;
2007 0           setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag));
2008              
2009 0 0         if (self->keepalive > 0) {
2010 0           setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &flag, sizeof(flag));
2011             #ifdef TCP_KEEPIDLE
2012 0           setsockopt(fd, IPPROTO_TCP, TCP_KEEPIDLE, &self->keepalive, sizeof(self->keepalive));
2013             #endif
2014             }
2015              
2016 0           rv = connect(fd, rp->ai_addr, rp->ai_addrlen);
2017 0 0         if (rv == 0 || errno == EINPROGRESS) break;
    0          
2018              
2019 0           close(fd);
2020 0           fd = -1;
2021             }
2022              
2023 0           freeaddrinfo(res);
2024              
2025 0 0         if (fd < 0) {
2026 0           nats_emit_error(self, "connection failed");
2027 0           nats_schedule_reconnect(self);
2028 0           return;
2029             }
2030              
2031 0           self->fd = fd;
2032 0           self->connecting = 1;
2033 0           self->connected = 0;
2034              
2035 0           ev_io_init(&self->rio, nats_on_read, fd, EV_READ);
2036 0           ev_io_init(&self->wio, nats_on_write, fd, EV_WRITE);
2037              
2038 0 0         if (self->priority) {
2039 0           ev_set_priority(&self->rio, self->priority);
2040 0           ev_set_priority(&self->wio, self->priority);
2041             }
2042              
2043 0           ev_io_start(self->loop, &self->rio);
2044 0           self->reading = 1;
2045              
2046 0 0         if (rv != 0) {
2047 0           ev_io_start(self->loop, &self->wio);
2048 0           self->writing = 1;
2049             }
2050              
2051 0 0         if (self->connect_timeout_ms > 0) {
2052 0           ev_timer_set(&self->connect_timer, self->connect_timeout_ms / 1000.0, 0.0);
2053 0           ev_timer_start(self->loop, &self->connect_timer);
2054 0           self->connect_timer_active = 1;
2055             }
2056             }
2057              
2058 0           static void nats_connect_unix(nats_t *self)
2059             {
2060             struct sockaddr_un addr;
2061             int fd;
2062              
2063 0 0         if (!self->path || strlen(self->path) >= sizeof(addr.sun_path)) {
    0          
2064 0           nats_emit_error(self, "invalid unix socket path");
2065 0           nats_schedule_reconnect(self);
2066 0           return;
2067             }
2068              
2069 0           fd = socket(AF_UNIX, SOCK_STREAM, 0);
2070 0 0         if (fd < 0) {
2071 0           nats_emit_error(self, strerror(errno));
2072 0           nats_schedule_reconnect(self);
2073 0           return;
2074             }
2075              
2076 0           fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | O_NONBLOCK);
2077              
2078 0           memset(&addr, 0, sizeof(addr));
2079 0           addr.sun_family = AF_UNIX;
2080 0           strncpy(addr.sun_path, self->path, sizeof(addr.sun_path) - 1);
2081              
2082 0           int rv = connect(fd, (struct sockaddr *)&addr, sizeof(addr));
2083 0 0         if (rv != 0 && errno != EINPROGRESS) {
    0          
2084 0           close(fd);
2085 0           nats_emit_error(self, strerror(errno));
2086 0           nats_schedule_reconnect(self);
2087 0           return;
2088             }
2089              
2090 0           self->fd = fd;
2091 0           self->connecting = 1;
2092 0           self->connected = 0;
2093              
2094 0           ev_io_init(&self->rio, nats_on_read, fd, EV_READ);
2095 0           ev_io_init(&self->wio, nats_on_write, fd, EV_WRITE);
2096              
2097 0 0         if (self->priority) {
2098 0           ev_set_priority(&self->rio, self->priority);
2099 0           ev_set_priority(&self->wio, self->priority);
2100             }
2101              
2102 0           ev_io_start(self->loop, &self->rio);
2103 0           self->reading = 1;
2104              
2105 0 0         if (rv != 0) {
2106 0           ev_io_start(self->loop, &self->wio);
2107 0           self->writing = 1;
2108             }
2109              
2110 0 0         if (self->connect_timeout_ms > 0) {
2111 0           ev_timer_set(&self->connect_timer, self->connect_timeout_ms / 1000.0, 0.0);
2112 0           ev_timer_start(self->loop, &self->connect_timer);
2113 0           self->connect_timer_active = 1;
2114             }
2115             }
2116              
2117             /* Helper: connect via path or host */
2118 0           static void nats_do_connect(nats_t *self)
2119             {
2120 0 0         if (self->path)
2121 0           nats_connect_unix(self);
2122             else
2123 0           nats_connect_tcp(self);
2124 0           }
2125              
2126             /* ================================================================
2127             * XS interface
2128             * ================================================================ */
2129              
2130             MODULE = EV::Nats PACKAGE = EV::Nats
2131              
2132             PROTOTYPES: DISABLE
2133              
2134             BOOT:
2135             {
2136 17 50         I_EV_API("EV::Nats");
    50          
    50          
2137             {
2138 17           HV *stash = gv_stashpv("EV::Nats", GV_ADD);
2139             #ifdef HAVE_OPENSSL
2140 17           newCONSTSUB(stash, "HAS_TLS", newSViv(1));
2141 17           newCONSTSUB(stash, "HAS_NKEY", newSViv(1));
2142             #else
2143             newCONSTSUB(stash, "HAS_TLS", newSViv(0));
2144             newCONSTSUB(stash, "HAS_NKEY", newSViv(0));
2145             #endif
2146             }
2147             }
2148              
2149             EV::Nats
2150             new(class, ...)
2151             char *class
2152             PREINIT:
2153             nats_t *self;
2154             int i;
2155             CODE:
2156 6           Newxz(self, 1, nats_t);
2157 6           self->magic = NATS_MAGIC_ALIVE;
2158 6           self->fd = -1;
2159 6           self->loop = EV_DEFAULT;
2160 6           self->max_payload = DEFAULT_MAX_PAYLOAD;
2161 6           self->port = 4222;
2162 6           self->echo = 1;
2163 6           self->ping_interval_ms = 120000;
2164 6           self->max_pings_outstanding = 2;
2165 6           self->reconnect_delay_ms = 2000;
2166 6           self->max_reconnect_delay_ms = 30000;
2167 6           self->max_reconnect_attempts = 60;
2168              
2169 6           ngx_queue_init(&self->subs);
2170 6           ngx_queue_init(&self->wait_queue);
2171 6           ngx_queue_init(&self->req_queue);
2172              
2173 6           self->next_sid = 1;
2174              
2175 6           ev_timer_init(&self->connect_timer, nats_on_connect_timeout, 0., 0.);
2176 6           ev_timer_init(&self->reconnect_timer, nats_on_reconnect_timer, 0., 0.);
2177 6           ev_timer_init(&self->ping_timer, nats_on_ping_timer, 0., 0.);
2178 6           ev_prepare_init(&self->prepare_watcher, nats_on_prepare);
2179              
2180 6           ngx_queue_init(&self->server_pool);
2181 6           ngx_queue_init(&self->pong_cbs);
2182 6           self->sub_map = newHV();
2183              
2184 6           nats_setup_inbox(self);
2185              
2186 6 50         if (items > 1 && (items - 1) % 2 == 0) {
    0          
2187 0 0         for (i = 1; i < items; i += 2) {
2188 0           const char *key = SvPV_nolen(ST(i));
2189 0           SV *val = ST(i + 1);
2190              
2191 0 0         if (strcmp(key, "host") == 0) nats_set_str_sv(&self->host, val);
2192 0 0         else if (strcmp(key, "port") == 0) self->port = SvIV(val);
2193 0 0         else if (strcmp(key, "path") == 0) nats_set_str_sv(&self->path, val);
2194 0 0         else if (strcmp(key, "user") == 0) nats_set_str_sv(&self->user, val);
2195 0 0         else if (strcmp(key, "pass") == 0) nats_set_str_sv(&self->pass, val);
2196 0 0         else if (strcmp(key, "token") == 0) nats_set_str_sv(&self->token, val);
2197 0 0         else if (strcmp(key, "name") == 0) nats_set_str_sv(&self->name, val);
2198 0 0         else if (strcmp(key, "on_error") == 0)
2199 0           self->on_error = newSVsv(val);
2200 0 0         else if (strcmp(key, "on_connect") == 0)
2201 0           self->on_connect = newSVsv(val);
2202 0 0         else if (strcmp(key, "on_disconnect") == 0)
2203 0           self->on_disconnect = newSVsv(val);
2204 0 0         else if (strcmp(key, "verbose") == 0)
2205 0           self->verbose = SvTRUE(val) ? 1 : 0;
2206 0 0         else if (strcmp(key, "pedantic") == 0)
2207 0           self->pedantic = SvTRUE(val) ? 1 : 0;
2208 0 0         else if (strcmp(key, "echo") == 0)
2209 0           self->echo = SvTRUE(val) ? 1 : 0;
2210 0 0         else if (strcmp(key, "no_responders") == 0)
2211 0           self->no_responders = SvTRUE(val) ? 1 : 0;
2212 0 0         else if (strcmp(key, "reconnect") == 0)
2213 0           self->reconnect_enabled = SvTRUE(val) ? 1 : 0;
2214 0 0         else if (strcmp(key, "reconnect_delay") == 0)
2215 0           self->reconnect_delay_ms = SvIV(val);
2216 0 0         else if (strcmp(key, "max_reconnect_attempts") == 0)
2217 0           self->max_reconnect_attempts = SvIV(val);
2218 0 0         else if (strcmp(key, "max_reconnect_delay") == 0)
2219 0           self->max_reconnect_delay_ms = SvIV(val);
2220 0 0         else if (strcmp(key, "connect_timeout") == 0)
2221 0           self->connect_timeout_ms = SvIV(val);
2222 0 0         else if (strcmp(key, "ping_interval") == 0)
2223 0           self->ping_interval_ms = SvIV(val);
2224 0 0         else if (strcmp(key, "max_pings_outstanding") == 0)
2225 0           self->max_pings_outstanding = SvIV(val);
2226 0 0         else if (strcmp(key, "priority") == 0)
2227 0           self->priority = SvIV(val);
2228 0 0         else if (strcmp(key, "keepalive") == 0)
2229 0           self->keepalive = SvIV(val);
2230 0 0         #ifdef HAVE_OPENSSL
2231 0           else if (strcmp(key, "tls") == 0)
2232 0 0         self->tls = SvTRUE(val) ? 1 : 0;
2233 0           else if (strcmp(key, "tls_ca_file") == 0)
2234 0 0         nats_set_str_sv(&self->tls_ca_file, val);
2235 0           else if (strcmp(key, "tls_skip_verify") == 0)
2236 0 0         self->tls_skip_verify = SvTRUE(val) ? 1 : 0;
2237 0           else if (strcmp(key, "nkey_seed") == 0)
2238 0 0         nats_set_str_sv(&self->nkey_seed, val);
2239 0           #endif
2240 0 0         else if (strcmp(key, "jwt") == 0)
2241 0           nats_set_str_sv(&self->jwt, val);
2242 0 0         else if (strcmp(key, "slow_consumer_bytes") == 0)
2243 0           self->slow_consumer_bytes = (size_t)SvUV(val);
2244 0 0         else if (strcmp(key, "on_slow_consumer") == 0)
2245 0           self->on_slow_consumer = newSVsv(val);
2246 0 0         else if (strcmp(key, "on_lame_duck") == 0)
2247 0           self->on_ldm = newSVsv(val);
2248             else if (strcmp(key, "loop") == 0)
2249 0           self->loop = (struct ev_loop *)SvIVx(SvRV(val));
2250             else
2251             warn("EV::Nats::new: unknown option '%s'", key);
2252             }
2253 6 50         }
    50          
2254 0            
2255             if (self->host || self->path)
2256 6           nats_do_connect(self);
2257              
2258             RETVAL = self;
2259             OUTPUT:
2260             RETVAL
2261              
2262             void
2263             connect(self, host, port = 4222)
2264             EV::Nats self
2265             char *host
2266             int port
2267             CODE:
2268 0 0         if (self->connected || self->connecting)
    0          
2269 0           croak("already connected");
2270              
2271 0 0         if (self->reconnect_timer_active) {
2272 0           ev_timer_stop(self->loop, &self->reconnect_timer);
2273 0           self->reconnect_timer_active = 0;
2274             }
2275              
2276 0           nats_set_str(&self->path, NULL);
2277 0           nats_set_str(&self->host, host);
2278 0           self->port = port;
2279 0           self->intentional_disconnect = 0;
2280              
2281 0           nats_connect_tcp(self);
2282              
2283             void
2284             connect_unix(self, path)
2285             EV::Nats self
2286             char *path
2287             CODE:
2288 0 0         if (self->connected || self->connecting)
    0          
2289 0           croak("already connected");
2290              
2291 0 0         if (self->reconnect_timer_active) {
2292 0           ev_timer_stop(self->loop, &self->reconnect_timer);
2293 0           self->reconnect_timer_active = 0;
2294             }
2295              
2296 0           nats_set_str(&self->host, NULL);
2297 0           nats_set_str(&self->path, path);
2298 0           self->intentional_disconnect = 0;
2299              
2300 0           nats_connect_unix(self);
2301              
2302             void
2303             disconnect(self)
2304             EV::Nats self
2305             CODE:
2306 0           self->intentional_disconnect = 1;
2307 0 0         if (self->reconnect_timer_active) {
2308 0           ev_timer_stop(self->loop, &self->reconnect_timer);
2309 0           self->reconnect_timer_active = 0;
2310             }
2311 0           nats_skip_waiting(self);
2312 0           nats_cleanup(self);
2313              
2314             int
2315             is_connected(self)
2316             EV::Nats self
2317             CODE:
2318 0 0         RETVAL = self->connected;
2319             OUTPUT:
2320             RETVAL
2321              
2322             void
2323             publish(self, subject, payload = &PL_sv_undef, reply = NULL)
2324             EV::Nats self
2325             SV *subject
2326             SV *payload
2327             const char *reply
2328             PREINIT:
2329             char hdr[MAX_CONTROL_LINE];
2330             STRLEN subj_len, pay_len;
2331             const char *subj_pv, *pay_pv;
2332             CODE:
2333 0 0         NATS_CROAK_UNLESS_CONNECTED(self);
    0          
    0          
    0          
2334              
2335 0           subj_pv = SvPV(subject, subj_len);
2336              
2337 0 0         if (SvOK(payload)) {
2338 0           pay_pv = SvPV(payload, pay_len);
2339             } else {
2340 0           pay_pv = "";
2341 0           pay_len = 0;
2342             }
2343              
2344 0 0         if (self->max_payload > 0 && pay_len > (size_t)self->max_payload)
    0          
2345 0           croak("payload exceeds max_payload (%d)", self->max_payload);
2346              
2347             int hdr_len;
2348 0 0         if (reply && *reply) {
    0          
2349 0           hdr_len = snprintf(hdr, sizeof(hdr), "PUB %.*s %s %lu\r\n",
2350             (int)subj_len, subj_pv, reply, (unsigned long)pay_len);
2351             } else {
2352 0           hdr_len = snprintf(hdr, sizeof(hdr), "PUB %.*s %lu\r\n",
2353             (int)subj_len, subj_pv, (unsigned long)pay_len);
2354             }
2355              
2356 0           self->msgs_out++;
2357 0           self->bytes_out += pay_len;
2358              
2359 0 0         if (self->connected) {
2360 0           wbuf_append(self, hdr, hdr_len);
2361 0 0         if (pay_len > 0) wbuf_append(self, pay_pv, pay_len);
2362 0           wbuf_append(self, "\r\n", 2);
2363 0           nats_schedule_write(self);
2364             } else {
2365 0           size_t total = hdr_len + pay_len + 2;
2366             char *buf;
2367 0           Newx(buf, total, char);
2368 0           memcpy(buf, hdr, hdr_len);
2369 0 0         if (pay_len > 0) memcpy(buf + hdr_len, pay_pv, pay_len);
2370 0           buf[hdr_len + pay_len] = '\r';
2371 0           buf[hdr_len + pay_len + 1] = '\n';
2372 0           nats_queue_write(self, buf, total);
2373 0           Safefree(buf);
2374             }
2375              
2376             void
2377             hpublish(self, subject, headers, payload = &PL_sv_undef, reply = NULL)
2378             EV::Nats self
2379             SV *subject
2380             SV *headers
2381             SV *payload
2382             const char *reply
2383             PREINIT:
2384             char hdr[MAX_CONTROL_LINE];
2385             STRLEN subj_len, hdr_data_len, pay_len;
2386             const char *subj_pv, *hdr_data_pv, *pay_pv;
2387             CODE:
2388 0 0         NATS_CROAK_UNLESS_CONNECTED(self);
    0          
    0          
    0          
2389              
2390 0           subj_pv = SvPV(subject, subj_len);
2391 0           hdr_data_pv = SvPV(headers, hdr_data_len);
2392              
2393 0 0         if (SvOK(payload)) {
2394 0           pay_pv = SvPV(payload, pay_len);
2395             } else {
2396 0           pay_pv = "";
2397 0           pay_len = 0;
2398             }
2399              
2400 0           size_t total_size = hdr_data_len + pay_len;
2401 0 0         if (self->max_payload > 0 && total_size > (size_t)self->max_payload)
    0          
2402 0           croak("message exceeds max_payload (%d)", self->max_payload);
2403              
2404 0           self->msgs_out++;
2405 0           self->bytes_out += total_size;
2406              
2407             int cmd_len;
2408 0 0         if (reply && *reply) {
    0          
2409 0           cmd_len = snprintf(hdr, sizeof(hdr), "HPUB %.*s %s %lu %lu\r\n",
2410             (int)subj_len, subj_pv, reply,
2411             (unsigned long)hdr_data_len, (unsigned long)total_size);
2412             } else {
2413 0           cmd_len = snprintf(hdr, sizeof(hdr), "HPUB %.*s %lu %lu\r\n",
2414             (int)subj_len, subj_pv,
2415             (unsigned long)hdr_data_len, (unsigned long)total_size);
2416             }
2417              
2418 0 0         if (self->connected) {
2419 0           wbuf_append(self, hdr, cmd_len);
2420 0           wbuf_append(self, hdr_data_pv, hdr_data_len);
2421 0 0         if (pay_len > 0) wbuf_append(self, pay_pv, pay_len);
2422 0           wbuf_append(self, "\r\n", 2);
2423 0           nats_schedule_write(self);
2424             } else {
2425 0           size_t total = cmd_len + total_size + 2;
2426             char *buf;
2427 0           Newx(buf, total, char);
2428 0           memcpy(buf, hdr, cmd_len);
2429 0           memcpy(buf + cmd_len, hdr_data_pv, hdr_data_len);
2430 0 0         if (pay_len > 0) memcpy(buf + cmd_len + hdr_data_len, pay_pv, pay_len);
2431 0           buf[cmd_len + total_size] = '\r';
2432 0           buf[cmd_len + total_size + 1] = '\n';
2433 0           nats_queue_write(self, buf, total);
2434 0           Safefree(buf);
2435             }
2436              
2437             UV
2438             subscribe(self, subject, cb, queue_group = NULL)
2439             EV::Nats self
2440             SV *subject
2441             SV *cb
2442             const char *queue_group
2443             PREINIT:
2444             nats_sub_t *sub;
2445             char buf[MAX_CONTROL_LINE];
2446             STRLEN subj_len;
2447             const char *subj_pv;
2448             CODE:
2449 0           subj_pv = SvPV(subject, subj_len);
2450              
2451 0           Newxz(sub, 1, nats_sub_t);
2452 0           sub->sid = self->next_sid++;
2453 0           sub->subject = newSVpvn(subj_pv, subj_len);
2454 0           sub->cb = newSVsv(cb);
2455 0 0         sub->queue_group = (queue_group && *queue_group) ? newSVpv(queue_group, 0) : NULL;
    0          
2456 0           sub->max_msgs = 0;
2457 0           sub->received = 0;
2458 0           nats_register_sub(self, sub);
2459              
2460             int n;
2461 0 0         if (queue_group && *queue_group) {
    0          
2462 0           n = snprintf(buf, sizeof(buf), "SUB %.*s %s %" UVuf "\r\n",
2463 0           (int)subj_len, subj_pv, queue_group, (UV)sub->sid);
2464             } else {
2465 0           n = snprintf(buf, sizeof(buf), "SUB %.*s %" UVuf "\r\n",
2466 0           (int)subj_len, subj_pv, (UV)sub->sid);
2467             }
2468              
2469 0 0         if (self->connected || self->connecting)
    0          
2470 0           nats_queue_write(self, buf, n);
2471              
2472 0 0         RETVAL = (UV)sub->sid;
2473             OUTPUT:
2474             RETVAL
2475              
2476             void
2477             unsubscribe(self, sid, max_msgs = 0)
2478             EV::Nats self
2479             UV sid
2480             int max_msgs
2481             PREINIT:
2482             char buf[64];
2483             CODE:
2484             /* Queue UNSUB while connected/connecting/reconnecting, mirroring
2485             subscribe(). When fully disconnected (no reconnect armed) the
2486             local-only update is enough; nats_resub_all will not resubscribe
2487             a removed sub on the next reconnect. */
2488 0 0         int can_queue = self->connected || self->connecting
2489 0 0         || (self->reconnect_enabled && self->reconnect_timer_active);
    0          
    0          
2490              
2491 0 0         if (max_msgs > 0) {
2492 0           nats_sub_t *sub = nats_find_sub(self, (uint64_t)sid);
2493 0 0         if (sub)
2494 0           sub->max_msgs = max_msgs;
2495              
2496 0           int n = snprintf(buf, sizeof(buf), "UNSUB %" UVuf " %d\r\n", sid, max_msgs);
2497 0 0         if (can_queue)
2498 0           nats_queue_write(self, buf, n);
2499             } else {
2500 0           nats_sub_t *sub = nats_find_sub(self, (uint64_t)sid);
2501 0           int n = snprintf(buf, sizeof(buf), "UNSUB %" UVuf "\r\n", sid);
2502 0 0         if (can_queue)
2503 0           nats_queue_write(self, buf, n);
2504 0 0         if (sub)
2505 0           nats_remove_sub(self, sub);
2506             }
2507              
2508             void
2509             request(self, subject, payload, cb, timeout_ms = 5000)
2510             EV::Nats self
2511             SV *subject
2512             SV *payload
2513             SV *cb
2514             int timeout_ms
2515             PREINIT:
2516             char reply_subj[80];
2517             char hdr[MAX_CONTROL_LINE];
2518             STRLEN subj_len, pay_len;
2519             const char *subj_pv, *pay_pv;
2520             CODE:
2521 0 0         NATS_CROAK_UNLESS_CONNECTED(self);
    0          
    0          
    0          
2522              
2523 0           subj_pv = SvPV(subject, subj_len);
2524 0 0         if (SvOK(payload)) {
2525 0           pay_pv = SvPV(payload, pay_len);
2526             } else {
2527 0           pay_pv = "";
2528 0           pay_len = 0;
2529             }
2530              
2531 0 0         if (self->max_payload > 0 && pay_len > (size_t)self->max_payload)
    0          
2532 0           croak("payload exceeds max_payload (%d)", self->max_payload);
2533              
2534 0 0         if (!self->inbox_sub_sid) {
2535             char inbox_wild[48];
2536 0           snprintf(inbox_wild, sizeof(inbox_wild), "%s*", self->inbox_prefix);
2537              
2538             nats_sub_t *sub;
2539 0           Newxz(sub, 1, nats_sub_t);
2540 0           sub->sid = self->next_sid++;
2541 0           sub->subject = newSVpv(inbox_wild, 0);
2542 0           sub->cb = NULL;
2543 0           sub->queue_group = NULL;
2544 0           nats_register_sub(self, sub);
2545 0           self->inbox_sub_sid = sub->sid;
2546              
2547             char sbuf[MAX_CONTROL_LINE];
2548 0           int sn = snprintf(sbuf, sizeof(sbuf), "SUB %s %" UVuf "\r\n",
2549 0           inbox_wild, (UV)sub->sid);
2550 0           nats_queue_write(self, sbuf, sn);
2551             }
2552              
2553 0           uint64_t req_id = self->next_req_id++;
2554 0           snprintf(reply_subj, sizeof(reply_subj), "%s%" UVuf, self->inbox_prefix, (UV)req_id);
2555              
2556             nats_req_t *req;
2557 0           Newxz(req, 1, nats_req_t);
2558 0           req->req_id = req_id;
2559 0           req->cb = newSVsv(cb);
2560 0           req->self = self;
2561 0           ngx_queue_insert_tail(&self->req_queue, &req->queue);
2562              
2563 0 0         if (timeout_ms > 0) {
2564 0           ev_timer_init(&req->timer, nats_on_request_timeout, timeout_ms / 1000.0, 0.0);
2565 0           ev_timer_start(self->loop, &req->timer);
2566 0           req->timer_active = 1;
2567             }
2568              
2569 0           int hdr_len = snprintf(hdr, sizeof(hdr), "PUB %.*s %s %lu\r\n",
2570             (int)subj_len, subj_pv, reply_subj, (unsigned long)pay_len);
2571              
2572 0           size_t total = hdr_len + pay_len + 2;
2573             char *buf;
2574 0           Newx(buf, total, char);
2575 0           memcpy(buf, hdr, hdr_len);
2576 0 0         if (pay_len > 0) memcpy(buf + hdr_len, pay_pv, pay_len);
2577 0           buf[hdr_len + pay_len] = '\r';
2578 0           buf[hdr_len + pay_len + 1] = '\n';
2579              
2580 0           nats_queue_write(self, buf, total);
2581 0           Safefree(buf);
2582              
2583             void
2584             ping(self)
2585             EV::Nats self
2586             CODE:
2587 0 0         NATS_CROAK_UNLESS_CONNECTED(self);
    0          
    0          
    0          
2588 0           wbuf_append(self, "PING\r\n", 6);
2589 0           nats_try_write(self);
2590              
2591             void
2592             flush(self, cb = NULL)
2593             EV::Nats self
2594             SV *cb
2595             CODE:
2596 0 0         if (!self->connected)
2597 0           croak("not connected");
2598 0           wbuf_append(self, "PING\r\n", 6);
2599 0           nats_try_write(self);
2600 0 0         if (cb && SvOK(cb)) {
    0          
2601             nats_pong_cb_t *pcb;
2602 0           Newxz(pcb, 1, nats_pong_cb_t);
2603 0           pcb->cb = newSVsv(cb);
2604 0           ngx_queue_insert_tail(&self->pong_cbs, &pcb->queue);
2605             }
2606              
2607             SV *
2608             server_info(self)
2609             EV::Nats self
2610             CODE:
2611 0 0         if (self->server_info_json)
2612 0           RETVAL = newSVsv(self->server_info_json);
2613             else
2614 0           RETVAL = &PL_sv_undef;
2615             OUTPUT:
2616             RETVAL
2617              
2618             int
2619             max_payload(self, ...)
2620             EV::Nats self
2621             CODE:
2622 0 0         if (items > 1)
2623 0           self->max_payload = SvIV(ST(1));
2624 0 0         RETVAL = self->max_payload;
2625             OUTPUT:
2626             RETVAL
2627              
2628             int
2629             waiting_count(self)
2630             EV::Nats self
2631             CODE:
2632 0 0         RETVAL = self->waiting_count;
2633             OUTPUT:
2634             RETVAL
2635              
2636             void
2637             skip_waiting(self)
2638             EV::Nats self
2639             CODE:
2640 0           nats_skip_waiting(self);
2641              
2642             void
2643             reconnect(self, enable, ...)
2644             EV::Nats self
2645             int enable
2646             CODE:
2647 0           self->reconnect_enabled = enable;
2648 0 0         if (items > 2) self->reconnect_delay_ms = SvIV(ST(2));
2649 0 0         if (items > 3) self->max_reconnect_attempts = SvIV(ST(3));
2650              
2651             int
2652             reconnect_enabled(self)
2653             EV::Nats self
2654             CODE:
2655 0 0         RETVAL = self->reconnect_enabled;
2656             OUTPUT:
2657             RETVAL
2658              
2659             int
2660             connect_timeout(self, ...)
2661             EV::Nats self
2662             CODE:
2663 0 0         if (items > 1)
2664 0           self->connect_timeout_ms = SvIV(ST(1));
2665 0 0         RETVAL = self->connect_timeout_ms;
2666             OUTPUT:
2667             RETVAL
2668              
2669             int
2670             ping_interval(self, ...)
2671             EV::Nats self
2672             CODE:
2673 0 0         if (items > 1)
2674 0           self->ping_interval_ms = SvIV(ST(1));
2675 0 0         RETVAL = self->ping_interval_ms;
2676             OUTPUT:
2677             RETVAL
2678              
2679             int
2680             max_pings_outstanding(self, ...)
2681             EV::Nats self
2682             CODE:
2683 0 0         if (items > 1)
2684 0           self->max_pings_outstanding = SvIV(ST(1));
2685 0 0         RETVAL = self->max_pings_outstanding;
2686             OUTPUT:
2687             RETVAL
2688              
2689             int
2690             priority(self, ...)
2691             EV::Nats self
2692             CODE:
2693 0 0         if (items > 1)
2694 0           self->priority = SvIV(ST(1));
2695 0 0         RETVAL = self->priority;
2696             OUTPUT:
2697             RETVAL
2698              
2699             int
2700             keepalive(self, ...)
2701             EV::Nats self
2702             CODE:
2703 0 0         if (items > 1)
2704 0           self->keepalive = SvIV(ST(1));
2705 0 0         RETVAL = self->keepalive;
2706             OUTPUT:
2707             RETVAL
2708              
2709             void
2710             on_error(self, ...)
2711             EV::Nats self
2712             PPCODE:
2713 0 0         if (items > 1) {
2714 0 0         CLEAR_HANDLER(self->on_error);
2715 0 0         if (SvOK(ST(1)))
2716 0           self->on_error = newSVsv(ST(1));
2717             }
2718 0 0         if (GIMME_V != G_VOID && self->on_error)
    0          
2719 0           PUSHs(sv_2mortal(newSVsv(self->on_error)));
2720              
2721             void
2722             on_connect(self, ...)
2723             EV::Nats self
2724             PPCODE:
2725 0 0         if (items > 1) {
2726 0 0         CLEAR_HANDLER(self->on_connect);
2727 0 0         if (SvOK(ST(1)))
2728 0           self->on_connect = newSVsv(ST(1));
2729             }
2730 0 0         if (GIMME_V != G_VOID && self->on_connect)
    0          
2731 0           PUSHs(sv_2mortal(newSVsv(self->on_connect)));
2732              
2733             void
2734             on_disconnect(self, ...)
2735             EV::Nats self
2736             PPCODE:
2737 0 0         if (items > 1) {
2738 0 0         CLEAR_HANDLER(self->on_disconnect);
2739 0 0         if (SvOK(ST(1)))
2740 0           self->on_disconnect = newSVsv(ST(1));
2741             }
2742 0 0         if (GIMME_V != G_VOID && self->on_disconnect)
    0          
2743 0           PUSHs(sv_2mortal(newSVsv(self->on_disconnect)));
2744              
2745             #ifdef HAVE_OPENSSL
2746              
2747             void
2748             tls(self, enable, ca_file = NULL, skip_verify = 0)
2749             EV::Nats self
2750             int enable
2751             const char *ca_file
2752             int skip_verify
2753             CODE:
2754 0           self->tls = enable;
2755 0           self->tls_skip_verify = skip_verify;
2756 0 0         nats_set_str(&self->tls_ca_file, (ca_file && *ca_file) ? ca_file : NULL);
    0          
2757              
2758             #endif
2759              
2760             void
2761             stats(self)
2762             EV::Nats self
2763             PPCODE:
2764 0 0         EXTEND(SP, 8);
2765 0           PUSHs(sv_2mortal(newSVpvs("msgs_in")));
2766 0           PUSHs(sv_2mortal(newSVuv(self->msgs_in)));
2767 0           PUSHs(sv_2mortal(newSVpvs("msgs_out")));
2768 0           PUSHs(sv_2mortal(newSVuv(self->msgs_out)));
2769 0           PUSHs(sv_2mortal(newSVpvs("bytes_in")));
2770 0           PUSHs(sv_2mortal(newSVuv(self->bytes_in)));
2771 0           PUSHs(sv_2mortal(newSVpvs("bytes_out")));
2772 0           PUSHs(sv_2mortal(newSVuv(self->bytes_out)));
2773              
2774             void
2775             reset_stats(self)
2776             EV::Nats self
2777             CODE:
2778 0           self->msgs_in = 0;
2779 0           self->msgs_out = 0;
2780 0           self->bytes_in = 0;
2781 0           self->bytes_out = 0;
2782              
2783             SV *
2784             new_inbox(self)
2785             EV::Nats self
2786             CODE:
2787             {
2788             char inbox[80];
2789 0           int len = snprintf(inbox, sizeof(inbox), "%s%" UVuf, self->inbox_prefix, (UV)self->next_req_id++);
2790 0           RETVAL = newSVpvn(inbox, len);
2791             }
2792             OUTPUT:
2793             RETVAL
2794              
2795             int
2796             subscription_count(self)
2797             EV::Nats self
2798             CODE:
2799 0 0         RETVAL = HvKEYS(self->sub_map);
    0          
2800             OUTPUT:
2801             RETVAL
2802              
2803             void
2804             batch(self, code)
2805             EV::Nats self
2806             SV *code
2807             CODE:
2808 0           self->batch_mode = 1;
2809             {
2810 0           dSP;
2811 0           ENTER; SAVETMPS;
2812 0 0         PUSHMARK(SP);
2813 0           PUTBACK;
2814 0           call_sv(code, G_DISCARD);
2815 0 0         FREETMPS; LEAVE;
2816             }
2817 0           self->batch_mode = 0;
2818 0 0         if (self->wbuf_len > self->wbuf_off) {
2819 0           self->wbuf_dirty = 1;
2820 0 0         if (!self->prepare_active) {
2821 0           ev_prepare_start(self->loop, &self->prepare_watcher);
2822 0           self->prepare_active = 1;
2823             }
2824             /* Check slow consumer after batch */
2825 0 0         if (self->slow_consumer_bytes > 0 &&
2826 0 0         (self->wbuf_len - self->wbuf_off) > self->slow_consumer_bytes &&
2827 0 0         self->on_slow_consumer) {
2828 0           dSP;
2829 0           ENTER; SAVETMPS;
2830 0 0         PUSHMARK(SP);
2831 0 0         EXTEND(SP, 1);
2832 0           PUSHs(sv_2mortal(newSVuv(self->wbuf_len - self->wbuf_off)));
2833 0           PUTBACK;
2834 0           call_sv(self->on_slow_consumer, G_DISCARD);
2835 0 0         FREETMPS; LEAVE;
2836             }
2837             }
2838              
2839             void
2840             slow_consumer(self, bytes_threshold, cb = NULL)
2841             EV::Nats self
2842             UV bytes_threshold
2843             SV *cb
2844             CODE:
2845 0           self->slow_consumer_bytes = (size_t)bytes_threshold;
2846 0 0         CLEAR_HANDLER(self->on_slow_consumer);
2847 0 0         if (cb && SvOK(cb))
    0          
2848 0           self->on_slow_consumer = newSVsv(cb);
2849              
2850             void
2851             on_lame_duck(self, ...)
2852             EV::Nats self
2853             PPCODE:
2854 0 0         if (items > 1) {
2855 0 0         CLEAR_HANDLER(self->on_ldm);
2856 0 0         if (SvOK(ST(1)))
2857 0           self->on_ldm = newSVsv(ST(1));
2858             }
2859 0 0         if (GIMME_V != G_VOID && self->on_ldm)
    0          
2860 0           PUSHs(sv_2mortal(newSVsv(self->on_ldm)));
2861              
2862             #ifdef HAVE_OPENSSL
2863              
2864             void
2865             nkey_seed(self, seed)
2866             EV::Nats self
2867             const char *seed
2868             CODE:
2869 2           nats_set_str(&self->nkey_seed, seed);
2870              
2871             SV *
2872             nkey_public_from_seed(class, seed)
2873             SV *class
2874             const char *seed
2875             CODE:
2876             (void)class;
2877             {
2878             char pub[64];
2879 0           int n = nats_nkey_public(seed, pub, sizeof(pub));
2880 0 0         if (n <= 0)
2881 0           croak("invalid NKey seed");
2882 0           RETVAL = newSVpvn(pub, n);
2883             }
2884             OUTPUT:
2885             RETVAL
2886              
2887             SV *
2888             nkey_generate_user_seed(class)
2889             SV *class
2890             CODE:
2891             (void)class;
2892             {
2893             unsigned char raw[36];
2894             /* S+User seed: b1 = PrefixByteSeed | (PrefixByteUser >> 5) = 0x95;
2895             b2 = (PrefixByteUser & 31) << 3 = 0x00. */
2896 0           raw[0] = 0x95;
2897 0           raw[1] = 0x00;
2898 0           nats_random_bytes(raw + 2, 32);
2899 0           uint16_t crc = nats_crc16(raw, 34);
2900 0           raw[34] = crc & 0xFF;
2901 0           raw[35] = (crc >> 8) & 0xFF;
2902             /* 36 bytes -> 58 base32 chars (288 bits, 57 full chars + 1 flush). */
2903             char enc[60];
2904 0           int n = nats_base32_encode(raw, sizeof(raw), enc, sizeof(enc));
2905 0 0         if (n < 0) croak("base32 buffer overflow");
2906 0           RETVAL = newSVpvn(enc, n);
2907             }
2908             OUTPUT:
2909             RETVAL
2910              
2911             #endif
2912              
2913             void
2914             jwt(self, jwt_token)
2915             EV::Nats self
2916             const char *jwt_token
2917             CODE:
2918 2           nats_set_str(&self->jwt, jwt_token);
2919              
2920             void
2921             drain(self, cb = NULL)
2922             EV::Nats self
2923             SV *cb
2924             CODE:
2925 0 0         if (!self->connected || self->draining)
    0          
2926 0           return;
2927 0           self->draining = 1;
2928 0 0         CLEAR_HANDLER(self->drain_cb);
2929 0 0         if (cb && SvOK(cb))
    0          
2930 0           self->drain_cb = newSVsv(cb);
2931              
2932             /* Send UNSUB for all subscriptions */
2933             {
2934             ngx_queue_t *q;
2935             char buf[64];
2936 0 0         ngx_queue_foreach(q, &self->subs) {
    0          
2937 0           nats_sub_t *sub = ngx_queue_data(q, nats_sub_t, queue);
2938 0           int n = snprintf(buf, sizeof(buf), "UNSUB %" UVuf "\r\n", (UV)sub->sid);
2939 0           wbuf_append(self, buf, n);
2940             }
2941             }
2942              
2943             /* Enqueue PING fence via pong_cbs — naturally ordered after any pending flush cbs.
2944             Use NULL cb; the PONG handler checks draining after firing pong_cb. */
2945 0           wbuf_append(self, "PING\r\n", 6);
2946 0           nats_try_write(self);
2947             {
2948             nats_pong_cb_t *pcb;
2949 0           Newxz(pcb, 1, nats_pong_cb_t);
2950 0           pcb->cb = NULL; /* drain completion marker */
2951 0           ngx_queue_insert_tail(&self->pong_cbs, &pcb->queue);
2952             }
2953              
2954             void
2955             DESTROY(self)
2956             EV::Nats self
2957             CODE:
2958 6 50         if (self->magic != NATS_MAGIC_ALIVE)
2959 0           return;
2960              
2961 6           self->magic = 0;
2962 6           self->intentional_disconnect = 1;
2963              
2964 6 50         if (PL_dirty) {
2965 0 0         if (self->fd >= 0)
2966 0           close(self->fd);
2967 0           return;
2968             }
2969              
2970 6 50         CLEAR_HANDLER(self->on_error);
2971 6 50         CLEAR_HANDLER(self->on_connect);
2972 6 50         CLEAR_HANDLER(self->on_disconnect);
2973              
2974 6           nats_stop_watchers(self);
2975 6           nats_stop_timers(self);
2976              
2977 6 50         while (!ngx_queue_empty(&self->req_queue)) {
2978 0           ngx_queue_t *q = ngx_queue_head(&self->req_queue);
2979 0           nats_req_t *req = ngx_queue_data(q, nats_req_t, queue);
2980 0           ngx_queue_remove(q);
2981 0 0         if (req->timer_active)
2982 0           ev_timer_stop(self->loop, &req->timer);
2983 0 0         CLEAR_HANDLER(req->cb);
2984 0           Safefree(req);
2985             }
2986 6           nats_skip_waiting(self);
2987              
2988 6 50         while (!ngx_queue_empty(&self->pong_cbs)) {
2989 0           ngx_queue_t *pq = ngx_queue_head(&self->pong_cbs);
2990 0           nats_pong_cb_t *pcb = ngx_queue_data(pq, nats_pong_cb_t, queue);
2991 0           ngx_queue_remove(pq);
2992 0 0         CLEAR_HANDLER(pcb->cb);
2993 0           Safefree(pcb);
2994             }
2995              
2996 6 50         while (!ngx_queue_empty(&self->subs)) {
2997 0           ngx_queue_t *q = ngx_queue_head(&self->subs);
2998 0           nats_sub_t *sub = ngx_queue_data(q, nats_sub_t, queue);
2999 0           nats_remove_sub(self, sub);
3000             }
3001              
3002 6 50         if (self->fd >= 0) {
3003 0           close(self->fd);
3004 0           self->fd = -1;
3005             }
3006              
3007 6 50         CLEAR_HANDLER(self->server_info_json);
3008 6 50         CLEAR_HANDLER(self->drain_cb);
3009              
3010 6           nats_free_server_pool(self);
3011 6 50         if (self->sub_map) {
3012 6           SvREFCNT_dec((SV *)self->sub_map);
3013 6           self->sub_map = NULL;
3014             }
3015              
3016 6           #ifdef HAVE_OPENSSL
3017 6 50         nats_ssl_cleanup(self);
3018 6           if (self->ssl_ctx) { SSL_CTX_free(self->ssl_ctx); self->ssl_ctx = NULL; }
3019             Safefree(self->tls_ca_file);
3020 6           #endif
3021 6            
3022 6           Safefree(self->rbuf);
3023 6           Safefree(self->wbuf);
3024 6           Safefree(self->host);
3025 6           Safefree(self->path);
3026 6           Safefree(self->user);
3027 6           Safefree(self->pass);
3028 6           Safefree(self->token);
3029 6           Safefree(self->name);
3030 6           Safefree(self->nkey_seed);
3031 6 50         Safefree(self->jwt);
3032 6 50         Safefree(self->server_nonce);
3033             CLEAR_HANDLER(self->on_slow_consumer);
3034 6           CLEAR_HANDLER(self->on_ldm);
3035              
3036             Safefree(self);