File Coverage

amqp_tcp_socket.c
Criterion Covered Total %
statement 33 82 40.2
branch 6 26 23.0
condition n/a
subroutine n/a
pod n/a
total 39 108 36.1


line stmt bran cond sub pod time code
1             // Copyright 2007 - 2021, Alan Antonuk and the rabbitmq-c contributors.
2             // SPDX-License-Identifier: mit
3              
4             #ifdef HAVE_CONFIG_H
5             #include "config.h"
6             #endif
7              
8             #include "amqp_private.h"
9             #include "rabbitmq-c/tcp_socket.h"
10              
11             #include
12             #if ((defined(_WIN32)) || (defined(__MINGW32__)) || (defined(__MINGW64__)))
13             #ifndef WIN32_LEAN_AND_MEAN
14             #define WIN32_LEAN_AND_MEAN
15             #endif
16             #include
17             #else
18             #include
19             #include
20             #include
21             #endif
22             #include
23             #include
24              
25             struct amqp_tcp_socket_t {
26             const struct amqp_socket_class_t *klass;
27             int sockfd;
28             int internal_error;
29             int state;
30             };
31              
32 1           static ssize_t amqp_tcp_socket_send(void *base, const void *buf, size_t len,
33             int flags) {
34 1           struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base;
35             ssize_t res;
36 1           int flagz = 0;
37              
38 1 50         if (-1 == self->sockfd) {
39 1           return AMQP_STATUS_SOCKET_CLOSED;
40             }
41              
42             #ifdef MSG_NOSIGNAL
43 0           flagz |= MSG_NOSIGNAL;
44             #endif
45              
46             #if defined(MSG_MORE)
47 0 0         if (flags & AMQP_SF_MORE) {
48 0           flagz |= MSG_MORE;
49             }
50             /* Cygwin defines TCP_NOPUSH, but trying to use it will return not
51             * implemented. Disable it here. */
52             #elif defined(TCP_NOPUSH) && !defined(__CYGWIN__)
53             if (flags & AMQP_SF_MORE && !(self->state & AMQP_SF_MORE)) {
54             int one = 1;
55             res = setsockopt(self->sockfd, IPPROTO_TCP, TCP_NOPUSH, &one, sizeof(one));
56             if (0 != res) {
57             self->internal_error = res;
58             return AMQP_STATUS_SOCKET_ERROR;
59             }
60             self->state |= AMQP_SF_MORE;
61             } else if (!(flags & AMQP_SF_MORE) && self->state & AMQP_SF_MORE) {
62             int zero = 0;
63             res =
64             setsockopt(self->sockfd, IPPROTO_TCP, TCP_NOPUSH, &zero, sizeof(&zero));
65             if (0 != res) {
66             self->internal_error = res;
67             res = AMQP_STATUS_SOCKET_ERROR;
68             } else {
69             self->state &= ~AMQP_SF_MORE;
70             }
71             }
72             #endif
73              
74 0           start:
75             #ifdef _WIN32
76             res = send(self->sockfd, buf, (int)len, flagz);
77             #else
78 0           res = send(self->sockfd, buf, len, flagz);
79             #endif
80              
81 0 0         if (res < 0) {
82 0           self->internal_error = amqp_os_socket_error();
83 0           switch (self->internal_error) {
84 0           case EINTR:
85 0           goto start;
86             #ifdef _WIN32
87             case WSAEWOULDBLOCK:
88             #else
89 0           case EWOULDBLOCK:
90             #endif
91             #if defined(EAGAIN) && EAGAIN != EWOULDBLOCK
92             case EAGAIN:
93             #endif
94 0           res = AMQP_PRIVATE_STATUS_SOCKET_NEEDWRITE;
95 0           break;
96 0           default:
97 0           res = AMQP_STATUS_SOCKET_ERROR;
98             }
99             } else {
100 0           self->internal_error = 0;
101             }
102              
103 0           return res;
104             }
105              
106 0           static ssize_t amqp_tcp_socket_recv(void *base, void *buf, size_t len,
107             int flags) {
108 0           struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base;
109             ssize_t ret;
110 0 0         if (-1 == self->sockfd) {
111 0           return AMQP_STATUS_SOCKET_CLOSED;
112             }
113              
114 0           start:
115             #ifdef _WIN32
116             ret = recv(self->sockfd, buf, (int)len, flags);
117             #else
118 0           ret = recv(self->sockfd, buf, len, flags);
119             #endif
120              
121 0 0         if (0 > ret) {
122 0           self->internal_error = amqp_os_socket_error();
123 0           switch (self->internal_error) {
124 0           case EINTR:
125 0           goto start;
126             #ifdef _WIN32
127             case WSAEWOULDBLOCK:
128             #else
129 0           case EWOULDBLOCK:
130             #endif
131             #if defined(EAGAIN) && EAGAIN != EWOULDBLOCK
132             case EAGAIN:
133             #endif
134 0           ret = AMQP_PRIVATE_STATUS_SOCKET_NEEDREAD;
135 0           break;
136 0           default:
137 0           ret = AMQP_STATUS_SOCKET_ERROR;
138             }
139 0 0         } else if (0 == ret) {
140 0           ret = AMQP_STATUS_CONNECTION_CLOSED;
141             }
142              
143 0           return ret;
144             }
145              
146 1           static int amqp_tcp_socket_open(void *base, const char *host, int port,
147             const struct timeval *timeout) {
148 1           struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base;
149 1 50         if (-1 != self->sockfd) {
150 0           return AMQP_STATUS_SOCKET_INUSE;
151             }
152 1           self->sockfd = amqp_open_socket_noblock(host, port, timeout);
153 1 50         if (0 > self->sockfd) {
154 1           int err = self->sockfd;
155 1           self->sockfd = -1;
156 1           return err;
157             }
158 0           return AMQP_STATUS_OK;
159             }
160              
161 1           static int amqp_tcp_socket_close(void *base,
162             AMQP_UNUSED amqp_socket_close_enum force) {
163 1           struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base;
164 1 50         if (-1 == self->sockfd) {
165 1           return AMQP_STATUS_SOCKET_CLOSED;
166             }
167              
168 0 0         if (amqp_os_socket_close(self->sockfd)) {
169 0           return AMQP_STATUS_SOCKET_ERROR;
170             }
171 0           self->sockfd = -1;
172              
173 0           return AMQP_STATUS_OK;
174             }
175              
176 1           static int amqp_tcp_socket_get_sockfd(void *base) {
177 1           struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base;
178 1           return self->sockfd;
179             }
180              
181 1           static void amqp_tcp_socket_delete(void *base) {
182 1           struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base;
183              
184 1 50         if (self) {
185 1           amqp_tcp_socket_close(self, AMQP_SC_NONE);
186 1           free(self);
187             }
188 1           }
189              
190             static const struct amqp_socket_class_t amqp_tcp_socket_class = {
191             amqp_tcp_socket_send, /* send */
192             amqp_tcp_socket_recv, /* recv */
193             amqp_tcp_socket_open, /* open */
194             amqp_tcp_socket_close, /* close */
195             amqp_tcp_socket_get_sockfd, /* get_sockfd */
196             amqp_tcp_socket_delete /* delete */
197             };
198              
199 1           amqp_socket_t *amqp_tcp_socket_new(amqp_connection_state_t state) {
200 1           struct amqp_tcp_socket_t *self = calloc(1, sizeof(*self));
201 1 50         if (!self) {
202 0           return NULL;
203             }
204 1           self->klass = &amqp_tcp_socket_class;
205 1           self->sockfd = -1;
206              
207 1           amqp_set_socket(state, (amqp_socket_t *)self);
208              
209 1           return (amqp_socket_t *)self;
210             }
211              
212 0           void amqp_tcp_socket_set_sockfd(amqp_socket_t *base, int sockfd) {
213             struct amqp_tcp_socket_t *self;
214 0 0         if (base->klass != &amqp_tcp_socket_class) {
215 0           amqp_abort("<%p> is not of type amqp_tcp_socket_t", base);
216             }
217 0           self = (struct amqp_tcp_socket_t *)base;
218 0           self->sockfd = sockfd;
219 0           }