File Coverage

amqp_tcp_socket.c
Criterion Covered Total %
statement 61 74 82.4
branch 18 26 69.2
condition n/a
subroutine n/a
pod n/a
total 79 100 79.0


line stmt bran cond sub pod time code
1             /*
2             * Copyright 2012-2013 Michael Steinert
3             *
4             * Permission is hereby granted, free of charge, to any person obtaining a
5             * copy of this software and associated documentation files (the "Software"),
6             * to deal in the Software without restriction, including without limitation
7             * the rights to use, copy, modify, merge, publish, distribute, sublicense,
8             * and/or sell copies of the Software, and to permit persons to whom the
9             * Software is furnished to do so, subject to the following conditions:
10             *
11             * The above copyright notice and this permission notice shall be included in
12             * all copies or substantial portions of the Software.
13             *
14             * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15             * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16             * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17             * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18             * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
19             * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
20             * DEALINGS IN THE SOFTWARE.
21             */
22              
23             #ifdef HAVE_CONFIG_H
24             #include "config.h"
25             #endif
26              
27             #include "amqp_private.h"
28             #include "amqp_tcp_socket.h"
29              
30             #include <errno.h>
31             #if ((defined(_WIN32)) || (defined(__MINGW32__)) || (defined(__MINGW64__)))
32             #ifndef WIN32_LEAN_AND_MEAN
33             #define WIN32_LEAN_AND_MEAN
34             #endif
35             #include <winsock2.h>
36             #else
37             #include <netinet/in.h>
38             #include <netinet/tcp.h>
39             #include <sys/socket.h>
40             #endif
41             #include <stdio.h>
42             #include <stdlib.h>
43              
44             struct amqp_tcp_socket_t {
45             const struct amqp_socket_class_t *klass;
46             int sockfd;
47             int internal_error;
48             int state;
49             };
50              
51 515           static ssize_t amqp_tcp_socket_send(void *base, const void *buf, size_t len,
52             int flags) {
53 515           struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base;
54             ssize_t res;
55 515           int flagz = 0;
56              
57 515 100         if (-1 == self->sockfd) {
58 3           return AMQP_STATUS_SOCKET_CLOSED;
59             }
60              
61             #ifdef MSG_NOSIGNAL
62 512           flagz |= MSG_NOSIGNAL;
63             #endif
64              
65             #if defined(MSG_MORE)
66 512 100         if (flags & AMQP_SF_MORE) {
67 74           flagz |= MSG_MORE;
68             }
69             /* Cygwin defines TCP_NOPUSH, but trying to use it will return not
70             * implemented. Disable it here. */
71             #elif defined(TCP_NOPUSH) && !defined(__CYGWIN__)
72             if (flags & AMQP_SF_MORE && !(self->state & AMQP_SF_MORE)) {
73             int one = 1;
74             res = setsockopt(self->sockfd, IPPROTO_TCP, TCP_NOPUSH, &one, sizeof(one));
75             if (0 != res) {
76             self->internal_error = res;
77             return AMQP_STATUS_SOCKET_ERROR;
78             }
79             self->state |= AMQP_SF_MORE;
80             } else if (!(flags & AMQP_SF_MORE) && self->state & AMQP_SF_MORE) {
81             int zero = 0;
82             res =
83             setsockopt(self->sockfd, IPPROTO_TCP, TCP_NOPUSH, &zero, sizeof(&zero));
84             if (0 != res) {
85             self->internal_error = res;
86             res = AMQP_STATUS_SOCKET_ERROR;
87             } else {
88             self->state &= ~AMQP_SF_MORE;
89             }
90             }
91             #endif
92              
93             start:
94             #ifdef _WIN32
95             res = send(self->sockfd, buf, (int)len, flagz);
96             #else
97 512           res = send(self->sockfd, buf, len, flagz);
98             #endif
99              
100 512 100         if (res < 0) {
101 6           self->internal_error = amqp_os_socket_error();
102 6           switch (self->internal_error) {
103             case EINTR:
104 0           goto start;
105             #ifdef _WIN32
106             case WSAEWOULDBLOCK:
107             #else
108             case EWOULDBLOCK:
109             #endif
110             #if defined(EAGAIN) && EAGAIN != EWOULDBLOCK
111             case EAGAIN:
112             #endif
113 5           res = AMQP_PRIVATE_STATUS_SOCKET_NEEDWRITE;
114 5           break;
115             default:
116 6           res = AMQP_STATUS_SOCKET_ERROR;
117             }
118             } else {
119 506           self->internal_error = 0;
120             }
121              
122 512           return res;
123             }
124              
125 757           static ssize_t amqp_tcp_socket_recv(void *base, void *buf, size_t len,
126             int flags) {
127 757           struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base;
128             ssize_t ret;
129 757 50         if (-1 == self->sockfd) {
130 0           return AMQP_STATUS_SOCKET_CLOSED;
131             }
132              
133             start:
134             #ifdef _WIN32
135             ret = recv(self->sockfd, buf, (int)len, flags);
136             #else
137 757           ret = recv(self->sockfd, buf, len, flags);
138             #endif
139              
140 757 100         if (0 > ret) {
141 378           self->internal_error = amqp_os_socket_error();
142 378           switch (self->internal_error) {
143             case EINTR:
144 0           goto start;
145             #ifdef _WIN32
146             case WSAEWOULDBLOCK:
147             #else
148             case EWOULDBLOCK:
149             #endif
150             #if defined(EAGAIN) && EAGAIN != EWOULDBLOCK
151             case EAGAIN:
152             #endif
153 378           ret = AMQP_PRIVATE_STATUS_SOCKET_NEEDREAD;
154 378           break;
155             default:
156 378           ret = AMQP_STATUS_SOCKET_ERROR;
157             }
158 379 50         } else if (0 == ret) {
159 0           ret = AMQP_STATUS_CONNECTION_CLOSED;
160             }
161              
162 757           return ret;
163             }
164              
165 37           static int amqp_tcp_socket_open(void *base, const char *host, int port,
166             const struct timeval *timeout) {
167 37           struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base;
168 37 50         if (-1 != self->sockfd) {
169 0           return AMQP_STATUS_SOCKET_INUSE;
170             }
171 37           self->sockfd = amqp_open_socket_noblock(host, port, timeout);
172 37 100         if (0 > self->sockfd) {
173 1           int err = self->sockfd;
174 1           self->sockfd = -1;
175 1           return err;
176             }
177 36           return AMQP_STATUS_OK;
178             }
179              
180 42           static int amqp_tcp_socket_close(void *base,
181             AMQP_UNUSED amqp_socket_close_enum force) {
182 42           struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base;
183 42 100         if (-1 == self->sockfd) {
184 6           return AMQP_STATUS_SOCKET_CLOSED;
185             }
186              
187 36 50         if (amqp_os_socket_close(self->sockfd)) {
188 0           return AMQP_STATUS_SOCKET_ERROR;
189             }
190 36           self->sockfd = -1;
191              
192 36           return AMQP_STATUS_OK;
193             }
194              
195 749           static int amqp_tcp_socket_get_sockfd(void *base) {
196 749           struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base;
197 749           return self->sockfd;
198             }
199              
200 37           static void amqp_tcp_socket_delete(void *base) {
201 37           struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base;
202              
203 37 50         if (self) {
204 37           amqp_tcp_socket_close(self, AMQP_SC_NONE);
205 37           free(self);
206             }
207 37           }
208              
209             static const struct amqp_socket_class_t amqp_tcp_socket_class = {
210             amqp_tcp_socket_send, /* send */
211             amqp_tcp_socket_recv, /* recv */
212             amqp_tcp_socket_open, /* open */
213             amqp_tcp_socket_close, /* close */
214             amqp_tcp_socket_get_sockfd, /* get_sockfd */
215             amqp_tcp_socket_delete /* delete */
216             };
217              
218 37           amqp_socket_t *amqp_tcp_socket_new(amqp_connection_state_t state) {
219 37           struct amqp_tcp_socket_t *self = calloc(1, sizeof(*self));
220 37 50         if (!self) {
221 0           return NULL;
222             }
223 37           self->klass = &amqp_tcp_socket_class;
224 37           self->sockfd = -1;
225              
226 37           amqp_set_socket(state, (amqp_socket_t *)self);
227              
228 37           return (amqp_socket_t *)self;
229             }
230              
231 0           void amqp_tcp_socket_set_sockfd(amqp_socket_t *base, int sockfd) {
232             struct amqp_tcp_socket_t *self;
233 0 0         if (base->klass != &amqp_tcp_socket_class) {
234 0           amqp_abort("<%p> is not of type amqp_tcp_socket_t", base);
235             }
236 0           self = (struct amqp_tcp_socket_t *)base;
237 0           self->sockfd = sockfd;
238 0           }