Branch Coverage

Nats.xs
Criterion Covered Total %
branch 41 1382 2.9


line true false branch
276 0 4 if (*dst) { Safefree(*dst); *dst = NULL; }
277 4 0 if (src) {
289 0 0 if (*dst) { Safefree(*dst); *dst = NULL; }
301 0 0 if (needed <= *cap)
303 0 0 size_t newcap = *cap ? *cap : BUF_INIT_SIZE;
304 0 0 while (newcap < needed)
313 0 0 if (self->wbuf_off > 0 && self->wbuf_off > self->wbuf_len / 2) {
0 0 if (self->wbuf_off > 0 && self->wbuf_off > self->wbuf_len / 2) {
314 0 0 if (used > 0)
349 0 0 for (i = 0; i < src_len; i++) {
351 0 0 if (c == '=' || c == ' ') continue;
0 0 if (c == '=' || c == ' ') continue;
352 0 0 if (c >= 128 || b32_tab[c] < 0) return -1;
0 0 if (c >= 128 || b32_tab[c] < 0) return -1;
355 0 0 if (bits >= 8) {
375 0 0 for (i = 0; i < src_len; i++) {
378 0 0 while (bits >= 5) {
379 0 0 if (di >= dst_size) return -1;
384 0 0 if (bits > 0) {
385 0 0 if (di >= dst_size) return -1;
397 0 0 for (i = 0; i + 2 < src_len; i += 3) {
398 0 0 if (di + 4 > dst_size) return -1;
405 0 0 if (i < src_len) {
406 0 0 if (di + 4 > dst_size) return -1;
408 0 0 if (i + 1 < src_len) v |= (uint32_t)src[i+1] << 8;
411 0 0 if (i + 1 < src_len) dst[di++] = b64[(v >> 6) & 0x3F];
413 0 0 if (di < dst_size) dst[di] = '\0';
424 0 0 for (i = 0; i < len; i++) {
426 0 0 for (j = 0; j < 8; j++) {
427 0 0 if (crc & 0x8000) crc = (uint16_t)((crc << 1) ^ 0x1021);
440 0 0 if (nats_base32_decode(seed_encoded, strlen(seed_encoded), raw, &raw_len) != 0)
442 0 0 if (raw_len < 36) return -1; /* prefix(2) + seed(32) + CRC(2) */
447 0 0 if (expected_crc != actual_crc) return -1;
452 0 0 if (!pkey) return -1;
455 0 0 if (!ctx) { EVP_PKEY_free(pkey); return -1; }
466 0 0 if (!ok) return -1;
476 0 0 if (nats_base32_decode(seed_encoded, strlen(seed_encoded), raw, &raw_len) != 0)
478 0 0 if (raw_len < 36) return -1;
481 0 0 if (nats_crc16(raw, raw_len - 2) != expected_crc) return -1;
486 0 0 if (!pkey) return -1;
492 0 0 if (!ok) return -1;
517 0 0 int n = nats_base32_encode(full, sizeof(full), pub_out,
519 0 0 if (n < 0) return -1;
520 0 0 if ((size_t)n < pub_out_size) pub_out[n] = '\0';
532 0 0 if (self->ssl) {
533 0 0 int n = SSL_read(self->ssl, buf, (len > (size_t)INT_MAX) ? INT_MAX : (int)len);
534 0 0 if (n <= 0) {
536 0 0 if (err == SSL_ERROR_WANT_READ || err == SSL_ERROR_WANT_WRITE) {
0 0 if (err == SSL_ERROR_WANT_READ || err == SSL_ERROR_WANT_WRITE) {
540 0 0 if (err == SSL_ERROR_ZERO_RETURN) return 0;
553 0 0 if (self->ssl) {
554 0 0 int n = SSL_write(self->ssl, buf, (len > (size_t)INT_MAX) ? INT_MAX : (int)len);
555 0 0 if (n <= 0) {
557 0 0 if (err == SSL_ERROR_WANT_WRITE || err == SSL_ERROR_WANT_READ) {
0 0 if (err == SSL_ERROR_WANT_WRITE || err == SSL_ERROR_WANT_READ) {
573 0 0 if (!self->ssl_ctx) {
575 0 0 if (!self->ssl_ctx) return -1;
577 0 0 if (self->tls_ca_file) {
578 0 0 if (!SSL_CTX_load_verify_locations(self->ssl_ctx, self->tls_ca_file, NULL))
584 0 0 if (!self->tls_skip_verify)
589 0 0 if (!self->ssl) return -1;
592 0 0 if (self->host) {
594 0 0 if (!self->tls_skip_verify) {
598 0 0 if (vpm && !X509_VERIFY_PARAM_set1_ip_asc(vpm, self->host)) {
0 0 if (vpm && !X509_VERIFY_PARAM_set1_ip_asc(vpm, self->host)) {
600 0 0 if (!X509_VERIFY_PARAM_set1_host(vpm, self->host, 0))
611 0 6 if (self->ssl) {
621 0 0 if (ret == 1) {
626 0 0 if (err == SSL_ERROR_WANT_READ) {
627 0 0 if (!self->reading) {
633 0 0 if (err == SSL_ERROR_WANT_WRITE) {
634 0 0 if (!self->writing) {
649 0 0 if (e) {
671 0 0 if (dst_size < 3) {
672 0 0 if (dst_size > 0) dst[0] = '\0';
678 0 0 for (; *s; s++) {
679 0 0 size_t need = (*s == '"' || *s == '\\') ? 2 : (*s < 0x20) ? 6 : 1;
0 0 size_t need = (*s == '"' || *s == '\\') ? 2 : (*s < 0x20) ? 6 : 1;
0 0 size_t need = (*s == '"' || *s == '\\') ? 2 : (*s < 0x20) ? 6 : 1;
680 0 0 if (di + need + 2 > dst_size) break; /* +2 for closing quote + NUL */
682 0 0 if (*s == '"') {
684 0 0 } else if (*s == '\\') {
686 0 0 } else if (*s < 0x20) {
707 6 0 if (fd >= 0) {
708 6 6 while (got < nbytes) {
710 6 0 if (n > 0) got += n;
711 0 0 else if (n == 0 || (errno != EINTR && errno != EAGAIN)) break;
0 0 else if (n == 0 || (errno != EINTR && errno != EAGAIN)) break;
0 0 else if (n == 0 || (errno != EINTR && errno != EAGAIN)) break;
716 0 6 for (i = got; i < nbytes; i++)
724 0 6 if (nbytes < 0) nbytes = 0;
725 0 6 if (nbytes > (int)sizeof(rnd)) nbytes = (int)sizeof(rnd);
728 48 6 for (i = 0; i < nbytes; i++) {
752 0 0 if (svp && SvIOK(*svp))
0 0 if (svp && SvIOK(*svp))
776 0 0 CLEAR_HANDLER(sub->subject);
777 0 0 CLEAR_HANDLER(sub->queue_group);
778 0 0 CLEAR_HANDLER(sub->cb);
787 0 0 ngx_queue_foreach(q, &self->subs) {
0 0 ngx_queue_foreach(q, &self->subs) {
793 0 0 if (sub->queue_group) {
805 0 0 if (sub->max_msgs > 0) {
807 0 0 if (remaining < 1) remaining = 1;
821 0 6 if (self->reading) {
825 0 6 if (self->writing) {
833 0 6 if (self->connect_timer_active) {
837 0 6 if (self->reconnect_timer_active) {
841 0 6 if (self->ping_timer_active) {
845 0 6 if (self->prepare_active) {
854 0 0 while (!ngx_queue_empty(&self->req_queue)) {
859 0 0 if (req->timer_active) {
864 0 0 if (req->cb) {
866 0 0 PUSHMARK(SP);
867 0 0 EXTEND(SP, 2);
872 0 0 FREETMPS; LEAVE;
881 0 6 while (!ngx_queue_empty(&self->wait_queue)) {
885 0 0 if (pub->data)
894 0 0 int was_connected = self->connected || self->connecting;
0 0 int was_connected = self->connected || self->connecting;
909 0 0 if (self->fd >= 0) {
923 0 0 while (!ngx_queue_empty(&self->pong_cbs)) {
927 0 0 if (pcb->cb) {
930 0 0 PUSHMARK(SP);
931 0 0 EXTEND(SP, 1);
935 0 0 FREETMPS; LEAVE;
942 0 0 if (self->draining && self->drain_cb) {
0 0 if (self->draining && self->drain_cb) {
945 0 0 PUSHMARK(SP);
946 0 0 EXTEND(SP, 1);
950 0 0 FREETMPS; LEAVE;
951 0 0 CLEAR_HANDLER(self->drain_cb);
955 0 0 if (was_connected && self->on_disconnect) {
0 0 if (was_connected && self->on_disconnect) {
958 0 0 PUSHMARK(SP);
961 0 0 FREETMPS; LEAVE;
969 0 0 if (self->on_error) {
972 0 0 PUSHMARK(SP);
973 0 0 EXTEND(SP, 1);
977 0 0 FREETMPS; LEAVE;
1003 0 0 CONNECT_APPEND("CONNECT {");
0 0 CONNECT_APPEND("CONNECT {");
0 0 CONNECT_APPEND("CONNECT {");
1004 0 0 CONNECT_APPEND("\"verbose\":%s,\"pedantic\":%s,\"echo\":%s",
0 0 CONNECT_APPEND("\"verbose\":%s,\"pedantic\":%s,\"echo\":%s",
0 0 CONNECT_APPEND("\"verbose\":%s,\"pedantic\":%s,\"echo\":%s",
0 0 CONNECT_APPEND("\"verbose\":%s,\"pedantic\":%s,\"echo\":%s",
0 0 CONNECT_APPEND("\"verbose\":%s,\"pedantic\":%s,\"echo\":%s",
0 0 CONNECT_APPEND("\"verbose\":%s,\"pedantic\":%s,\"echo\":%s",
1008 0 0 CONNECT_APPEND(",\"protocol\":1");
0 0 CONNECT_APPEND(",\"protocol\":1");
0 0 CONNECT_APPEND(",\"protocol\":1");
1009 0 0 if (self->no_responders)
1010 0 0 CONNECT_APPEND(",\"no_responders\":true");
0 0 CONNECT_APPEND(",\"no_responders\":true");
0 0 CONNECT_APPEND(",\"no_responders\":true");
1011 0 0 CONNECT_APPEND(",\"headers\":true");
0 0 CONNECT_APPEND(",\"headers\":true");
0 0 CONNECT_APPEND(",\"headers\":true");
1012 0 0 CONNECT_APPEND(",\"lang\":\"perl-xs\",\"version\":\"0.02\"");
0 0 CONNECT_APPEND(",\"lang\":\"perl-xs\",\"version\":\"0.02\"");
0 0 CONNECT_APPEND(",\"lang\":\"perl-xs\",\"version\":\"0.02\"");
1014 0 0 if (self->user) {
1016 0 0 CONNECT_APPEND(",\"user\":%.*s", elen, escaped);
0 0 CONNECT_APPEND(",\"user\":%.*s", elen, escaped);
0 0 CONNECT_APPEND(",\"user\":%.*s", elen, escaped);
1017 0 0 if (self->pass) {
1019 0 0 CONNECT_APPEND(",\"pass\":%.*s", elen, escaped);
0 0 CONNECT_APPEND(",\"pass\":%.*s", elen, escaped);
0 0 CONNECT_APPEND(",\"pass\":%.*s", elen, escaped);
1022 0 0 if (self->token) {
1024 0 0 CONNECT_APPEND(",\"auth_token\":%.*s", elen, escaped);
0 0 CONNECT_APPEND(",\"auth_token\":%.*s", elen, escaped);
0 0 CONNECT_APPEND(",\"auth_token\":%.*s", elen, escaped);
1026 0 0 if (self->name) {
1028 0 0 CONNECT_APPEND(",\"name\":%.*s", elen, escaped);
0 0 CONNECT_APPEND(",\"name\":%.*s", elen, escaped);
0 0 CONNECT_APPEND(",\"name\":%.*s", elen, escaped);
1030 0 0 if (self->jwt) {
1032 0 0 CONNECT_APPEND(",\"jwt\":%.*s", elen, escaped);
0 0 CONNECT_APPEND(",\"jwt\":%.*s", elen, escaped);
0 0 CONNECT_APPEND(",\"jwt\":%.*s", elen, escaped);
1035 0 0 if (self->nkey_seed && self->server_nonce) {
0 0 if (self->nkey_seed && self->server_nonce) {
1039 0 0 if (nats_nkey_sign(self->nkey_seed, self->server_nonce,
1040 0 0 strlen(self->server_nonce), sig, sizeof(sig)) > 0 &&
1042 0 0 CONNECT_APPEND(",\"nkey\":\"%s\",\"sig\":\"%s\"", pub, sig);
0 0 CONNECT_APPEND(",\"nkey\":\"%s\",\"sig\":\"%s\"", pub, sig);
0 0 CONNECT_APPEND(",\"nkey\":\"%s\",\"sig\":\"%s\"", pub, sig);
1047 0 0 CONNECT_APPEND("}\r\n");
0 0 CONNECT_APPEND("}\r\n");
0 0 CONNECT_APPEND("}\r\n");
1049 0 0 if (overflow) {
1072 0 0 if (tok_len == 0) return -1;
1073 0 0 for (i = 0; i < tok_len; i++) {
1074 0 0 if (tok[i] < '0' || tok[i] > '9') return -1;
0 0 if (tok[i] < '0' || tok[i] > '9') return -1;
1076 0 0 if (v > (SIZE_MAX - d) / 10) return -1;
1089 0 0 if (len < 4) return -1;
1094 0 0 while (p < end && *p != ' ' && *p != '\t') p++;
0 0 while (p < end && *p != ' ' && *p != '\t') p++;
0 0 while (p < end && *p != ' ' && *p != '\t') p++;
1095 0 0 if (p == tok_start) return -1;
1099 0 0 while (p < end && (*p == ' ' || *p == '\t')) p++;
0 0 while (p < end && (*p == ' ' || *p == '\t')) p++;
0 0 while (p < end && (*p == ' ' || *p == '\t')) p++;
1103 0 0 while (p < end && *p != ' ' && *p != '\t') p++;
0 0 while (p < end && *p != ' ' && *p != '\t') p++;
0 0 while (p < end && *p != ' ' && *p != '\t') p++;
1106 0 0 if (nats_parse_decimal(tok_start, p - tok_start, &sid) != 0) return -1;
1110 0 0 while (p < end && (*p == ' ' || *p == '\t')) p++;
0 0 while (p < end && (*p == ' ' || *p == '\t')) p++;
0 0 while (p < end && (*p == ' ' || *p == '\t')) p++;
1117 0 0 while (tp < end && ntokens < 3) {
0 0 while (tp < end && ntokens < 3) {
1118 0 0 while (tp < end && (*tp == ' ' || *tp == '\t')) tp++;
0 0 while (tp < end && (*tp == ' ' || *tp == '\t')) tp++;
0 0 while (tp < end && (*tp == ' ' || *tp == '\t')) tp++;
1119 0 0 if (tp >= end) break;
1121 0 0 while (tp < end && *tp != ' ' && *tp != '\t') tp++;
0 0 while (tp < end && *tp != ' ' && *tp != '\t') tp++;
0 0 while (tp < end && *tp != ' ' && *tp != '\t') tp++;
1126 0 0 if (ntokens == 1) {
1130 0 0 if (nats_parse_decimal(tokens[0], token_lens[0], &self->msg_total_len) != 0)
1132 0 0 } else if (ntokens == 2) {
1136 0 0 if (nats_parse_decimal(tokens[1], token_lens[1], &self->msg_total_len) != 0)
1153 0 0 if (len < 5) return -1;
1158 0 0 while (p < end && *p != ' ' && *p != '\t') p++;
0 0 while (p < end && *p != ' ' && *p != '\t') p++;
0 0 while (p < end && *p != ' ' && *p != '\t') p++;
1159 0 0 if (p == tok_start) return -1;
1163 0 0 while (p < end && (*p == ' ' || *p == '\t')) p++;
0 0 while (p < end && (*p == ' ' || *p == '\t')) p++;
0 0 while (p < end && (*p == ' ' || *p == '\t')) p++;
1167 0 0 while (p < end && *p != ' ' && *p != '\t') p++;
0 0 while (p < end && *p != ' ' && *p != '\t') p++;
0 0 while (p < end && *p != ' ' && *p != '\t') p++;
1170 0 0 if (nats_parse_decimal(tok_start, p - tok_start, &sid) != 0) return -1;
1174 0 0 while (p < end && (*p == ' ' || *p == '\t')) p++;
0 0 while (p < end && (*p == ' ' || *p == '\t')) p++;
0 0 while (p < end && (*p == ' ' || *p == '\t')) p++;
1180 0 0 while (tp < end && ntokens < 4) {
0 0 while (tp < end && ntokens < 4) {
1181 0 0 while (tp < end && (*tp == ' ' || *tp == '\t')) tp++;
0 0 while (tp < end && (*tp == ' ' || *tp == '\t')) tp++;
0 0 while (tp < end && (*tp == ' ' || *tp == '\t')) tp++;
1182 0 0 if (tp >= end) break;
1184 0 0 while (tp < end && *tp != ' ' && *tp != '\t') tp++;
0 0 while (tp < end && *tp != ' ' && *tp != '\t') tp++;
0 0 while (tp < end && *tp != ' ' && *tp != '\t') tp++;
1190 0 0 if (ntokens == 2) {
1194 0 0 } else if (ntokens == 3) {
1201 0 0 if (nats_parse_decimal(tokens[hdr_idx], token_lens[hdr_idx], &self->msg_hdr_len) != 0
1202 0 0 || nats_parse_decimal(tokens[len_idx], token_lens[len_idx], &self->msg_total_len) != 0) {
1213 0 0 if (!sub) return;
1223 0 0 if (sub->cb) {
1226 0 0 PUSHMARK(SP);
1227 0 0 EXTEND(SP, 4);
1231 0 0 if (self->msg_type == MSG_TYPE_HMSG && self->msg_hdr_len > 0 && self->msg_hdr_len <= len) {
0 0 if (self->msg_type == MSG_TYPE_HMSG && self->msg_hdr_len > 0 && self->msg_hdr_len <= len) {
0 0 if (self->msg_type == MSG_TYPE_HMSG && self->msg_hdr_len > 0 && self->msg_hdr_len <= len) {
1237 0 0 if (self->msg_reply_len > 0) {
1243 0 0 if (self->msg_type == MSG_TYPE_HMSG && self->msg_hdr_len > 0) {
0 0 if (self->msg_type == MSG_TYPE_HMSG && self->msg_hdr_len > 0) {
1249 0 0 FREETMPS; LEAVE;
1252 0 0 if (max_msgs > 0 && received >= max_msgs) {
0 0 if (max_msgs > 0 && received >= max_msgs) {
1254 0 0 if (sub)
1264 0 0 if (subject_len <= pfx_len || memcmp(subject, self->inbox_prefix, pfx_len) != 0)
0 0 if (subject_len <= pfx_len || memcmp(subject, self->inbox_prefix, pfx_len) != 0)
1271 0 0 for (i = 0; i < id_len; i++) {
1272 0 0 if (id_str[i] < '0' || id_str[i] > '9') return;
0 0 if (id_str[i] < '0' || id_str[i] > '9') return;
1277 0 0 ngx_queue_foreach(q, &self->req_queue) {
0 0 ngx_queue_foreach(q, &self->req_queue) {
1279 0 0 if (req->req_id == req_id) {
1281 0 0 if (req->timer_active) {
1285 0 0 if (req->cb) {
1287 0 0 int is_no_responders = (headers && headers_len >= 12 &&
0 0 int is_no_responders = (headers && headers_len >= 12 &&
1288 0 0 memcmp(headers, "NATS/1.0 503", 12) == 0);
1290 0 0 PUSHMARK(SP);
1291 0 0 EXTEND(SP, 3);
1292 0 0 if (is_no_responders) {
1298 0 0 if (headers && headers_len > 0)
0 0 if (headers && headers_len > 0)
1303 0 0 FREETMPS; LEAVE;
1314 0 0 if (len > 0 && line[len-1] == '\r')
0 0 if (len > 0 && line[len-1] == '\r')
1317 0 0 if (len == 0) return;
1320 0 0 if (len > 5 && (line[0] == 'I' || line[0] == 'i') &&
0 0 if (len > 5 && (line[0] == 'I' || line[0] == 'i') &&
0 0 if (len > 5 && (line[0] == 'I' || line[0] == 'i') &&
1321 0 0 (line[1] == 'N' || line[1] == 'n') &&
0 0 (line[1] == 'N' || line[1] == 'n') &&
1322 0 0 (line[2] == 'F' || line[2] == 'f') &&
0 0 (line[2] == 'F' || line[2] == 'f') &&
1323 0 0 (line[3] == 'O' || line[3] == 'o') &&
0 0 (line[3] == 'O' || line[3] == 'o') &&
1324 0 0 line[4] == ' ') {
1326 0 0 CLEAR_HANDLER(self->server_info_json);
1335 0 0 if (found && found < e) {
0 0 if (found && found < e) {
1338 0 0 while (found < e && *found >= '0' && *found <= '9') {
0 0 while (found < e && *found >= '0' && *found <= '9') {
0 0 while (found < e && *found >= '0' && *found <= '9') {
1348 0 0 if (found && found < e) {
0 0 if (found && found < e) {
1351 0 0 if (nend) {
1353 0 0 if (self->server_nonce) Safefree(self->server_nonce);
1362 0 0 if (found && found < e) {
0 0 if (found && found < e) {
1364 0 0 while (found < e && (*found == ' ' || *found == '\t')) found++;
0 0 while (found < e && (*found == ' ' || *found == '\t')) found++;
0 0 while (found < e && (*found == ' ' || *found == '\t')) found++;
1365 0 0 if (found < e && *found == 't') {
0 0 if (found < e && *found == 't') {
1366 0 0 if (!self->ldm) {
1368 0 0 if (self->on_ldm) {
1371 0 0 PUSHMARK(SP);
1374 0 0 FREETMPS; LEAVE;
1381 0 0 if (self->connecting) {
1383 0 0 if (self->tls && !self->ssl) {
0 0 if (self->tls && !self->ssl) {
1384 0 0 if (nats_ssl_setup(self) != 0) {
1390 0 0 if (hret < 0) {
1394 0 0 if (hret == 1) {
1410 0 0 if (len >= 4 && line[0] == 'M' && line[1] == 'S' && line[2] == 'G' && line[3] == ' ') {
0 0 if (len >= 4 && line[0] == 'M' && line[1] == 'S' && line[2] == 'G' && line[3] == ' ') {
0 0 if (len >= 4 && line[0] == 'M' && line[1] == 'S' && line[2] == 'G' && line[3] == ' ') {
0 0 if (len >= 4 && line[0] == 'M' && line[1] == 'S' && line[2] == 'G' && line[3] == ' ') {
0 0 if (len >= 4 && line[0] == 'M' && line[1] == 'S' && line[2] == 'G' && line[3] == ' ') {
1411 0 0 if (nats_parse_msg_args(self, line, len) == 0) {
1412 0 0 if (self->msg_total_len > (size_t)self->max_payload) {
1423 0 0 if (len >= 5 && line[0] == 'H' && line[1] == 'M' && line[2] == 'S' && line[3] == 'G' && line[4] == ' ') {
0 0 if (len >= 5 && line[0] == 'H' && line[1] == 'M' && line[2] == 'S' && line[3] == 'G' && line[4] == ' ') {
0 0 if (len >= 5 && line[0] == 'H' && line[1] == 'M' && line[2] == 'S' && line[3] == 'G' && line[4] == ' ') {
0 0 if (len >= 5 && line[0] == 'H' && line[1] == 'M' && line[2] == 'S' && line[3] == 'G' && line[4] == ' ') {
0 0 if (len >= 5 && line[0] == 'H' && line[1] == 'M' && line[2] == 'S' && line[3] == 'G' && line[4] == ' ') {
0 0 if (len >= 5 && line[0] == 'H' && line[1] == 'M' && line[2] == 'S' && line[3] == 'G' && line[4] == ' ') {
1424 0 0 if (nats_parse_hmsg_args(self, line, len) == 0) {
1425 0 0 if (self->msg_total_len > (size_t)self->max_payload) {
1436 0 0 if (len == 4 && line[0] == 'P' && line[1] == 'I' && line[2] == 'N' && line[3] == 'G') {
0 0 if (len == 4 && line[0] == 'P' && line[1] == 'I' && line[2] == 'N' && line[3] == 'G') {
0 0 if (len == 4 && line[0] == 'P' && line[1] == 'I' && line[2] == 'N' && line[3] == 'G') {
0 0 if (len == 4 && line[0] == 'P' && line[1] == 'I' && line[2] == 'N' && line[3] == 'G') {
0 0 if (len == 4 && line[0] == 'P' && line[1] == 'I' && line[2] == 'N' && line[3] == 'G') {
1443 0 0 if (len == 4 && line[0] == 'P' && line[1] == 'O' && line[2] == 'N' && line[3] == 'G') {
0 0 if (len == 4 && line[0] == 'P' && line[1] == 'O' && line[2] == 'N' && line[3] == 'G') {
0 0 if (len == 4 && line[0] == 'P' && line[1] == 'O' && line[2] == 'N' && line[3] == 'G') {
0 0 if (len == 4 && line[0] == 'P' && line[1] == 'O' && line[2] == 'N' && line[3] == 'G') {
0 0 if (len == 4 && line[0] == 'P' && line[1] == 'O' && line[2] == 'N' && line[3] == 'G') {
1444 0 0 if (self->pings_outstanding > 0)
1447 0 0 if (self->connecting && !self->connected) {
0 0 if (self->connecting && !self->connected) {
1452 0 0 if (self->connect_timer_active) {
1459 0 0 if (self->on_connect) {
1462 0 0 PUSHMARK(SP);
1465 0 0 FREETMPS; LEAVE;
1472 0 0 if (!ngx_queue_empty(&self->pong_cbs)) {
1476 0 0 if (pcb->cb) {
1479 0 0 PUSHMARK(SP);
1480 0 0 EXTEND(SP, 1);
1484 0 0 FREETMPS; LEAVE;
1486 0 0 } else if (self->draining) {
1489 0 0 if (self->drain_cb) {
1492 0 0 PUSHMARK(SP);
1493 0 0 EXTEND(SP, 1);
1497 0 0 FREETMPS; LEAVE;
1498 0 0 CLEAR_HANDLER(self->drain_cb);
1512 0 0 if (len >= 3 && line[0] == '+' && line[1] == 'O' && line[2] == 'K') {
0 0 if (len >= 3 && line[0] == '+' && line[1] == 'O' && line[2] == 'K') {
0 0 if (len >= 3 && line[0] == '+' && line[1] == 'O' && line[2] == 'K') {
0 0 if (len >= 3 && line[0] == '+' && line[1] == 'O' && line[2] == 'K') {
1517 0 0 if (len >= 4 && line[0] == '-' && line[1] == 'E' && line[2] == 'R' && line[3] == 'R') {
0 0 if (len >= 4 && line[0] == '-' && line[1] == 'E' && line[2] == 'R' && line[3] == 'R') {
0 0 if (len >= 4 && line[0] == '-' && line[1] == 'E' && line[2] == 'R' && line[3] == 'R') {
0 0 if (len >= 4 && line[0] == '-' && line[1] == 'E' && line[2] == 'R' && line[3] == 'R') {
0 0 if (len >= 4 && line[0] == '-' && line[1] == 'E' && line[2] == 'R' && line[3] == 'R') {
1520 0 0 while (mlen > 0 && (*msg == ' ' || *msg == '\'')) { msg++; mlen--; }
0 0 while (mlen > 0 && (*msg == ' ' || *msg == '\'')) { msg++; mlen--; }
0 0 while (mlen > 0 && (*msg == ' ' || *msg == '\'')) { msg++; mlen--; }
1521 0 0 while (mlen > 0 && msg[mlen-1] == '\'') mlen--;
0 0 while (mlen > 0 && msg[mlen-1] == '\'') mlen--;
1526 0 0 if (strstr(errbuf, "authorization") || strstr(errbuf, "authentication")) {
0 0 if (strstr(errbuf, "authorization") || strstr(errbuf, "authentication")) {
1545 0 0 if (self->ssl_handshaking) {
1547 0 0 if (hret == 0) return;
1548 0 0 if (hret < 0) {
1553 0 0 if (self->writing) {
1559 0 0 if (self->connecting) {
1571 0 0 if (n <= 0) {
1572 0 0 if (n == 0) {
1574 0 0 } else if (errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) {
0 0 } else if (errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) {
0 0 } else if (errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) {
1585 0 0 while (consumed < self->rbuf_len) {
1586 0 0 if (self->fd < 0) break;
1590 0 0 if (self->ssl_handshaking) break;
1593 0 0 if (self->parse_state == PARSE_OP) {
1596 0 0 if (!nl) break;
1602 0 0 } else if (self->parse_state == PARSE_MSG_BODY) {
1605 0 0 if (avail < need) break;
1610 0 0 if (self->inbox_sub_sid && self->msg_sid == self->inbox_sub_sid) {
0 0 if (self->inbox_sub_sid && self->msg_sid == self->inbox_sub_sid) {
1616 0 0 if (self->msg_type == MSG_TYPE_HMSG && self->msg_hdr_len > 0 &&
0 0 if (self->msg_type == MSG_TYPE_HMSG && self->msg_hdr_len > 0 &&
1617 0 0 self->msg_hdr_len <= self->msg_total_len) {
1637 0 0 if (self->fd >= 0 && consumed > 0 && consumed <= self->rbuf_len) {
0 0 if (self->fd >= 0 && consumed > 0 && consumed <= self->rbuf_len) {
0 0 if (self->fd >= 0 && consumed > 0 && consumed <= self->rbuf_len) {
1639 0 0 if (self->rbuf_len > 0)
1646 0 0 if (self->fd < 0) return;
1648 0 0 while (self->wbuf_off < self->wbuf_len) {
1651 0 0 if (n <= 0) {
1652 0 0 if (n == 0 || errno == EAGAIN || errno == EWOULDBLOCK) {
0 0 if (n == 0 || errno == EAGAIN || errno == EWOULDBLOCK) {
0 0 if (n == 0 || errno == EAGAIN || errno == EWOULDBLOCK) {
1653 0 0 if (!self->writing) {
1659 0 0 if (errno == EINTR)
1671 0 0 if (self->writing) {
1682 0 0 if (self->connecting && !self->connected) {
0 0 if (self->connecting && !self->connected) {
1686 0 0 if (err) {
1694 0 0 if (self->writing) {
1698 0 0 if (!self->reading) {
1728 0 0 if (self->server_pool_count > 0)
1735 0 0 if (self->ping_interval_ms <= 0) return;
1748 0 0 if (self->max_pings_outstanding > 0 &&
1749 0 0 self->pings_outstanding > self->max_pings_outstanding) {
1768 0 0 if (self->wbuf_dirty && self->fd >= 0) {
0 0 if (self->wbuf_dirty && self->fd >= 0) {
1776 0 0 if (self->batch_mode) return; /* writes batched, flushed on batch end */
1779 0 0 if (!self->prepare_active) {
1785 0 0 if (self->slow_consumer_bytes > 0 &&
1786 0 0 (self->wbuf_len - self->wbuf_off) > self->slow_consumer_bytes) {
1787 0 0 if (self->on_slow_consumer) {
1790 0 0 PUSHMARK(SP);
1791 0 0 EXTEND(SP, 1);
1795 0 0 FREETMPS; LEAVE;
1812 0 0 if (req->cb) {
1815 0 0 PUSHMARK(SP);
1816 0 0 EXTEND(SP, 2);
1821 0 0 FREETMPS; LEAVE;
1833 0 0 while (!ngx_queue_empty(&self->wait_queue)) {
1844 0 0 if (self->wbuf_len > self->wbuf_off)
1850 0 0 if (self->connected) {
1853 0 0 } else if (self->connecting ||
1854 0 0 (self->reconnect_enabled && self->reconnect_timer_active)) {
0 0 (self->reconnect_enabled && self->reconnect_timer_active)) {
1875 0 0 if (self->fd >= 0 || self->connecting || self->reconnect_timer_active)
0 0 if (self->fd >= 0 || self->connecting || self->reconnect_timer_active)
0 0 if (self->fd >= 0 || self->connecting || self->reconnect_timer_active)
1877 0 0 if (!self->intentional_disconnect && self->reconnect_enabled) {
0 0 if (!self->intentional_disconnect && self->reconnect_enabled) {
1878 0 0 if (self->max_reconnect_attempts == 0 ||
1879 0 0 self->reconnect_attempts < self->max_reconnect_attempts) {
1883 0 0 if (self->max_reconnect_delay_ms > 0 && delay > self->max_reconnect_delay_ms)
0 0 if (self->max_reconnect_delay_ms > 0 && delay > self->max_reconnect_delay_ms)
1901 0 6 while (!ngx_queue_empty(&self->server_pool)) {
1915 0 0 if (!p || p >= json + len) return;
0 0 if (!p || p >= json + len) return;
1922 0 0 while (p < end && *p != ']') {
0 0 while (p < end && *p != ']') {
1923 0 0 while (p < end && (*p == ' ' || *p == ',' || *p == '"')) p++;
0 0 while (p < end && (*p == ' ' || *p == ',' || *p == '"')) p++;
0 0 while (p < end && (*p == ' ' || *p == ',' || *p == '"')) p++;
0 0 while (p < end && (*p == ' ' || *p == ',' || *p == '"')) p++;
1924 0 0 if (p >= end || *p == ']') break;
0 0 if (p >= end || *p == ']') break;
1927 0 0 while (p < end && *p != '"' && *p != ',' && *p != ']') p++;
0 0 while (p < end && *p != '"' && *p != ',' && *p != ']') p++;
0 0 while (p < end && *p != '"' && *p != ',' && *p != ']') p++;
0 0 while (p < end && *p != '"' && *p != ',' && *p != ']') p++;
1930 0 0 if (url_len > 0) {
1934 0 0 for (s = start + url_len - 1; s >= start; s--) {
1935 0 0 if (*s == ':') { colon = s; break; }
1940 0 0 if (colon && colon > start) {
0 0 if (colon && colon > start) {
1947 0 0 while (dp < start + url_len && *dp >= '0' && *dp <= '9') {
0 0 while (dp < start + url_len && *dp >= '0' && *dp <= '9') {
0 0 while (dp < start + url_len && *dp >= '0' && *dp <= '9') {
1961 0 0 while (p < end && *p == '"') p++;
0 0 while (p < end && *p == '"') p++;
1967 0 0 if (ngx_queue_empty(&self->server_pool)) return;
1994 0 0 if (rv != 0) {
2000 0 0 for (rp = res; rp != NULL; rp = rp->ai_next) {
2002 0 0 if (fd < 0) continue;
2009 0 0 if (self->keepalive > 0) {
2017 0 0 if (rv == 0 || errno == EINPROGRESS) break;
0 0 if (rv == 0 || errno == EINPROGRESS) break;
2025 0 0 if (fd < 0) {
2038 0 0 if (self->priority) {
2046 0 0 if (rv != 0) {
2051 0 0 if (self->connect_timeout_ms > 0) {
2063 0 0 if (!self->path || strlen(self->path) >= sizeof(addr.sun_path)) {
0 0 if (!self->path || strlen(self->path) >= sizeof(addr.sun_path)) {
2070 0 0 if (fd < 0) {
2083 0 0 if (rv != 0 && errno != EINPROGRESS) {
0 0 if (rv != 0 && errno != EINPROGRESS) {
2097 0 0 if (self->priority) {
2105 0 0 if (rv != 0) {
2110 0 0 if (self->connect_timeout_ms > 0) {
2120 0 0 if (self->path)
2136 0 17 I_EV_API("EV::Nats");
17 0 I_EV_API("EV::Nats");
0 17 I_EV_API("EV::Nats");
2186 0 6 if (items > 1 && (items - 1) % 2 == 0) {
0 0 if (items > 1 && (items - 1) % 2 == 0) {
2187 0 0 for (i = 1; i < items; i += 2) {
2191 0 0 if (strcmp(key, "host") == 0) nats_set_str_sv(&self->host, val);
2192 0 0 else if (strcmp(key, "port") == 0) self->port = SvIV(val);
2193 0 0 else if (strcmp(key, "path") == 0) nats_set_str_sv(&self->path, val);
2194 0 0 else if (strcmp(key, "user") == 0) nats_set_str_sv(&self->user, val);
2195 0 0 else if (strcmp(key, "pass") == 0) nats_set_str_sv(&self->pass, val);
2196 0 0 else if (strcmp(key, "token") == 0) nats_set_str_sv(&self->token, val);
2197 0 0 else if (strcmp(key, "name") == 0) nats_set_str_sv(&self->name, val);
2198 0 0 else if (strcmp(key, "on_error") == 0)
2200 0 0 else if (strcmp(key, "on_connect") == 0)
2202 0 0 else if (strcmp(key, "on_disconnect") == 0)
2204 0 0 else if (strcmp(key, "verbose") == 0)
2206 0 0 else if (strcmp(key, "pedantic") == 0)
2208 0 0 else if (strcmp(key, "echo") == 0)
2210 0 0 else if (strcmp(key, "no_responders") == 0)
2212 0 0 else if (strcmp(key, "reconnect") == 0)
2214 0 0 else if (strcmp(key, "reconnect_delay") == 0)
2216 0 0 else if (strcmp(key, "max_reconnect_attempts") == 0)
2218 0 0 else if (strcmp(key, "max_reconnect_delay") == 0)
2220 0 0 else if (strcmp(key, "connect_timeout") == 0)
2222 0 0 else if (strcmp(key, "ping_interval") == 0)
2224 0 0 else if (strcmp(key, "max_pings_outstanding") == 0)
2226 0 0 else if (strcmp(key, "priority") == 0)
2228 0 0 else if (strcmp(key, "keepalive") == 0)
2230 0 0 #ifdef HAVE_OPENSSL
2232 0 0 self->tls = SvTRUE(val) ? 1 : 0;
2234 0 0 nats_set_str_sv(&self->tls_ca_file, val);
2236 0 0 self->tls_skip_verify = SvTRUE(val) ? 1 : 0;
2238 0 0 nats_set_str_sv(&self->nkey_seed, val);
2240 0 0 else if (strcmp(key, "jwt") == 0)
2242 0 0 else if (strcmp(key, "slow_consumer_bytes") == 0)
2244 0 0 else if (strcmp(key, "on_slow_consumer") == 0)
2246 0 0 else if (strcmp(key, "on_lame_duck") == 0)
2253 6 0 }
0 6 }
2268 0 0 if (self->connected || self->connecting)
0 0 if (self->connected || self->connecting)
2271 0 0 if (self->reconnect_timer_active) {
2288 0 0 if (self->connected || self->connecting)
0 0 if (self->connected || self->connecting)
2291 0 0 if (self->reconnect_timer_active) {
2307 0 0 if (self->reconnect_timer_active) {
2318 0 0 RETVAL = self->connected;
2333 0 0 NATS_CROAK_UNLESS_CONNECTED(self);
0 0 NATS_CROAK_UNLESS_CONNECTED(self);
0 0 NATS_CROAK_UNLESS_CONNECTED(self);
0 0 NATS_CROAK_UNLESS_CONNECTED(self);
2337 0 0 if (SvOK(payload)) {
2344 0 0 if (self->max_payload > 0 && pay_len > (size_t)self->max_payload)
0 0 if (self->max_payload > 0 && pay_len > (size_t)self->max_payload)
2348 0 0 if (reply && *reply) {
0 0 if (reply && *reply) {
2359 0 0 if (self->connected) {
2361 0 0 if (pay_len > 0) wbuf_append(self, pay_pv, pay_len);
2369 0 0 if (pay_len > 0) memcpy(buf + hdr_len, pay_pv, pay_len);
2388 0 0 NATS_CROAK_UNLESS_CONNECTED(self);
0 0 NATS_CROAK_UNLESS_CONNECTED(self);
0 0 NATS_CROAK_UNLESS_CONNECTED(self);
0 0 NATS_CROAK_UNLESS_CONNECTED(self);
2393 0 0 if (SvOK(payload)) {
2401 0 0 if (self->max_payload > 0 && total_size > (size_t)self->max_payload)
0 0 if (self->max_payload > 0 && total_size > (size_t)self->max_payload)
2408 0 0 if (reply && *reply) {
0 0 if (reply && *reply) {
2418 0 0 if (self->connected) {
2421 0 0 if (pay_len > 0) wbuf_append(self, pay_pv, pay_len);
2430 0 0 if (pay_len > 0) memcpy(buf + cmd_len + hdr_data_len, pay_pv, pay_len);
2455 0 0 sub->queue_group = (queue_group && *queue_group) ? newSVpv(queue_group, 0) : NULL;
0 0 sub->queue_group = (queue_group && *queue_group) ? newSVpv(queue_group, 0) : NULL;
2461 0 0 if (queue_group && *queue_group) {
0 0 if (queue_group && *queue_group) {
2469 0 0 if (self->connected || self->connecting)
0 0 if (self->connected || self->connecting)
2472 0 0 RETVAL = (UV)sub->sid;
2488 0 0 int can_queue = self->connected || self->connecting
2489 0 0 || (self->reconnect_enabled && self->reconnect_timer_active);
0 0 || (self->reconnect_enabled && self->reconnect_timer_active);
0 0 || (self->reconnect_enabled && self->reconnect_timer_active);
2491 0 0 if (max_msgs > 0) {
2493 0 0 if (sub)
2497 0 0 if (can_queue)
2502 0 0 if (can_queue)
2504 0 0 if (sub)
2521 0 0 NATS_CROAK_UNLESS_CONNECTED(self);
0 0 NATS_CROAK_UNLESS_CONNECTED(self);
0 0 NATS_CROAK_UNLESS_CONNECTED(self);
0 0 NATS_CROAK_UNLESS_CONNECTED(self);
2524 0 0 if (SvOK(payload)) {
2531 0 0 if (self->max_payload > 0 && pay_len > (size_t)self->max_payload)
0 0 if (self->max_payload > 0 && pay_len > (size_t)self->max_payload)
2534 0 0 if (!self->inbox_sub_sid) {
2563 0 0 if (timeout_ms > 0) {
2576 0 0 if (pay_len > 0) memcpy(buf + hdr_len, pay_pv, pay_len);
2587 0 0 NATS_CROAK_UNLESS_CONNECTED(self);
0 0 NATS_CROAK_UNLESS_CONNECTED(self);
0 0 NATS_CROAK_UNLESS_CONNECTED(self);
0 0 NATS_CROAK_UNLESS_CONNECTED(self);
2596 0 0 if (!self->connected)
2600 0 0 if (cb && SvOK(cb)) {
0 0 if (cb && SvOK(cb)) {
2611 0 0 if (self->server_info_json)
2622 0 0 if (items > 1)
2624 0 0 RETVAL = self->max_payload;
2632 0 0 RETVAL = self->waiting_count;
2648 0 0 if (items > 2) self->reconnect_delay_ms = SvIV(ST(2));
2649 0 0 if (items > 3) self->max_reconnect_attempts = SvIV(ST(3));
2655 0 0 RETVAL = self->reconnect_enabled;
2663 0 0 if (items > 1)
2665 0 0 RETVAL = self->connect_timeout_ms;
2673 0 0 if (items > 1)
2675 0 0 RETVAL = self->ping_interval_ms;
2683 0 0 if (items > 1)
2685 0 0 RETVAL = self->max_pings_outstanding;
2693 0 0 if (items > 1)
2695 0 0 RETVAL = self->priority;
2703 0 0 if (items > 1)
2705 0 0 RETVAL = self->keepalive;
2713 0 0 if (items > 1) {
2714 0 0 CLEAR_HANDLER(self->on_error);
2715 0 0 if (SvOK(ST(1)))
2718 0 0 if (GIMME_V != G_VOID && self->on_error)
0 0 if (GIMME_V != G_VOID && self->on_error)
2725 0 0 if (items > 1) {
2726 0 0 CLEAR_HANDLER(self->on_connect);
2727 0 0 if (SvOK(ST(1)))
2730 0 0 if (GIMME_V != G_VOID && self->on_connect)
0 0 if (GIMME_V != G_VOID && self->on_connect)
2737 0 0 if (items > 1) {
2738 0 0 CLEAR_HANDLER(self->on_disconnect);
2739 0 0 if (SvOK(ST(1)))
2742 0 0 if (GIMME_V != G_VOID && self->on_disconnect)
0 0 if (GIMME_V != G_VOID && self->on_disconnect)
2756 0 0 nats_set_str(&self->tls_ca_file, (ca_file && *ca_file) ? ca_file : NULL);
0 0 nats_set_str(&self->tls_ca_file, (ca_file && *ca_file) ? ca_file : NULL);
2764 0 0 EXTEND(SP, 8);
2799 0 0 RETVAL = HvKEYS(self->sub_map);
0 0 RETVAL = HvKEYS(self->sub_map);
2812 0 0 PUSHMARK(SP);
2815 0 0 FREETMPS; LEAVE;
2818 0 0 if (self->wbuf_len > self->wbuf_off) {
2820 0 0 if (!self->prepare_active) {
2825 0 0 if (self->slow_consumer_bytes > 0 &&
2826 0 0 (self->wbuf_len - self->wbuf_off) > self->slow_consumer_bytes &&
2827 0 0 self->on_slow_consumer) {
2830 0 0 PUSHMARK(SP);
2831 0 0 EXTEND(SP, 1);
2835 0 0 FREETMPS; LEAVE;
2846 0 0 CLEAR_HANDLER(self->on_slow_consumer);
2847 0 0 if (cb && SvOK(cb))
0 0 if (cb && SvOK(cb))
2854 0 0 if (items > 1) {
2855 0 0 CLEAR_HANDLER(self->on_ldm);
2856 0 0 if (SvOK(ST(1)))
2859 0 0 if (GIMME_V != G_VOID && self->on_ldm)
0 0 if (GIMME_V != G_VOID && self->on_ldm)
2880 0 0 if (n <= 0)
2905 0 0 if (n < 0) croak("base32 buffer overflow");
2925 0 0 if (!self->connected || self->draining)
0 0 if (!self->connected || self->draining)
2928 0 0 CLEAR_HANDLER(self->drain_cb);
2929 0 0 if (cb && SvOK(cb))
0 0 if (cb && SvOK(cb))
2936 0 0 ngx_queue_foreach(q, &self->subs) {
0 0 ngx_queue_foreach(q, &self->subs) {
2958 0 6 if (self->magic != NATS_MAGIC_ALIVE)
2964 0 6 if (PL_dirty) {
2965 0 0 if (self->fd >= 0)
2970 0 6 CLEAR_HANDLER(self->on_error);
2971 0 6 CLEAR_HANDLER(self->on_connect);
2972 0 6 CLEAR_HANDLER(self->on_disconnect);
2977 0 6 while (!ngx_queue_empty(&self->req_queue)) {
2981 0 0 if (req->timer_active)
2983 0 0 CLEAR_HANDLER(req->cb);
2988 0 6 while (!ngx_queue_empty(&self->pong_cbs)) {
2992 0 0 CLEAR_HANDLER(pcb->cb);
2996 0 6 while (!ngx_queue_empty(&self->subs)) {
3002 0 6 if (self->fd >= 0) {
3007 0 6 CLEAR_HANDLER(self->server_info_json);
3008 0 6 CLEAR_HANDLER(self->drain_cb);
3011 6 0 if (self->sub_map) {
3017 0 6 nats_ssl_cleanup(self);
3031 0 6 Safefree(self->jwt);
3032 0 6 Safefree(self->server_nonce);