Branch Coverage

Nats.xs
Criterion Covered Total %
branch 3 1320 0.2


line true false branch
274 0 0 if (needed <= *cap)
276 0 0 size_t newcap = *cap ? *cap : BUF_INIT_SIZE;
277 0 0 while (newcap < needed)
286 0 0 if (self->wbuf_off > 0 && self->wbuf_off > self->wbuf_len / 2) {
0 0 if (self->wbuf_off > 0 && self->wbuf_off > self->wbuf_len / 2) {
287 0 0 if (used > 0)
322 0 0 for (i = 0; i < src_len; i++) {
324 0 0 if (c == '=' || c == ' ') continue;
0 0 if (c == '=' || c == ' ') continue;
325 0 0 if (c >= 128 || b32_tab[c] < 0) return -1;
0 0 if (c >= 128 || b32_tab[c] < 0) return -1;
328 0 0 if (bits >= 8) {
343 0 0 for (i = 0; i + 2 < src_len; i += 3) {
344 0 0 if (di + 4 > dst_size) return -1;
351 0 0 if (i < src_len) {
352 0 0 if (di + 4 > dst_size) return -1;
354 0 0 if (i + 1 < src_len) v |= (uint32_t)src[i+1] << 8;
357 0 0 if (i + 1 < src_len) dst[di++] = b64[(v >> 6) & 0x3F];
359 0 0 if (di < dst_size) dst[di] = '\0';
368 0 0 for (i = 0; i < len; i++) {
371 0 0 for (j = 0; j < 8; j++) {
372 0 0 if (crc & 1) crc = (crc >> 1) ^ 0xA001;
385 0 0 if (nats_base32_decode(seed_encoded, strlen(seed_encoded), raw, &raw_len) != 0)
387 0 0 if (raw_len < 36) return -1; /* prefix(2) + seed(32) + CRC(2) */
392 0 0 if (expected_crc != actual_crc) return -1;
397 0 0 if (!pkey) return -1;
400 0 0 if (!ctx) { EVP_PKEY_free(pkey); return -1; }
411 0 0 if (!ok) return -1;
421 0 0 if (nats_base32_decode(seed_encoded, strlen(seed_encoded), raw, &raw_len) != 0)
423 0 0 if (raw_len < 36) return -1;
426 0 0 if (nats_crc16(raw, raw_len - 2) != expected_crc) return -1;
431 0 0 if (!pkey) return -1;
437 0 0 if (!ok) return -1;
464 0 0 for (ci = 0; ci < 35 && di < pub_out_size - 1; ci++) {
0 0 for (ci = 0; ci < 35 && di < pub_out_size - 1; ci++) {
467 0 0 while (bits >= 5 && di < pub_out_size - 1) {
0 0 while (bits >= 5 && di < pub_out_size - 1) {
472 0 0 if (di < pub_out_size) pub_out[di] = '\0';
484 0 0 if (self->ssl) {
485 0 0 int n = SSL_read(self->ssl, buf, (len > (size_t)INT_MAX) ? INT_MAX : (int)len);
486 0 0 if (n <= 0) {
488 0 0 if (err == SSL_ERROR_WANT_READ || err == SSL_ERROR_WANT_WRITE) {
0 0 if (err == SSL_ERROR_WANT_READ || err == SSL_ERROR_WANT_WRITE) {
492 0 0 if (err == SSL_ERROR_ZERO_RETURN) return 0;
505 0 0 if (self->ssl) {
506 0 0 int n = SSL_write(self->ssl, buf, (len > (size_t)INT_MAX) ? INT_MAX : (int)len);
507 0 0 if (n <= 0) {
509 0 0 if (err == SSL_ERROR_WANT_WRITE || err == SSL_ERROR_WANT_READ) {
0 0 if (err == SSL_ERROR_WANT_WRITE || err == SSL_ERROR_WANT_READ) {
525 0 0 if (!self->ssl_ctx) {
527 0 0 if (!self->ssl_ctx) return -1;
529 0 0 if (self->tls_ca_file) {
530 0 0 if (!SSL_CTX_load_verify_locations(self->ssl_ctx, self->tls_ca_file, NULL))
536 0 0 if (!self->tls_skip_verify)
541 0 0 if (!self->ssl) return -1;
544 0 0 if (self->host)
552 0 0 if (self->ssl) {
562 0 0 if (ret == 1) {
567 0 0 if (err == SSL_ERROR_WANT_READ) {
568 0 0 if (!self->reading) {
574 0 0 if (err == SSL_ERROR_WANT_WRITE) {
575 0 0 if (!self->writing) {
595 0 0 if (dst_size < 3) {
596 0 0 if (dst_size > 0) dst[0] = '\0';
602 0 0 for (; *s; s++) {
603 0 0 size_t need = (*s == '"' || *s == '\\') ? 2 : (*s < 0x20) ? 6 : 1;
0 0 size_t need = (*s == '"' || *s == '\\') ? 2 : (*s < 0x20) ? 6 : 1;
0 0 size_t need = (*s == '"' || *s == '\\') ? 2 : (*s < 0x20) ? 6 : 1;
604 0 0 if (di + need + 2 > dst_size) break; /* +2 for closing quote + NUL */
606 0 0 if (*s == '"') {
608 0 0 } else if (*s == '\\') {
610 0 0 } else if (*s < 0x20) {
633 0 0 if (fd >= 0) {
634 0 0 while (got < nbytes) {
636 0 0 if (n > 0) got += n;
637 0 0 else if (n == 0 || (errno != EINTR && errno != EAGAIN)) break;
0 0 else if (n == 0 || (errno != EINTR && errno != EAGAIN)) break;
0 0 else if (n == 0 || (errno != EINTR && errno != EAGAIN)) break;
641 0 0 for (i = got; i < nbytes; i++)
644 0 0 for (i = 0; i < nbytes; i++) {
668 0 0 if (svp && SvIOK(*svp))
0 0 if (svp && SvIOK(*svp))
692 0 0 CLEAR_HANDLER(sub->subject);
693 0 0 CLEAR_HANDLER(sub->queue_group);
694 0 0 CLEAR_HANDLER(sub->cb);
703 0 0 ngx_queue_foreach(q, &self->subs) {
0 0 ngx_queue_foreach(q, &self->subs) {
709 0 0 if (sub->queue_group) {
721 0 0 if (sub->max_msgs > 0) {
723 0 0 if (remaining < 1) remaining = 1;
737 0 0 if (self->reading) {
741 0 0 if (self->writing) {
749 0 0 if (self->connect_timer_active) {
753 0 0 if (self->reconnect_timer_active) {
757 0 0 if (self->ping_timer_active) {
761 0 0 if (self->prepare_active) {
770 0 0 while (!ngx_queue_empty(&self->req_queue)) {
775 0 0 if (req->timer_active) {
780 0 0 if (req->cb) {
782 0 0 PUSHMARK(SP);
783 0 0 EXTEND(SP, 2);
788 0 0 FREETMPS; LEAVE;
797 0 0 while (!ngx_queue_empty(&self->wait_queue)) {
801 0 0 if (pub->data)
810 0 0 int was_connected = self->connected || self->connecting;
0 0 int was_connected = self->connected || self->connecting;
824 0 0 if (self->fd >= 0) {
836 0 0 while (!ngx_queue_empty(&self->pong_cbs)) {
840 0 0 CLEAR_HANDLER(pcb->cb);
844 0 0 if (was_connected && self->on_disconnect) {
0 0 if (was_connected && self->on_disconnect) {
847 0 0 PUSHMARK(SP);
850 0 0 FREETMPS; LEAVE;
858 0 0 if (self->on_error) {
861 0 0 PUSHMARK(SP);
862 0 0 EXTEND(SP, 1);
866 0 0 FREETMPS; LEAVE;
889 0 0 CONNECT_APPEND("CONNECT {");
890 0 0 CONNECT_APPEND("\"verbose\":%s,\"pedantic\":%s,\"echo\":%s",
0 0 CONNECT_APPEND("\"verbose\":%s,\"pedantic\":%s,\"echo\":%s",
0 0 CONNECT_APPEND("\"verbose\":%s,\"pedantic\":%s,\"echo\":%s",
0 0 CONNECT_APPEND("\"verbose\":%s,\"pedantic\":%s,\"echo\":%s",
894 0 0 CONNECT_APPEND(",\"protocol\":1");
895 0 0 if (self->no_responders)
896 0 0 CONNECT_APPEND(",\"no_responders\":true");
897 0 0 CONNECT_APPEND(",\"headers\":true");
898 0 0 CONNECT_APPEND(",\"lang\":\"perl-xs\",\"version\":\"0.01\"");
900 0 0 if (self->user) {
902 0 0 CONNECT_APPEND(",\"user\":%.*s", elen, escaped);
903 0 0 if (self->pass) {
905 0 0 CONNECT_APPEND(",\"pass\":%.*s", elen, escaped);
908 0 0 if (self->token) {
910 0 0 CONNECT_APPEND(",\"auth_token\":%.*s", elen, escaped);
912 0 0 if (self->name) {
914 0 0 CONNECT_APPEND(",\"name\":%.*s", elen, escaped);
916 0 0 if (self->jwt) {
918 0 0 CONNECT_APPEND(",\"jwt\":%.*s", elen, escaped);
921 0 0 if (self->nkey_seed && self->server_nonce) {
0 0 if (self->nkey_seed && self->server_nonce) {
925 0 0 if (nats_nkey_sign(self->nkey_seed, self->server_nonce,
926 0 0 strlen(self->server_nonce), sig, sizeof(sig)) > 0 &&
928 0 0 CONNECT_APPEND(",\"nkey\":\"%s\",\"sig\":\"%s\"", pub, sig);
933 0 0 CONNECT_APPEND("}\r\n");
935 0 0 if (off > (int)sizeof(buf))
955 0 0 if (len < 4) return -1;
960 0 0 while (p < end && *p != ' ' && *p != '\t') p++;
0 0 while (p < end && *p != ' ' && *p != '\t') p++;
0 0 while (p < end && *p != ' ' && *p != '\t') p++;
961 0 0 if (p == tok_start) return -1;
965 0 0 while (p < end && (*p == ' ' || *p == '\t')) p++;
0 0 while (p < end && (*p == ' ' || *p == '\t')) p++;
0 0 while (p < end && (*p == ' ' || *p == '\t')) p++;
969 0 0 while (p < end && *p != ' ' && *p != '\t') p++;
0 0 while (p < end && *p != ' ' && *p != '\t') p++;
0 0 while (p < end && *p != ' ' && *p != '\t') p++;
970 0 0 if (p == tok_start) return -1;
974 0 0 while (s < p) {
975 0 0 if (*s < '0' || *s > '9') return -1;
0 0 if (*s < '0' || *s > '9') return -1;
981 0 0 while (p < end && (*p == ' ' || *p == '\t')) p++;
0 0 while (p < end && (*p == ' ' || *p == '\t')) p++;
0 0 while (p < end && (*p == ' ' || *p == '\t')) p++;
988 0 0 while (tp < end && ntokens < 3) {
0 0 while (tp < end && ntokens < 3) {
989 0 0 while (tp < end && (*tp == ' ' || *tp == '\t')) tp++;
0 0 while (tp < end && (*tp == ' ' || *tp == '\t')) tp++;
0 0 while (tp < end && (*tp == ' ' || *tp == '\t')) tp++;
990 0 0 if (tp >= end) break;
992 0 0 while (tp < end && *tp != ' ' && *tp != '\t') tp++;
0 0 while (tp < end && *tp != ' ' && *tp != '\t') tp++;
0 0 while (tp < end && *tp != ' ' && *tp != '\t') tp++;
997 0 0 if (ntokens == 1) {
1003 0 0 while (s < e) {
1004 0 0 if (*s < '0' || *s > '9') return -1;
0 0 if (*s < '0' || *s > '9') return -1;
1008 0 0 } else if (ntokens == 2) {
1014 0 0 while (s < e) {
1015 0 0 if (*s < '0' || *s > '9') return -1;
0 0 if (*s < '0' || *s > '9') return -1;
1034 0 0 if (len < 5) return -1;
1039 0 0 while (p < end && *p != ' ' && *p != '\t') p++;
0 0 while (p < end && *p != ' ' && *p != '\t') p++;
0 0 while (p < end && *p != ' ' && *p != '\t') p++;
1040 0 0 if (p == tok_start) return -1;
1044 0 0 while (p < end && (*p == ' ' || *p == '\t')) p++;
0 0 while (p < end && (*p == ' ' || *p == '\t')) p++;
0 0 while (p < end && (*p == ' ' || *p == '\t')) p++;
1048 0 0 while (p < end && *p != ' ' && *p != '\t') p++;
0 0 while (p < end && *p != ' ' && *p != '\t') p++;
0 0 while (p < end && *p != ' ' && *p != '\t') p++;
1049 0 0 if (p == tok_start) return -1;
1053 0 0 while (s < p) {
1054 0 0 if (*s < '0' || *s > '9') return -1;
0 0 if (*s < '0' || *s > '9') return -1;
1060 0 0 while (p < end && (*p == ' ' || *p == '\t')) p++;
0 0 while (p < end && (*p == ' ' || *p == '\t')) p++;
0 0 while (p < end && (*p == ' ' || *p == '\t')) p++;
1066 0 0 while (tp < end && ntokens < 4) {
0 0 while (tp < end && ntokens < 4) {
1067 0 0 while (tp < end && (*tp == ' ' || *tp == '\t')) tp++;
0 0 while (tp < end && (*tp == ' ' || *tp == '\t')) tp++;
0 0 while (tp < end && (*tp == ' ' || *tp == '\t')) tp++;
1068 0 0 if (tp >= end) break;
1070 0 0 while (tp < end && *tp != ' ' && *tp != '\t') tp++;
0 0 while (tp < end && *tp != ' ' && *tp != '\t') tp++;
0 0 while (tp < end && *tp != ' ' && *tp != '\t') tp++;
1075 0 0 if (ntokens == 2) {
1080 0 0 { char *s = tokens[0]; char *e = s + token_lens[0]; while (s < e) { if (*s < '0' || *s > '9') return -1; self->msg_hdr_len = self->msg_hdr_len * 10 + (*s - '0'); s++; } }
0 0 { char *s = tokens[0]; char *e = s + token_lens[0]; while (s < e) { if (*s < '0' || *s > '9') return -1; self->msg_hdr_len = self->msg_hdr_len * 10 + (*s - '0'); s++; } }
0 0 { char *s = tokens[0]; char *e = s + token_lens[0]; while (s < e) { if (*s < '0' || *s > '9') return -1; self->msg_hdr_len = self->msg_hdr_len * 10 + (*s - '0'); s++; } }
1081 0 0 { char *s = tokens[1]; char *e = s + token_lens[1]; while (s < e) { if (*s < '0' || *s > '9') return -1; self->msg_total_len = self->msg_total_len * 10 + (*s - '0'); s++; } }
0 0 { char *s = tokens[1]; char *e = s + token_lens[1]; while (s < e) { if (*s < '0' || *s > '9') return -1; self->msg_total_len = self->msg_total_len * 10 + (*s - '0'); s++; } }
0 0 { char *s = tokens[1]; char *e = s + token_lens[1]; while (s < e) { if (*s < '0' || *s > '9') return -1; self->msg_total_len = self->msg_total_len * 10 + (*s - '0'); s++; } }
1082 0 0 } else if (ntokens == 3) {
1087 0 0 { char *s = tokens[1]; char *e = s + token_lens[1]; while (s < e) { if (*s < '0' || *s > '9') return -1; self->msg_hdr_len = self->msg_hdr_len * 10 + (*s - '0'); s++; } }
0 0 { char *s = tokens[1]; char *e = s + token_lens[1]; while (s < e) { if (*s < '0' || *s > '9') return -1; self->msg_hdr_len = self->msg_hdr_len * 10 + (*s - '0'); s++; } }
0 0 { char *s = tokens[1]; char *e = s + token_lens[1]; while (s < e) { if (*s < '0' || *s > '9') return -1; self->msg_hdr_len = self->msg_hdr_len * 10 + (*s - '0'); s++; } }
1088 0 0 { char *s = tokens[2]; char *e = s + token_lens[2]; while (s < e) { if (*s < '0' || *s > '9') return -1; self->msg_total_len = self->msg_total_len * 10 + (*s - '0'); s++; } }
0 0 { char *s = tokens[2]; char *e = s + token_lens[2]; while (s < e) { if (*s < '0' || *s > '9') return -1; self->msg_total_len = self->msg_total_len * 10 + (*s - '0'); s++; } }
0 0 { char *s = tokens[2]; char *e = s + token_lens[2]; while (s < e) { if (*s < '0' || *s > '9') return -1; self->msg_total_len = self->msg_total_len * 10 + (*s - '0'); s++; } }
1100 0 0 if (!sub) return;
1110 0 0 if (sub->cb) {
1113 0 0 PUSHMARK(SP);
1114 0 0 EXTEND(SP, 4);
1118 0 0 if (self->msg_type == MSG_TYPE_HMSG && self->msg_hdr_len > 0 && self->msg_hdr_len <= len) {
0 0 if (self->msg_type == MSG_TYPE_HMSG && self->msg_hdr_len > 0 && self->msg_hdr_len <= len) {
0 0 if (self->msg_type == MSG_TYPE_HMSG && self->msg_hdr_len > 0 && self->msg_hdr_len <= len) {
1124 0 0 if (self->msg_reply_len > 0) {
1130 0 0 if (self->msg_type == MSG_TYPE_HMSG && self->msg_hdr_len > 0) {
0 0 if (self->msg_type == MSG_TYPE_HMSG && self->msg_hdr_len > 0) {
1136 0 0 FREETMPS; LEAVE;
1139 0 0 if (max_msgs > 0 && received >= max_msgs) {
0 0 if (max_msgs > 0 && received >= max_msgs) {
1141 0 0 if (sub)
1151 0 0 if (subject_len <= pfx_len || memcmp(subject, self->inbox_prefix, pfx_len) != 0)
0 0 if (subject_len <= pfx_len || memcmp(subject, self->inbox_prefix, pfx_len) != 0)
1158 0 0 for (i = 0; i < id_len; i++) {
1159 0 0 if (id_str[i] < '0' || id_str[i] > '9') return;
0 0 if (id_str[i] < '0' || id_str[i] > '9') return;
1164 0 0 ngx_queue_foreach(q, &self->req_queue) {
0 0 ngx_queue_foreach(q, &self->req_queue) {
1166 0 0 if (req->req_id == req_id) {
1168 0 0 if (req->timer_active) {
1172 0 0 if (req->cb) {
1174 0 0 int is_no_responders = (headers && headers_len >= 12 &&
0 0 int is_no_responders = (headers && headers_len >= 12 &&
1175 0 0 memcmp(headers, "NATS/1.0 503", 12) == 0);
1177 0 0 PUSHMARK(SP);
1178 0 0 EXTEND(SP, 3);
1179 0 0 if (is_no_responders) {
1185 0 0 if (headers && headers_len > 0)
0 0 if (headers && headers_len > 0)
1190 0 0 FREETMPS; LEAVE;
1201 0 0 if (len > 0 && line[len-1] == '\r')
0 0 if (len > 0 && line[len-1] == '\r')
1204 0 0 if (len == 0) return;
1207 0 0 if (len > 5 && (line[0] == 'I' || line[0] == 'i') &&
0 0 if (len > 5 && (line[0] == 'I' || line[0] == 'i') &&
0 0 if (len > 5 && (line[0] == 'I' || line[0] == 'i') &&
1208 0 0 (line[1] == 'N' || line[1] == 'n') &&
0 0 (line[1] == 'N' || line[1] == 'n') &&
1209 0 0 (line[2] == 'F' || line[2] == 'f') &&
0 0 (line[2] == 'F' || line[2] == 'f') &&
1210 0 0 (line[3] == 'O' || line[3] == 'o') &&
0 0 (line[3] == 'O' || line[3] == 'o') &&
1211 0 0 line[4] == ' ') {
1213 0 0 CLEAR_HANDLER(self->server_info_json);
1222 0 0 if (found && found < e) {
0 0 if (found && found < e) {
1225 0 0 while (found < e && *found >= '0' && *found <= '9') {
0 0 while (found < e && *found >= '0' && *found <= '9') {
0 0 while (found < e && *found >= '0' && *found <= '9') {
1235 0 0 if (found && found < e) {
0 0 if (found && found < e) {
1238 0 0 if (nend) {
1240 0 0 if (self->server_nonce) Safefree(self->server_nonce);
1249 0 0 if (found && found < e) {
0 0 if (found && found < e) {
1251 0 0 while (found < e && (*found == ' ' || *found == '\t')) found++;
0 0 while (found < e && (*found == ' ' || *found == '\t')) found++;
0 0 while (found < e && (*found == ' ' || *found == '\t')) found++;
1252 0 0 if (found < e && *found == 't') {
0 0 if (found < e && *found == 't') {
1253 0 0 if (!self->ldm) {
1255 0 0 if (self->on_ldm) {
1258 0 0 PUSHMARK(SP);
1261 0 0 FREETMPS; LEAVE;
1268 0 0 if (self->connecting) {
1276 0 0 if (len >= 4 && line[0] == 'M' && line[1] == 'S' && line[2] == 'G' && line[3] == ' ') {
0 0 if (len >= 4 && line[0] == 'M' && line[1] == 'S' && line[2] == 'G' && line[3] == ' ') {
0 0 if (len >= 4 && line[0] == 'M' && line[1] == 'S' && line[2] == 'G' && line[3] == ' ') {
0 0 if (len >= 4 && line[0] == 'M' && line[1] == 'S' && line[2] == 'G' && line[3] == ' ') {
0 0 if (len >= 4 && line[0] == 'M' && line[1] == 'S' && line[2] == 'G' && line[3] == ' ') {
1277 0 0 if (nats_parse_msg_args(self, line, len) == 0) {
1278 0 0 if (self->msg_total_len > (size_t)self->max_payload) {
1289 0 0 if (len >= 5 && line[0] == 'H' && line[1] == 'M' && line[2] == 'S' && line[3] == 'G' && line[4] == ' ') {
0 0 if (len >= 5 && line[0] == 'H' && line[1] == 'M' && line[2] == 'S' && line[3] == 'G' && line[4] == ' ') {
0 0 if (len >= 5 && line[0] == 'H' && line[1] == 'M' && line[2] == 'S' && line[3] == 'G' && line[4] == ' ') {
0 0 if (len >= 5 && line[0] == 'H' && line[1] == 'M' && line[2] == 'S' && line[3] == 'G' && line[4] == ' ') {
0 0 if (len >= 5 && line[0] == 'H' && line[1] == 'M' && line[2] == 'S' && line[3] == 'G' && line[4] == ' ') {
0 0 if (len >= 5 && line[0] == 'H' && line[1] == 'M' && line[2] == 'S' && line[3] == 'G' && line[4] == ' ') {
1290 0 0 if (nats_parse_hmsg_args(self, line, len) == 0) {
1291 0 0 if (self->msg_total_len > (size_t)self->max_payload) {
1302 0 0 if (len == 4 && line[0] == 'P' && line[1] == 'I' && line[2] == 'N' && line[3] == 'G') {
0 0 if (len == 4 && line[0] == 'P' && line[1] == 'I' && line[2] == 'N' && line[3] == 'G') {
0 0 if (len == 4 && line[0] == 'P' && line[1] == 'I' && line[2] == 'N' && line[3] == 'G') {
0 0 if (len == 4 && line[0] == 'P' && line[1] == 'I' && line[2] == 'N' && line[3] == 'G') {
0 0 if (len == 4 && line[0] == 'P' && line[1] == 'I' && line[2] == 'N' && line[3] == 'G') {
1309 0 0 if (len == 4 && line[0] == 'P' && line[1] == 'O' && line[2] == 'N' && line[3] == 'G') {
0 0 if (len == 4 && line[0] == 'P' && line[1] == 'O' && line[2] == 'N' && line[3] == 'G') {
0 0 if (len == 4 && line[0] == 'P' && line[1] == 'O' && line[2] == 'N' && line[3] == 'G') {
0 0 if (len == 4 && line[0] == 'P' && line[1] == 'O' && line[2] == 'N' && line[3] == 'G') {
0 0 if (len == 4 && line[0] == 'P' && line[1] == 'O' && line[2] == 'N' && line[3] == 'G') {
1310 0 0 if (self->pings_outstanding > 0)
1313 0 0 if (self->connecting && !self->connected) {
0 0 if (self->connecting && !self->connected) {
1318 0 0 if (self->connect_timer_active) {
1325 0 0 if (self->on_connect) {
1328 0 0 PUSHMARK(SP);
1331 0 0 FREETMPS; LEAVE;
1338 0 0 if (!ngx_queue_empty(&self->pong_cbs)) {
1342 0 0 if (pcb->cb) {
1345 0 0 PUSHMARK(SP);
1348 0 0 FREETMPS; LEAVE;
1350 0 0 } else if (self->draining) {
1353 0 0 if (self->drain_cb) {
1356 0 0 PUSHMARK(SP);
1359 0 0 FREETMPS; LEAVE;
1360 0 0 CLEAR_HANDLER(self->drain_cb);
1374 0 0 if (len >= 3 && line[0] == '+' && line[1] == 'O' && line[2] == 'K') {
0 0 if (len >= 3 && line[0] == '+' && line[1] == 'O' && line[2] == 'K') {
0 0 if (len >= 3 && line[0] == '+' && line[1] == 'O' && line[2] == 'K') {
0 0 if (len >= 3 && line[0] == '+' && line[1] == 'O' && line[2] == 'K') {
1379 0 0 if (len >= 4 && line[0] == '-' && line[1] == 'E' && line[2] == 'R' && line[3] == 'R') {
0 0 if (len >= 4 && line[0] == '-' && line[1] == 'E' && line[2] == 'R' && line[3] == 'R') {
0 0 if (len >= 4 && line[0] == '-' && line[1] == 'E' && line[2] == 'R' && line[3] == 'R') {
0 0 if (len >= 4 && line[0] == '-' && line[1] == 'E' && line[2] == 'R' && line[3] == 'R') {
0 0 if (len >= 4 && line[0] == '-' && line[1] == 'E' && line[2] == 'R' && line[3] == 'R') {
1382 0 0 while (mlen > 0 && (*msg == ' ' || *msg == '\'')) { msg++; mlen--; }
0 0 while (mlen > 0 && (*msg == ' ' || *msg == '\'')) { msg++; mlen--; }
0 0 while (mlen > 0 && (*msg == ' ' || *msg == '\'')) { msg++; mlen--; }
1383 0 0 while (mlen > 0 && msg[mlen-1] == '\'') mlen--;
0 0 while (mlen > 0 && msg[mlen-1] == '\'') mlen--;
1388 0 0 if (strstr(errbuf, "authorization") || strstr(errbuf, "authentication")) {
0 0 if (strstr(errbuf, "authorization") || strstr(errbuf, "authentication")) {
1407 0 0 if (self->ssl_handshaking) {
1409 0 0 if (hret == 0) return;
1410 0 0 if (hret < 0) {
1416 0 0 if (self->writing) {
1428 0 0 if (n <= 0) {
1429 0 0 if (n == 0) {
1431 0 0 } else if (errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) {
0 0 } else if (errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) {
0 0 } else if (errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) {
1442 0 0 while (consumed < self->rbuf_len) {
1443 0 0 if (self->fd < 0) break;
1445 0 0 if (self->parse_state == PARSE_OP) {
1448 0 0 if (!nl) break;
1454 0 0 } else if (self->parse_state == PARSE_MSG_BODY) {
1457 0 0 if (avail < need) break;
1462 0 0 if (self->inbox_sub_sid && self->msg_sid == self->inbox_sub_sid) {
0 0 if (self->inbox_sub_sid && self->msg_sid == self->inbox_sub_sid) {
1468 0 0 if (self->msg_type == MSG_TYPE_HMSG && self->msg_hdr_len > 0 &&
0 0 if (self->msg_type == MSG_TYPE_HMSG && self->msg_hdr_len > 0 &&
1469 0 0 self->msg_hdr_len <= self->msg_total_len) {
1489 0 0 if (self->fd >= 0 && consumed > 0 && consumed <= self->rbuf_len) {
0 0 if (self->fd >= 0 && consumed > 0 && consumed <= self->rbuf_len) {
0 0 if (self->fd >= 0 && consumed > 0 && consumed <= self->rbuf_len) {
1491 0 0 if (self->rbuf_len > 0)
1498 0 0 if (self->fd < 0) return;
1500 0 0 while (self->wbuf_off < self->wbuf_len) {
1503 0 0 if (n <= 0) {
1504 0 0 if (n == 0 || errno == EAGAIN || errno == EWOULDBLOCK) {
0 0 if (n == 0 || errno == EAGAIN || errno == EWOULDBLOCK) {
0 0 if (n == 0 || errno == EAGAIN || errno == EWOULDBLOCK) {
1505 0 0 if (!self->writing) {
1511 0 0 if (errno == EINTR)
1523 0 0 if (self->writing) {
1534 0 0 if (self->connecting && !self->connected) {
0 0 if (self->connecting && !self->connected) {
1538 0 0 if (err) {
1544 0 0 if (self->tls && !self->ssl) {
0 0 if (self->tls && !self->ssl) {
1545 0 0 if (nats_ssl_setup(self) != 0) {
1552 0 0 if (self->ssl_handshaking) {
1554 0 0 if (hret == 0) return; /* want read/write */
1555 0 0 if (hret < 0) {
1563 0 0 if (self->writing) {
1567 0 0 if (!self->reading) {
1597 0 0 if (self->server_pool_count > 0)
1604 0 0 if (self->ping_interval_ms <= 0) return;
1617 0 0 if (self->max_pings_outstanding > 0 &&
1618 0 0 self->pings_outstanding > self->max_pings_outstanding) {
1637 0 0 if (self->wbuf_dirty && self->fd >= 0) {
0 0 if (self->wbuf_dirty && self->fd >= 0) {
1645 0 0 if (self->batch_mode) return; /* writes batched, flushed on batch end */
1648 0 0 if (!self->prepare_active) {
1654 0 0 if (self->slow_consumer_bytes > 0 &&
1655 0 0 (self->wbuf_len - self->wbuf_off) > self->slow_consumer_bytes) {
1656 0 0 if (self->on_slow_consumer) {
1659 0 0 PUSHMARK(SP);
1660 0 0 EXTEND(SP, 1);
1664 0 0 FREETMPS; LEAVE;
1681 0 0 if (req->cb) {
1684 0 0 PUSHMARK(SP);
1685 0 0 EXTEND(SP, 2);
1690 0 0 FREETMPS; LEAVE;
1702 0 0 while (!ngx_queue_empty(&self->wait_queue)) {
1713 0 0 if (self->wbuf_len > self->wbuf_off)
1719 0 0 if (self->connected) {
1722 0 0 } else if (self->connecting ||
1723 0 0 (self->reconnect_enabled && self->reconnect_timer_active)) {
0 0 (self->reconnect_enabled && self->reconnect_timer_active)) {
1742 0 0 if (!self->intentional_disconnect && self->reconnect_enabled) {
0 0 if (!self->intentional_disconnect && self->reconnect_enabled) {
1743 0 0 if (self->max_reconnect_attempts == 0 ||
1744 0 0 self->reconnect_attempts < self->max_reconnect_attempts) {
1748 0 0 if (self->max_reconnect_delay_ms > 0 && delay > self->max_reconnect_delay_ms)
0 0 if (self->max_reconnect_delay_ms > 0 && delay > self->max_reconnect_delay_ms)
1763 0 0 while (!ngx_queue_empty(&self->server_pool)) {
1777 0 0 if (!p || p >= json + len) return;
0 0 if (!p || p >= json + len) return;
1784 0 0 while (p < end && *p != ']') {
0 0 while (p < end && *p != ']') {
1785 0 0 while (p < end && (*p == ' ' || *p == ',' || *p == '"')) p++;
0 0 while (p < end && (*p == ' ' || *p == ',' || *p == '"')) p++;
0 0 while (p < end && (*p == ' ' || *p == ',' || *p == '"')) p++;
0 0 while (p < end && (*p == ' ' || *p == ',' || *p == '"')) p++;
1786 0 0 if (p >= end || *p == ']') break;
0 0 if (p >= end || *p == ']') break;
1789 0 0 while (p < end && *p != '"' && *p != ',' && *p != ']') p++;
0 0 while (p < end && *p != '"' && *p != ',' && *p != ']') p++;
0 0 while (p < end && *p != '"' && *p != ',' && *p != ']') p++;
0 0 while (p < end && *p != '"' && *p != ',' && *p != ']') p++;
1792 0 0 if (url_len > 0) {
1796 0 0 for (s = start + url_len - 1; s >= start; s--) {
1797 0 0 if (*s == ':') { colon = s; break; }
1802 0 0 if (colon && colon > start) {
0 0 if (colon && colon > start) {
1809 0 0 while (dp < start + url_len && *dp >= '0' && *dp <= '9') {
0 0 while (dp < start + url_len && *dp >= '0' && *dp <= '9') {
0 0 while (dp < start + url_len && *dp >= '0' && *dp <= '9') {
1823 0 0 while (p < end && *p == '"') p++;
0 0 while (p < end && *p == '"') p++;
1829 0 0 if (ngx_queue_empty(&self->server_pool)) return;
1835 0 0 if (self->host) Safefree(self->host);
1859 0 0 if (rv != 0) {
1865 0 0 for (rp = res; rp != NULL; rp = rp->ai_next) {
1867 0 0 if (fd < 0) continue;
1874 0 0 if (self->keepalive > 0) {
1882 0 0 if (rv == 0 || errno == EINPROGRESS) break;
0 0 if (rv == 0 || errno == EINPROGRESS) break;
1890 0 0 if (fd < 0) {
1903 0 0 if (self->priority) {
1911 0 0 if (rv != 0) {
1916 0 0 if (self->connect_timeout_ms > 0) {
1928 0 0 if (!self->path || strlen(self->path) >= sizeof(addr.sun_path)) {
0 0 if (!self->path || strlen(self->path) >= sizeof(addr.sun_path)) {
1934 0 0 if (fd < 0) {
1947 0 0 if (rv != 0 && errno != EINPROGRESS) {
0 0 if (rv != 0 && errno != EINPROGRESS) {
1961 0 0 if (self->priority) {
1969 0 0 if (rv != 0) {
1974 0 0 if (self->connect_timeout_ms > 0) {
1984 0 0 if (self->path)
2000 0 14 I_EV_API("EV::Nats");
14 0 I_EV_API("EV::Nats");
0 14 I_EV_API("EV::Nats");
2040 0 0 if (items > 1 && (items - 1) % 2 == 0) {
0 0 if (items > 1 && (items - 1) % 2 == 0) {
2041 0 0 for (i = 1; i < items; i += 2) {
2045 0 0 if (strcmp(key, "host") == 0) {
2051 0 0 else if (strcmp(key, "port") == 0)
2053 0 0 else if (strcmp(key, "path") == 0) {
2059 0 0 else if (strcmp(key, "on_error") == 0)
2061 0 0 else if (strcmp(key, "on_connect") == 0)
2063 0 0 else if (strcmp(key, "on_disconnect") == 0)
2065 0 0 else if (strcmp(key, "user") == 0) {
2070 0 0 else if (strcmp(key, "pass") == 0) {
2075 0 0 else if (strcmp(key, "token") == 0) {
2080 0 0 else if (strcmp(key, "name") == 0) {
2085 0 0 else if (strcmp(key, "verbose") == 0)
2087 0 0 else if (strcmp(key, "pedantic") == 0)
2089 0 0 else if (strcmp(key, "echo") == 0)
2091 0 0 else if (strcmp(key, "no_responders") == 0)
2093 0 0 else if (strcmp(key, "reconnect") == 0)
2095 0 0 else if (strcmp(key, "reconnect_delay") == 0)
2097 0 0 else if (strcmp(key, "max_reconnect_attempts") == 0)
2099 0 0 else if (strcmp(key, "max_reconnect_delay") == 0)
2101 0 0 else if (strcmp(key, "connect_timeout") == 0)
2103 0 0 else if (strcmp(key, "ping_interval") == 0)
2105 0 0 else if (strcmp(key, "max_pings_outstanding") == 0)
2107 0 0 else if (strcmp(key, "priority") == 0)
2109 0 0 else if (strcmp(key, "keepalive") == 0)
2111 0 0 #ifdef HAVE_OPENSSL
2113 0 0 self->tls = SvTRUE(val) ? 1 : 0;
2118 0 0 }
2120 0 0 self->tls_skip_verify = SvTRUE(val) ? 1 : 0;
2125 0 0 }
2130 0 0 Copy(s, self->jwt, l + 1, char);
2132 0 0 else if (strcmp(key, "slow_consumer_bytes") == 0)
2134 0 0 else if (strcmp(key, "on_slow_consumer") == 0)
2136 0 0 else if (strcmp(key, "on_lame_duck") == 0)
2143 0 0 }
0 0 }
2158 0 0 if (self->connected || self->connecting)
0 0 if (self->connected || self->connecting)
2161 0 0 if (self->reconnect_timer_active) {
2166 0 0 if (self->host) { Safefree(self->host); self->host = NULL; }
2167 0 0 if (self->path) { Safefree(self->path); self->path = NULL; }
2181 0 0 if (self->connected || self->connecting)
0 0 if (self->connected || self->connecting)
2184 0 0 if (self->reconnect_timer_active) {
2189 0 0 if (self->host) { Safefree(self->host); self->host = NULL; }
2190 0 0 if (self->path) Safefree(self->path);
2203 0 0 if (self->reconnect_timer_active) {
2214 0 0 RETVAL = self->connected;
2229 0 0 NATS_CROAK_UNLESS_CONNECTED(self);
0 0 NATS_CROAK_UNLESS_CONNECTED(self);
0 0 NATS_CROAK_UNLESS_CONNECTED(self);
0 0 NATS_CROAK_UNLESS_CONNECTED(self);
2233 0 0 if (SvOK(payload)) {
2240 0 0 if ((int)pay_len > self->max_payload)
2244 0 0 if (reply && *reply) {
0 0 if (reply && *reply) {
2255 0 0 if (self->connected) {
2257 0 0 if (pay_len > 0) wbuf_append(self, pay_pv, pay_len);
2265 0 0 if (pay_len > 0) memcpy(buf + hdr_len, pay_pv, pay_len);
2284 0 0 NATS_CROAK_UNLESS_CONNECTED(self);
0 0 NATS_CROAK_UNLESS_CONNECTED(self);
0 0 NATS_CROAK_UNLESS_CONNECTED(self);
0 0 NATS_CROAK_UNLESS_CONNECTED(self);
2289 0 0 if (SvOK(payload)) {
2297 0 0 if ((int)total_size > self->max_payload)
2304 0 0 if (reply && *reply) {
0 0 if (reply && *reply) {
2314 0 0 if (self->connected) {
2317 0 0 if (pay_len > 0) wbuf_append(self, pay_pv, pay_len);
2326 0 0 if (pay_len > 0) memcpy(buf + cmd_len + hdr_data_len, pay_pv, pay_len);
2351 0 0 sub->queue_group = (queue_group && *queue_group) ? newSVpv(queue_group, 0) : NULL;
0 0 sub->queue_group = (queue_group && *queue_group) ? newSVpv(queue_group, 0) : NULL;
2357 0 0 if (queue_group && *queue_group) {
0 0 if (queue_group && *queue_group) {
2365 0 0 if (self->connected || self->connecting)
0 0 if (self->connected || self->connecting)
2368 0 0 RETVAL = (UV)sub->sid;
2380 0 0 if (max_msgs > 0) {
2382 0 0 if (sub)
2386 0 0 if (self->connected)
2391 0 0 if (self->connected)
2393 0 0 if (sub)
2410 0 0 NATS_CROAK_UNLESS_CONNECTED(self);
0 0 NATS_CROAK_UNLESS_CONNECTED(self);
0 0 NATS_CROAK_UNLESS_CONNECTED(self);
0 0 NATS_CROAK_UNLESS_CONNECTED(self);
2413 0 0 if (SvOK(payload)) {
2420 0 0 if (!self->inbox_sub_sid) {
2449 0 0 if (timeout_ms > 0) {
2462 0 0 if (pay_len > 0) memcpy(buf + hdr_len, pay_pv, pay_len);
2473 0 0 NATS_CROAK_UNLESS_CONNECTED(self);
0 0 NATS_CROAK_UNLESS_CONNECTED(self);
0 0 NATS_CROAK_UNLESS_CONNECTED(self);
0 0 NATS_CROAK_UNLESS_CONNECTED(self);
2482 0 0 if (!self->connected)
2486 0 0 if (cb && SvOK(cb)) {
0 0 if (cb && SvOK(cb)) {
2497 0 0 if (self->server_info_json)
2508 0 0 if (items > 1)
2510 0 0 RETVAL = self->max_payload;
2518 0 0 RETVAL = self->waiting_count;
2543 0 0 RETVAL = self->reconnect_enabled;
2551 0 0 if (items > 1)
2553 0 0 RETVAL = self->connect_timeout_ms;
2561 0 0 if (items > 1)
2563 0 0 RETVAL = self->ping_interval_ms;
2571 0 0 if (items > 1)
2573 0 0 RETVAL = self->max_pings_outstanding;
2581 0 0 if (items > 1)
2583 0 0 RETVAL = self->priority;
2591 0 0 if (items > 1)
2593 0 0 RETVAL = self->keepalive;
2601 0 0 if (items > 1) {
2602 0 0 CLEAR_HANDLER(self->on_error);
2603 0 0 if (SvOK(ST(1)))
2606 0 0 if (GIMME_V != G_VOID && self->on_error)
0 0 if (GIMME_V != G_VOID && self->on_error)
2613 0 0 if (items > 1) {
2614 0 0 CLEAR_HANDLER(self->on_connect);
2615 0 0 if (SvOK(ST(1)))
2618 0 0 if (GIMME_V != G_VOID && self->on_connect)
0 0 if (GIMME_V != G_VOID && self->on_connect)
2625 0 0 if (items > 1) {
2626 0 0 CLEAR_HANDLER(self->on_disconnect);
2627 0 0 if (SvOK(ST(1)))
2630 0 0 if (GIMME_V != G_VOID && self->on_disconnect)
0 0 if (GIMME_V != G_VOID && self->on_disconnect)
2644 0 0 if (self->tls_ca_file) { Safefree(self->tls_ca_file); self->tls_ca_file = NULL; }
2645 0 0 if (ca_file && *ca_file) {
0 0 if (ca_file && *ca_file) {
2657 0 0 EXTEND(SP, 8);
2692 0 0 RETVAL = HvKEYS(self->sub_map);
0 0 RETVAL = HvKEYS(self->sub_map);
2705 0 0 PUSHMARK(SP);
2708 0 0 FREETMPS; LEAVE;
2711 0 0 if (self->wbuf_len > self->wbuf_off) {
2713 0 0 if (!self->prepare_active) {
2718 0 0 if (self->slow_consumer_bytes > 0 &&
2719 0 0 (self->wbuf_len - self->wbuf_off) > self->slow_consumer_bytes &&
2720 0 0 self->on_slow_consumer) {
2723 0 0 PUSHMARK(SP);
2724 0 0 EXTEND(SP, 1);
2728 0 0 FREETMPS; LEAVE;
2739 0 0 CLEAR_HANDLER(self->on_slow_consumer);
2740 0 0 if (cb && SvOK(cb))
0 0 if (cb && SvOK(cb))
2747 0 0 if (items > 1) {
2748 0 0 CLEAR_HANDLER(self->on_ldm);
2749 0 0 if (SvOK(ST(1)))
2752 0 0 if (GIMME_V != G_VOID && self->on_ldm)
0 0 if (GIMME_V != G_VOID && self->on_ldm)
2762 0 0 if (self->nkey_seed) Safefree(self->nkey_seed);
2774 0 0 if (self->jwt) Safefree(self->jwt);
2784 0 0 if (!self->connected || self->draining)
0 0 if (!self->connected || self->draining)
2787 0 0 CLEAR_HANDLER(self->drain_cb);
2788 0 0 if (cb && SvOK(cb))
0 0 if (cb && SvOK(cb))
2795 0 0 ngx_queue_foreach(q, &self->subs) {
0 0 ngx_queue_foreach(q, &self->subs) {
2817 0 0 if (self->magic != NATS_MAGIC_ALIVE)
2823 0 0 if (PL_dirty) {
2824 0 0 if (self->fd >= 0)
2829 0 0 CLEAR_HANDLER(self->on_error);
2830 0 0 CLEAR_HANDLER(self->on_connect);
2831 0 0 CLEAR_HANDLER(self->on_disconnect);
2836 0 0 while (!ngx_queue_empty(&self->req_queue)) {
2840 0 0 if (req->timer_active)
2842 0 0 CLEAR_HANDLER(req->cb);
2847 0 0 while (!ngx_queue_empty(&self->pong_cbs)) {
2851 0 0 CLEAR_HANDLER(pcb->cb);
2855 0 0 while (!ngx_queue_empty(&self->subs)) {
2861 0 0 if (self->fd >= 0) {
2866 0 0 CLEAR_HANDLER(self->server_info_json);
2867 0 0 CLEAR_HANDLER(self->drain_cb);
2870 0 0 if (self->sub_map) {
2876 0 0 nats_ssl_cleanup(self);
2877 0 0 if (self->ssl_ctx) { SSL_CTX_free(self->ssl_ctx); self->ssl_ctx = NULL; }
2879 0 0 #endif
2880 0 0
2881 0 0 if (self->rbuf) Safefree(self->rbuf);
2882 0 0 if (self->wbuf) Safefree(self->wbuf);
2883 0 0 if (self->host) Safefree(self->host);
2884 0 0 if (self->path) Safefree(self->path);
2885 0 0 if (self->user) Safefree(self->user);
2886 0 0 if (self->pass) Safefree(self->pass);
2887 0 0 if (self->token) Safefree(self->token);
2888 0 0 if (self->name) Safefree(self->name);
2889 0 0 if (self->nkey_seed) Safefree(self->nkey_seed);
2890 0 0 if (self->jwt) Safefree(self->jwt);
2891 0 0 if (self->server_nonce) Safefree(self->server_nonce);