line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
1
|
|
|
|
|
|
|
/* |
2
|
|
|
|
|
|
|
* ***** BEGIN LICENSE BLOCK ***** |
3
|
|
|
|
|
|
|
* Version: MIT |
4
|
|
|
|
|
|
|
* |
5
|
|
|
|
|
|
|
* Portions created by Alan Antonuk are Copyright (c) 2012-2014 |
6
|
|
|
|
|
|
|
* Alan Antonuk. All Rights Reserved. |
7
|
|
|
|
|
|
|
* |
8
|
|
|
|
|
|
|
* Portions created by VMware are Copyright (c) 2007-2012 VMware, Inc. |
9
|
|
|
|
|
|
|
* All Rights Reserved. |
10
|
|
|
|
|
|
|
* |
11
|
|
|
|
|
|
|
* Portions created by Tony Garnock-Jones are Copyright (c) 2009-2010 |
12
|
|
|
|
|
|
|
* VMware, Inc. and Tony Garnock-Jones. All Rights Reserved. |
13
|
|
|
|
|
|
|
* |
14
|
|
|
|
|
|
|
* Permission is hereby granted, free of charge, to any person |
15
|
|
|
|
|
|
|
* obtaining a copy of this software and associated documentation |
16
|
|
|
|
|
|
|
* files (the "Software"), to deal in the Software without |
17
|
|
|
|
|
|
|
* restriction, including without limitation the rights to use, copy, |
18
|
|
|
|
|
|
|
* modify, merge, publish, distribute, sublicense, and/or sell copies |
19
|
|
|
|
|
|
|
* of the Software, and to permit persons to whom the Software is |
20
|
|
|
|
|
|
|
* furnished to do so, subject to the following conditions: |
21
|
|
|
|
|
|
|
* |
22
|
|
|
|
|
|
|
* The above copyright notice and this permission notice shall be |
23
|
|
|
|
|
|
|
* included in all copies or substantial portions of the Software. |
24
|
|
|
|
|
|
|
* |
25
|
|
|
|
|
|
|
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, |
26
|
|
|
|
|
|
|
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF |
27
|
|
|
|
|
|
|
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND |
28
|
|
|
|
|
|
|
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS |
29
|
|
|
|
|
|
|
* BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN |
30
|
|
|
|
|
|
|
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN |
31
|
|
|
|
|
|
|
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE |
32
|
|
|
|
|
|
|
* SOFTWARE. |
33
|
|
|
|
|
|
|
* ***** END LICENSE BLOCK ***** |
34
|
|
|
|
|
|
|
*/ |
35
|
|
|
|
|
|
|
|
36
|
|
|
|
|
|
|
#ifdef HAVE_CONFIG_H |
37
|
|
|
|
|
|
|
#include "config.h" |
38
|
|
|
|
|
|
|
#endif |
39
|
|
|
|
|
|
|
|
40
|
|
|
|
|
|
|
#ifdef _MSC_VER |
41
|
|
|
|
|
|
|
#define _CRT_SECURE_NO_WARNINGS |
42
|
|
|
|
|
|
|
#endif |
43
|
|
|
|
|
|
|
|
44
|
|
|
|
|
|
|
#include "amqp_private.h" |
45
|
|
|
|
|
|
|
#include "amqp_socket.h" |
46
|
|
|
|
|
|
|
#include "amqp_table.h" |
47
|
|
|
|
|
|
|
#include "amqp_time.h" |
48
|
|
|
|
|
|
|
|
49
|
|
|
|
|
|
|
#include |
50
|
|
|
|
|
|
|
#include |
51
|
|
|
|
|
|
|
#include |
52
|
|
|
|
|
|
|
#include |
53
|
|
|
|
|
|
|
#include |
54
|
|
|
|
|
|
|
#include |
55
|
|
|
|
|
|
|
#include |
56
|
|
|
|
|
|
|
|
57
|
|
|
|
|
|
|
#include |
58
|
|
|
|
|
|
|
|
59
|
|
|
|
|
|
|
#if ((defined(_WIN32)) || (defined(__MINGW32__)) || (defined(__MINGW64__))) |
60
|
|
|
|
|
|
|
#ifndef WIN32_LEAN_AND_MEAN |
61
|
|
|
|
|
|
|
#define WIN32_LEAN_AND_MEAN |
62
|
|
|
|
|
|
|
#endif |
63
|
|
|
|
|
|
|
#include |
64
|
|
|
|
|
|
|
#include |
65
|
|
|
|
|
|
|
#else |
66
|
|
|
|
|
|
|
#include |
67
|
|
|
|
|
|
|
/* On older BSD types.h must come before net includes */ |
68
|
|
|
|
|
|
|
#include |
69
|
|
|
|
|
|
|
#include |
70
|
|
|
|
|
|
|
#ifdef HAVE_SELECT |
71
|
|
|
|
|
|
|
#include |
72
|
|
|
|
|
|
|
#endif |
73
|
|
|
|
|
|
|
#include |
74
|
|
|
|
|
|
|
#include |
75
|
|
|
|
|
|
|
#include |
76
|
|
|
|
|
|
|
#include |
77
|
|
|
|
|
|
|
#ifdef HAVE_POLL |
78
|
|
|
|
|
|
|
#include |
79
|
|
|
|
|
|
|
#endif |
80
|
|
|
|
|
|
|
#include |
81
|
|
|
|
|
|
|
#endif |
82
|
|
|
|
|
|
|
|
83
|
|
|
|
|
|
|
static int amqp_id_in_reply_list(amqp_method_number_t expected, |
84
|
|
|
|
|
|
|
amqp_method_number_t *list); |
85
|
|
|
|
|
|
|
|
86
|
38
|
|
|
|
|
|
static int amqp_os_socket_init(void) { |
87
|
|
|
|
|
|
|
#ifdef _WIN32 |
88
|
|
|
|
|
|
|
static int called_wsastartup = 0; |
89
|
|
|
|
|
|
|
if (!called_wsastartup) { |
90
|
|
|
|
|
|
|
WSADATA data; |
91
|
|
|
|
|
|
|
int res = WSAStartup(0x0202, &data); |
92
|
|
|
|
|
|
|
if (res) { |
93
|
|
|
|
|
|
|
return AMQP_STATUS_TCP_SOCKETLIB_INIT_ERROR; |
94
|
|
|
|
|
|
|
} |
95
|
|
|
|
|
|
|
|
96
|
|
|
|
|
|
|
called_wsastartup = 1; |
97
|
|
|
|
|
|
|
} |
98
|
|
|
|
|
|
|
return AMQP_STATUS_OK; |
99
|
|
|
|
|
|
|
|
100
|
|
|
|
|
|
|
#else |
101
|
38
|
|
|
|
|
|
return AMQP_STATUS_OK; |
102
|
|
|
|
|
|
|
#endif |
103
|
|
|
|
|
|
|
} |
104
|
|
|
|
|
|
|
|
105
|
401
|
|
|
|
|
|
/*
 * Fetch the platform's "last socket error" value: WSAGetLastError() on
 * Windows, errno everywhere else.
 */
int amqp_os_socket_error(void) {
#ifdef _WIN32
  const int last_error = WSAGetLastError();
#else
  const int last_error = errno;
#endif

  return last_error;
}
112
|
|
|
|
|
|
|
|
113
|
36
|
|
|
|
|
|
/*
 * Close a raw socket descriptor using the platform's native call
 * (closesocket() on Windows, close() elsewhere) and hand back that call's
 * return value unchanged.
 */
int amqp_os_socket_close(int sockfd) {
  int rc;

#ifdef _WIN32
  rc = closesocket(sockfd);
#else
  rc = close(sockfd);
#endif

  return rc;
}
120
|
|
|
|
|
|
|
|
121
|
535
|
|
|
|
|
|
/*
 * Forward a send request to the socket implementation's `send` hook.
 *
 * self  - socket object; must be non-NULL and provide a `send` hook
 *         (both are checked with assert()).
 * buf   - bytes to transmit.
 * len   - number of bytes in buf.
 * flags - passed through to the hook untouched.
 *
 * Returns whatever the hook returns.
 */
ssize_t amqp_socket_send(amqp_socket_t *self, const void *buf, size_t len,
                         int flags) {
  ssize_t sent;

  assert(self);
  assert(self->klass->send);

  sent = self->klass->send(self, buf, len, flags);
  return sent;
}
127
|
|
|
|
|
|
|
|
128
|
783
|
|
|
|
|
|
/*
 * Forward a receive request to the socket implementation's `recv` hook.
 *
 * self  - socket object; must be non-NULL and provide a `recv` hook
 *         (both are checked with assert()).
 * buf   - destination buffer.
 * len   - capacity of buf in bytes.
 * flags - passed through to the hook untouched.
 *
 * Returns whatever the hook returns.
 */
ssize_t amqp_socket_recv(amqp_socket_t *self, void *buf, size_t len,
                         int flags) {
  ssize_t received;

  assert(self);
  assert(self->klass->recv);

  received = self->klass->recv(self, buf, len, flags);
  return received;
}
134
|
|
|
|
|
|
|
|
135
|
0
|
|
|
|
|
|
/*
 * Open the socket towards host:port by calling the implementation's `open`
 * hook with a NULL timeout argument.
 *
 * self must be non-NULL and provide an `open` hook (checked with assert()).
 * Returns the hook's result.
 */
int amqp_socket_open(amqp_socket_t *self, const char *host, int port) {
  int status;

  assert(self);
  assert(self->klass->open);

  status = self->klass->open(self, host, port, NULL);
  return status;
}
140
|
|
|
|
|
|
|
|
141
|
38
|
|
|
|
|
|
/*
 * Open the socket towards host:port, handing the caller-supplied timeout to
 * the implementation's `open` hook.
 *
 * self must be non-NULL and provide an `open` hook (checked with assert()).
 * Returns the hook's result.
 */
int amqp_socket_open_noblock(amqp_socket_t *self, const char *host, int port,
                             const struct timeval *timeout) {
  int status;

  assert(self);
  assert(self->klass->open);

  status = self->klass->open(self, host, port, timeout);
  return status;
}
147
|
|
|
|
|
|
|
|
148
|
5
|
|
|
|
|
|
/*
 * Close the socket through the implementation's `close` hook.
 *
 * self  - socket object; must be non-NULL and provide a `close` hook
 *         (checked with assert()).
 * force - forwarded to the hook unchanged.
 *
 * Returns the hook's result.
 */
int amqp_socket_close(amqp_socket_t *self, amqp_socket_close_enum force) {
  int status;

  assert(self);
  assert(self->klass->close);

  status = self->klass->close(self, force);
  return status;
}
153
|
|
|
|
|
|
|
|
154
|
73
|
|
|
|
|
|
/*
 * Destroy a socket object through the implementation's `delete` hook.
 * Passing NULL is allowed and does nothing.
 */
void amqp_socket_delete(amqp_socket_t *self) {
  if (!self) {
    return;
  }

  assert(self->klass->delete);
  self->klass->delete (self);
}
160
|
|
|
|
|
|
|
|
161
|
780
|
|
|
|
|
|
/*
 * Ask the socket implementation for its descriptor via the `get_sockfd`
 * hook.
 *
 * self must be non-NULL and provide a `get_sockfd` hook (checked with
 * assert()). Returns the hook's result.
 */
int amqp_socket_get_sockfd(amqp_socket_t *self) {
  int fd;

  assert(self);
  assert(self->klass->get_sockfd);

  fd = self->klass->get_sockfd(self);
  return fd;
}
166
|
|
|
|
|
|
|
|
167
|
438
|
|
|
|
|
|
int amqp_poll(int fd, int event, amqp_time_t deadline) { |
168
|
|
|
|
|
|
|
#ifdef HAVE_POLL |
169
|
|
|
|
|
|
|
struct pollfd pfd; |
170
|
|
|
|
|
|
|
int res; |
171
|
|
|
|
|
|
|
int timeout_ms; |
172
|
|
|
|
|
|
|
|
173
|
|
|
|
|
|
|
/* Function should only ever be called with one of these two */ |
174
|
438
|
100
|
|
|
|
|
assert(event == AMQP_SF_POLLIN || event == AMQP_SF_POLLOUT); |
|
|
50
|
|
|
|
|
|
175
|
|
|
|
|
|
|
|
176
|
|
|
|
|
|
|
start_poll: |
177
|
438
|
|
|
|
|
|
pfd.fd = fd; |
178
|
438
|
|
|
|
|
|
switch (event) { |
179
|
|
|
|
|
|
|
case AMQP_SF_POLLIN: |
180
|
394
|
|
|
|
|
|
pfd.events = POLLIN; |
181
|
394
|
|
|
|
|
|
break; |
182
|
|
|
|
|
|
|
case AMQP_SF_POLLOUT: |
183
|
44
|
|
|
|
|
|
pfd.events = POLLOUT; |
184
|
44
|
|
|
|
|
|
break; |
185
|
|
|
|
|
|
|
} |
186
|
|
|
|
|
|
|
|
187
|
438
|
|
|
|
|
|
timeout_ms = amqp_time_ms_until(deadline); |
188
|
438
|
50
|
|
|
|
|
if (-1 > timeout_ms) { |
189
|
0
|
|
|
|
|
|
return timeout_ms; |
190
|
|
|
|
|
|
|
} |
191
|
|
|
|
|
|
|
|
192
|
438
|
|
|
|
|
|
res = poll(&pfd, 1, timeout_ms); |
193
|
|
|
|
|
|
|
|
194
|
438
|
100
|
|
|
|
|
if (0 < res) { |
195
|
|
|
|
|
|
|
/* TODO: optimize this a bit by returning the AMQP_STATUS_SOCKET_ERROR or |
196
|
|
|
|
|
|
|
* equivalent when pdf.revent is POLLHUP or POLLERR, so an extra syscall |
197
|
|
|
|
|
|
|
* doesn't need to be made. */ |
198
|
433
|
|
|
|
|
|
return AMQP_STATUS_OK; |
199
|
5
|
50
|
|
|
|
|
} else if (0 == res) { |
200
|
5
|
|
|
|
|
|
return AMQP_STATUS_TIMEOUT; |
201
|
|
|
|
|
|
|
} else { |
202
|
0
|
0
|
|
|
|
|
switch (amqp_os_socket_error()) { |
203
|
|
|
|
|
|
|
case EINTR: |
204
|
0
|
|
|
|
|
|
goto start_poll; |
205
|
|
|
|
|
|
|
default: |
206
|
438
|
|
|
|
|
|
return AMQP_STATUS_SOCKET_ERROR; |
207
|
|
|
|
|
|
|
} |
208
|
|
|
|
|
|
|
} |
209
|
|
|
|
|
|
|
return AMQP_STATUS_OK; |
210
|
|
|
|
|
|
|
#elif defined(HAVE_SELECT) |
211
|
|
|
|
|
|
|
fd_set fds; |
212
|
|
|
|
|
|
|
fd_set exceptfds; |
213
|
|
|
|
|
|
|
fd_set *exceptfdsp; |
214
|
|
|
|
|
|
|
int res; |
215
|
|
|
|
|
|
|
struct timeval tv; |
216
|
|
|
|
|
|
|
struct timeval *tvp; |
217
|
|
|
|
|
|
|
|
218
|
|
|
|
|
|
|
assert((0 != (event & AMQP_SF_POLLIN)) || (0 != (event & AMQP_SF_POLLOUT))); |
219
|
|
|
|
|
|
|
#ifndef _WIN32 |
220
|
|
|
|
|
|
|
/* On Win32 connect() failure is indicated through the exceptfds, it does not |
221
|
|
|
|
|
|
|
* make any sense to allow POLLERR on any other platform or condition */ |
222
|
|
|
|
|
|
|
assert(0 == (event & AMQP_SF_POLLERR)); |
223
|
|
|
|
|
|
|
#endif |
224
|
|
|
|
|
|
|
|
225
|
|
|
|
|
|
|
start_select: |
226
|
|
|
|
|
|
|
FD_ZERO(&fds); |
227
|
|
|
|
|
|
|
FD_SET(fd, &fds); |
228
|
|
|
|
|
|
|
|
229
|
|
|
|
|
|
|
if (event & AMQP_SF_POLLERR) { |
230
|
|
|
|
|
|
|
FD_ZERO(&exceptfds); |
231
|
|
|
|
|
|
|
FD_SET(fd, &exceptfds); |
232
|
|
|
|
|
|
|
exceptfdsp = &exceptfds; |
233
|
|
|
|
|
|
|
} else { |
234
|
|
|
|
|
|
|
exceptfdsp = NULL; |
235
|
|
|
|
|
|
|
} |
236
|
|
|
|
|
|
|
|
237
|
|
|
|
|
|
|
res = amqp_time_tv_until(deadline, &tv, &tvp); |
238
|
|
|
|
|
|
|
if (res != AMQP_STATUS_OK) { |
239
|
|
|
|
|
|
|
return res; |
240
|
|
|
|
|
|
|
} |
241
|
|
|
|
|
|
|
|
242
|
|
|
|
|
|
|
if (event & AMQP_SF_POLLIN) { |
243
|
|
|
|
|
|
|
res = select(fd + 1, &fds, NULL, exceptfdsp, tvp); |
244
|
|
|
|
|
|
|
} else if (event & AMQP_SF_POLLOUT) { |
245
|
|
|
|
|
|
|
res = select(fd + 1, NULL, &fds, exceptfdsp, tvp); |
246
|
|
|
|
|
|
|
} |
247
|
|
|
|
|
|
|
|
248
|
|
|
|
|
|
|
if (0 < res) { |
249
|
|
|
|
|
|
|
return AMQP_STATUS_OK; |
250
|
|
|
|
|
|
|
} else if (0 == res) { |
251
|
|
|
|
|
|
|
return AMQP_STATUS_TIMEOUT; |
252
|
|
|
|
|
|
|
} else { |
253
|
|
|
|
|
|
|
switch (amqp_os_socket_error()) { |
254
|
|
|
|
|
|
|
case EINTR: |
255
|
|
|
|
|
|
|
goto start_select; |
256
|
|
|
|
|
|
|
default: |
257
|
|
|
|
|
|
|
return AMQP_STATUS_SOCKET_ERROR; |
258
|
|
|
|
|
|
|
} |
259
|
|
|
|
|
|
|
} |
260
|
|
|
|
|
|
|
#else |
261
|
|
|
|
|
|
|
#error "poll() or select() is needed to compile rabbitmq-c" |
262
|
|
|
|
|
|
|
#endif |
263
|
|
|
|
|
|
|
} |
264
|
|
|
|
|
|
|
|
265
|
10
|
|
|
|
|
|
static ssize_t do_poll(amqp_connection_state_t state, ssize_t res, |
266
|
|
|
|
|
|
|
amqp_time_t deadline) { |
267
|
10
|
|
|
|
|
|
int fd = amqp_get_sockfd(state); |
268
|
10
|
100
|
|
|
|
|
if (-1 == fd) { |
269
|
3
|
|
|
|
|
|
return AMQP_STATUS_SOCKET_CLOSED; |
270
|
|
|
|
|
|
|
} |
271
|
7
|
|
|
|
|
|
switch (res) { |
272
|
|
|
|
|
|
|
case AMQP_PRIVATE_STATUS_SOCKET_NEEDREAD: |
273
|
0
|
|
|
|
|
|
res = amqp_poll(fd, AMQP_SF_POLLIN, deadline); |
274
|
0
|
|
|
|
|
|
break; |
275
|
|
|
|
|
|
|
case AMQP_PRIVATE_STATUS_SOCKET_NEEDWRITE: |
276
|
6
|
|
|
|
|
|
res = amqp_poll(fd, AMQP_SF_POLLOUT, deadline); |
277
|
6
|
|
|
|
|
|
break; |
278
|
|
|
|
|
|
|
} |
279
|
7
|
|
|
|
|
|
return res; |
280
|
|
|
|
|
|
|
} |
281
|
|
|
|
|
|
|
|
282
|
523
|
|
|
|
|
|
/*
 * Send all `len` bytes of `buf` on the connection's socket, waiting via
 * do_poll() whenever the socket cannot take more data, until everything is
 * written or `deadline` expires.
 *
 * state    - connection whose `socket` is written to.
 * buf      - data to send; never modified.
 * len      - number of bytes to send.
 * deadline - time limit applied to each wait.
 * flags    - forwarded to amqp_socket_send().
 *
 * Returns `len` once every byte has been handed to the socket; the number of
 * bytes sent so far (possibly 0) when the wait ends with
 * AMQP_STATUS_TIMEOUT; or the value produced by do_poll() for any other
 * outcome.
 */
ssize_t amqp_try_send(amqp_connection_state_t state, const void *buf,
                      size_t len, amqp_time_t deadline, int flags) {
  ssize_t res;
  const char *cursor = buf;
  /* Assume that len is not going to be larger than ssize_t can hold. */
  ssize_t remaining = (ssize_t)len;

  if (0 == len) {
    /* Nothing to transmit. Without this guard a socket reporting 0 bytes
     * written would fall through to do_poll(), which hands the 0 straight
     * back; assuming AMQP_STATUS_OK is 0 (not visible here -- confirm), the
     * send would then be retried forever. */
    return 0;
  }

  for (;;) {
    res = amqp_socket_send(state->socket, cursor, (size_t)remaining, flags);

    if (res > 0) {
      remaining -= res;
      cursor += res;
      if (0 == remaining) {
        return (ssize_t)len;
      }
      continue;
    }

    res = do_poll(state, res, deadline);
    if (AMQP_STATUS_OK == res) {
      continue;
    }
    if (AMQP_STATUS_TIMEOUT == res) {
      /* Report partial progress rather than the timeout status itself. */
      return (ssize_t)len - remaining;
    }
    return res;
  }
}
309
|
|
|
|
|
|
|
|
310
|
0
|
|
|
|
|
|
int amqp_open_socket(char const *hostname, int portnumber) { |
311
|
0
|
|
|
|
|
|
return amqp_open_socket_inner(hostname, portnumber, amqp_time_infinite()); |
312
|
|
|
|
|
|
|
} |
313
|
|
|
|
|
|
|
|
314
|
37
|
|
|
|
|
|
int amqp_open_socket_noblock(char const *hostname, int portnumber, |
315
|
|
|
|
|
|
|
const struct timeval *timeout) { |
316
|
|
|
|
|
|
|
amqp_time_t deadline; |
317
|
37
|
|
|
|
|
|
int res = amqp_time_from_now(&deadline, timeout); |
318
|
37
|
50
|
|
|
|
|
if (AMQP_STATUS_OK != res) { |
319
|
0
|
|
|
|
|
|
return res; |
320
|
|
|
|
|
|
|
} |
321
|
37
|
|
|
|
|
|
return amqp_open_socket_inner(hostname, portnumber, deadline); |
322
|
|
|
|
|
|
|
} |
323
|
|
|
|
|
|
|
|
324
|
|
|
|
|
|
|
#ifdef _WIN32 |
325
|
|
|
|
|
|
|
/*
 * Windows variant: create a socket for `addr`, configure it and connect it
 * without blocking past `deadline`.
 *
 * The socket is switched to non-blocking mode, TCP_NODELAY and SO_KEEPALIVE
 * are enabled, and connect() is started. If connect() reports
 * WSAEWOULDBLOCK the function waits with amqp_poll() for writability (or an
 * exception condition) and then reads SO_ERROR to learn the outcome.
 *
 * Returns the connected socket cast to int on success. On failure the socket
 * is closed and AMQP_STATUS_SOCKET_ERROR, or the non-OK result of
 * amqp_poll(), is returned.
 */
static int connect_socket(struct addrinfo *addr, amqp_time_t deadline) {
  int enable = 1;
  SOCKET sockfd;
  int status = AMQP_STATUS_SOCKET_ERROR;
  int so_error;
  int so_error_len = sizeof(so_error);

  /*
   * This cast is to squash warnings on Win64, see:
   * http://stackoverflow.com/questions/1953639/is-it-safe-to-cast-socket-to-int-under-win64
   */
  sockfd = (int)socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol);
  if (INVALID_SOCKET == sockfd) {
    return AMQP_STATUS_SOCKET_ERROR;
  }

  /* Set the socket to be non-blocking */
  if (SOCKET_ERROR == ioctlsocket(sockfd, FIONBIO, &enable)) {
    goto fail;
  }

  /* Disable nagle */
  if (SOCKET_ERROR == setsockopt(sockfd, IPPROTO_TCP, TCP_NODELAY,
                                 (const char *)&enable, sizeof(enable))) {
    goto fail;
  }

  /* Enable TCP keepalives */
  if (SOCKET_ERROR == setsockopt(sockfd, SOL_SOCKET, SO_KEEPALIVE,
                                 (const char *)&enable, sizeof(enable))) {
    goto fail;
  }

  /* connect() completing immediately means there is nothing to wait for. */
  if (SOCKET_ERROR != connect(sockfd, addr->ai_addr, (int)addr->ai_addrlen)) {
    return (int)sockfd;
  }

  /* Anything other than "in progress" is a hard failure. */
  if (WSAEWOULDBLOCK != WSAGetLastError()) {
    goto fail;
  }

  status = amqp_poll((int)sockfd, AMQP_SF_POLLOUT | AMQP_SF_POLLERR, deadline);
  if (AMQP_STATUS_OK != status) {
    goto fail;
  }

  /* The wait finished; SO_ERROR tells whether the connect succeeded. */
  if (SOCKET_ERROR == getsockopt(sockfd, SOL_SOCKET, SO_ERROR,
                                 (char *)&so_error, &so_error_len) ||
      so_error != 0) {
    status = AMQP_STATUS_SOCKET_ERROR;
    goto fail;
  }

  return (int)sockfd;

fail:
  closesocket(sockfd);
  return status;
}
393
|
|
|
|
|
|
|
#else |
394
|
38
|
|
|
|
|
|
/*
 * POSIX variant: create a socket for `addr`, configure it and connect it
 * without blocking past `deadline`.
 *
 * The descriptor gets FD_CLOEXEC and O_NONBLOCK, SO_NOSIGPIPE where the
 * platform defines it, TCP_NODELAY and SO_KEEPALIVE; then connect() is
 * started. If connect() reports EINPROGRESS the function waits with
 * amqp_poll() for writability and then reads SO_ERROR to learn the outcome.
 *
 * Returns the connected descriptor on success. On failure the descriptor is
 * closed and AMQP_STATUS_SOCKET_ERROR, or the non-OK result of amqp_poll(),
 * is returned.
 */
static int connect_socket(struct addrinfo *addr, amqp_time_t deadline) {
  int enable = 1;
  int sockfd;
  int fd_flags;
  int status = AMQP_STATUS_SOCKET_ERROR;
  int so_error;
  socklen_t so_error_len = sizeof(so_error);

  sockfd = socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol);
  if (-1 == sockfd) {
    return AMQP_STATUS_SOCKET_ERROR;
  }

  /* Enable CLOEXEC on socket */
  fd_flags = fcntl(sockfd, F_GETFD);
  if (fd_flags == -1 ||
      fcntl(sockfd, F_SETFD, (long)(fd_flags | FD_CLOEXEC)) == -1) {
    goto fail;
  }

  /* Set the socket as non-blocking */
  fd_flags = fcntl(sockfd, F_GETFL);
  if (fd_flags == -1 ||
      fcntl(sockfd, F_SETFL, (long)(fd_flags | O_NONBLOCK)) == -1) {
    goto fail;
  }

#ifdef SO_NOSIGPIPE
  /* Turn off SIGPIPE on platforms that support it, BSD, MacOSX */
  if (0 !=
      setsockopt(sockfd, SOL_SOCKET, SO_NOSIGPIPE, &enable, sizeof(enable))) {
    goto fail;
  }
#endif /* SO_NOSIGPIPE */

  /* Disable nagle */
  if (0 !=
      setsockopt(sockfd, IPPROTO_TCP, TCP_NODELAY, &enable, sizeof(enable))) {
    goto fail;
  }

  /* Enable TCP keepalives */
  if (0 !=
      setsockopt(sockfd, SOL_SOCKET, SO_KEEPALIVE, &enable, sizeof(enable))) {
    goto fail;
  }

  /* connect() completing immediately means there is nothing to wait for. */
  if (0 == connect(sockfd, addr->ai_addr, addr->ai_addrlen)) {
    return sockfd;
  }

  /* Anything other than "in progress" is a hard failure. */
  if (EINPROGRESS != errno) {
    goto fail;
  }

  status = amqp_poll(sockfd, AMQP_SF_POLLOUT, deadline);
  if (AMQP_STATUS_OK != status) {
    goto fail;
  }

  /* The wait finished; SO_ERROR tells whether the connect succeeded. */
  if (-1 == getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &so_error,
                       &so_error_len) ||
      so_error != 0) {
    status = AMQP_STATUS_SOCKET_ERROR;
    goto fail;
  }

  return sockfd;

fail:
  close(sockfd);
  return status;
}
470
|
|
|
|
|
|
|
#endif |
471
|
|
|
|
|
|
|
|
472
|
38
|
|
|
|
|
|
int amqp_open_socket_inner(char const *hostname, int portnumber, |
473
|
|
|
|
|
|
|
amqp_time_t deadline) { |
474
|
|
|
|
|
|
|
struct addrinfo hint; |
475
|
|
|
|
|
|
|
struct addrinfo *address_list; |
476
|
|
|
|
|
|
|
struct addrinfo *addr; |
477
|
|
|
|
|
|
|
char portnumber_string[33]; |
478
|
38
|
|
|
|
|
|
int sockfd = -1; |
479
|
|
|
|
|
|
|
int last_error; |
480
|
|
|
|
|
|
|
|
481
|
38
|
|
|
|
|
|
last_error = amqp_os_socket_init(); |
482
|
38
|
50
|
|
|
|
|
if (AMQP_STATUS_OK != last_error) { |
483
|
0
|
|
|
|
|
|
return last_error; |
484
|
|
|
|
|
|
|
} |
485
|
|
|
|
|
|
|
|
486
|
38
|
|
|
|
|
|
memset(&hint, 0, sizeof(hint)); |
487
|
38
|
|
|
|
|
|
hint.ai_family = PF_UNSPEC; /* PF_INET or PF_INET6 */ |
488
|
38
|
|
|
|
|
|
hint.ai_socktype = SOCK_STREAM; |
489
|
38
|
|
|
|
|
|
hint.ai_protocol = IPPROTO_TCP; |
490
|
|
|
|
|
|
|
|
491
|
38
|
|
|
|
|
|
(void)sprintf(portnumber_string, "%d", portnumber); |
492
|
|
|
|
|
|
|
|
493
|
38
|
|
|
|
|
|
last_error = getaddrinfo(hostname, portnumber_string, &hint, &address_list); |
494
|
38
|
50
|
|
|
|
|
if (0 != last_error) { |
495
|
0
|
|
|
|
|
|
return AMQP_STATUS_HOSTNAME_RESOLUTION_FAILED; |
496
|
|
|
|
|
|
|
} |
497
|
|
|
|
|
|
|
|
498
|
38
|
50
|
|
|
|
|
for (addr = address_list; addr; addr = addr->ai_next) { |
499
|
38
|
|
|
|
|
|
sockfd = connect_socket(addr, deadline); |
500
|
|
|
|
|
|
|
|
501
|
38
|
100
|
|
|
|
|
if (sockfd >= 0) { |
502
|
37
|
|
|
|
|
|
last_error = AMQP_STATUS_OK; |
503
|
37
|
|
|
|
|
|
break; |
504
|
1
|
50
|
|
|
|
|
} else if (sockfd == AMQP_STATUS_TIMEOUT) { |
505
|
1
|
|
|
|
|
|
last_error = sockfd; |
506
|
1
|
|
|
|
|
|
break; |
507
|
|
|
|
|
|
|
} |
508
|
|
|
|
|
|
|
} |
509
|
|
|
|
|
|
|
|
510
|
38
|
|
|
|
|
|
freeaddrinfo(address_list); |
511
|
38
|
100
|
|
|
|
|
if (last_error != AMQP_STATUS_OK || sockfd == -1) { |
|
|
50
|
|
|
|
|
|
512
|
1
|
|
|
|
|
|
return last_error; |
513
|
|
|
|
|
|
|
} |
514
|
38
|
|
|
|
|
|
return sockfd; |
515
|
|
|
|
|
|
|
} |
516
|
|
|
|
|
|
|
|
517
|
37
|
|
|
|
|
|
static int send_header_inner(amqp_connection_state_t state, |
518
|
|
|
|
|
|
|
amqp_time_t deadline) { |
519
|
|
|
|
|
|
|
ssize_t res; |
520
|
|
|
|
|
|
|
static const uint8_t header[8] = {'A', |
521
|
|
|
|
|
|
|
'M', |
522
|
|
|
|
|
|
|
'Q', |
523
|
|
|
|
|
|
|
'P', |
524
|
|
|
|
|
|
|
0, |
525
|
|
|
|
|
|
|
AMQP_PROTOCOL_VERSION_MAJOR, |
526
|
|
|
|
|
|
|
AMQP_PROTOCOL_VERSION_MINOR, |
527
|
|
|
|
|
|
|
AMQP_PROTOCOL_VERSION_REVISION}; |
528
|
37
|
|
|
|
|
|
res = amqp_try_send(state, header, sizeof(header), deadline, AMQP_SF_NONE); |
529
|
37
|
50
|
|
|
|
|
if (sizeof(header) == res) { |
530
|
37
|
|
|
|
|
|
return AMQP_STATUS_OK; |
531
|
|
|
|
|
|
|
} |
532
|
0
|
|
|
|
|
|
return (int)res; |
533
|
|
|
|
|
|
|
} |
534
|
|
|
|
|
|
|
|
535
|
0
|
|
|
|
|
|
/*
 * Send the AMQP protocol header on `state` with no time limit.
 *
 * Returns whatever send_header_inner() returns.
 */
int amqp_send_header(amqp_connection_state_t state) {
  const amqp_time_t no_deadline = amqp_time_infinite();

  return send_header_inner(state, no_deadline);
}
538
|
|
|
|
|
|
|
|
539
|
74
|
|
|
|
|
|
/*
 * Map a SASL method enum value to its mechanism name as an amqp_bytes_t
 * built with amqp_cstring_bytes(): "PLAIN" or "EXTERNAL".
 *
 * Any other value is reported through amqp_abort().
 */
static amqp_bytes_t sasl_method_name(amqp_sasl_method_enum method) {
  amqp_bytes_t name;

  if (AMQP_SASL_METHOD_PLAIN == method) {
    name = amqp_cstring_bytes("PLAIN");
  } else if (AMQP_SASL_METHOD_EXTERNAL == method) {
    name = amqp_cstring_bytes("EXTERNAL");
  } else {
    amqp_abort("Invalid SASL method: %d", (int)method);
  }

  return name;
}
556
|
|
|
|
|
|
|
|
557
|
37
|
|
|
|
|
|
/*
 * Compare two byte strings.
 *
 * Returns 1 when both have the same length, both `bytes` pointers are
 * non-NULL and the contents match; 0 in every other case (including two
 * strings whose `bytes` pointer is NULL).
 */
static int bytes_equal(amqp_bytes_t l, amqp_bytes_t r) {
  if (l.len != r.len) {
    return 0;
  }
  if (!l.bytes || !r.bytes) {
    return 0;
  }
  return (0 == memcmp(l.bytes, r.bytes, l.len)) ? 1 : 0;
}
567
|
|
|
|
|
|
|
|
568
|
37
|
|
|
|
|
|
int sasl_mechanism_in_list(amqp_bytes_t mechanisms, |
569
|
|
|
|
|
|
|
amqp_sasl_method_enum method) { |
570
|
|
|
|
|
|
|
amqp_bytes_t mechanism; |
571
|
|
|
|
|
|
|
amqp_bytes_t supported_mechanism; |
572
|
|
|
|
|
|
|
uint8_t *start; |
573
|
|
|
|
|
|
|
uint8_t *end; |
574
|
|
|
|
|
|
|
uint8_t *current; |
575
|
|
|
|
|
|
|
|
576
|
37
|
50
|
|
|
|
|
assert(NULL != mechanisms.bytes); |
577
|
|
|
|
|
|
|
|
578
|
37
|
|
|
|
|
|
mechanism = sasl_method_name(method); |
579
|
|
|
|
|
|
|
|
580
|
37
|
|
|
|
|
|
start = (uint8_t *)mechanisms.bytes; |
581
|
37
|
|
|
|
|
|
current = start; |
582
|
37
|
|
|
|
|
|
end = start + mechanisms.len; |
583
|
|
|
|
|
|
|
|
584
|
37
|
50
|
|
|
|
|
for (; current != end; start = current + 1) { |
585
|
|
|
|
|
|
|
/* HACK: SASL states that we should be parsing this string as a UTF-8 |
586
|
|
|
|
|
|
|
* string, which we're plainly not doing here. At this point its not worth |
587
|
|
|
|
|
|
|
* dragging an entire UTF-8 parser for this one case, and this should work |
588
|
|
|
|
|
|
|
* most of the time */ |
589
|
37
|
|
|
|
|
|
current = memchr(start, ' ', end - start); |
590
|
37
|
50
|
|
|
|
|
if (NULL == current) { |
591
|
0
|
|
|
|
|
|
current = end; |
592
|
|
|
|
|
|
|
} |
593
|
37
|
|
|
|
|
|
supported_mechanism.bytes = start; |
594
|
37
|
|
|
|
|
|
supported_mechanism.len = current - start; |
595
|
37
|
50
|
|
|
|
|
if (bytes_equal(mechanism, supported_mechanism)) { |
596
|
37
|
|
|
|
|
|
return 1; |
597
|
|
|
|
|
|
|
} |
598
|
|
|
|
|
|
|
} |
599
|
|
|
|
|
|
|
|
600
|
37
|
|
|
|
|
|
return 0; |
601
|
|
|
|
|
|
|
} |
602
|
|
|
|
|
|
|
|
603
|
37
|
|
|
|
|
|
static amqp_bytes_t sasl_response(amqp_pool_t *pool, |
604
|
|
|
|
|
|
|
amqp_sasl_method_enum method, va_list args) { |
605
|
|
|
|
|
|
|
amqp_bytes_t response; |
606
|
|
|
|
|
|
|
|
607
|
37
|
|
|
|
|
|
switch (method) { |
608
|
|
|
|
|
|
|
case AMQP_SASL_METHOD_PLAIN: { |
609
|
37
|
50
|
|
|
|
|
char *username = va_arg(args, char *); |
610
|
37
|
|
|
|
|
|
size_t username_len = strlen(username); |
611
|
37
|
50
|
|
|
|
|
char *password = va_arg(args, char *); |
612
|
37
|
|
|
|
|
|
size_t password_len = strlen(password); |
613
|
|
|
|
|
|
|
char *response_buf; |
614
|
|
|
|
|
|
|
|
615
|
37
|
|
|
|
|
|
amqp_pool_alloc_bytes(pool, strlen(username) + strlen(password) + 2, |
616
|
|
|
|
|
|
|
&response); |
617
|
37
|
50
|
|
|
|
|
if (response.bytes == NULL) |
618
|
|
|
|
|
|
|
/* We never request a zero-length block, because of the +2 |
619
|
|
|
|
|
|
|
above, so a NULL here really is ENOMEM. */ |
620
|
|
|
|
|
|
|
{ |
621
|
0
|
|
|
|
|
|
return response; |
622
|
|
|
|
|
|
|
} |
623
|
|
|
|
|
|
|
|
624
|
37
|
|
|
|
|
|
response_buf = response.bytes; |
625
|
37
|
|
|
|
|
|
response_buf[0] = 0; |
626
|
37
|
|
|
|
|
|
memcpy(response_buf + 1, username, username_len); |
627
|
37
|
|
|
|
|
|
response_buf[username_len + 1] = 0; |
628
|
37
|
|
|
|
|
|
memcpy(response_buf + username_len + 2, password, password_len); |
629
|
37
|
|
|
|
|
|
break; |
630
|
|
|
|
|
|
|
} |
631
|
|
|
|
|
|
|
case AMQP_SASL_METHOD_EXTERNAL: { |
632
|
0
|
0
|
|
|
|
|
char *identity = va_arg(args, char *); |
633
|
0
|
|
|
|
|
|
size_t identity_len = strlen(identity); |
634
|
|
|
|
|
|
|
|
635
|
0
|
|
|
|
|
|
amqp_pool_alloc_bytes(pool, identity_len, &response); |
636
|
0
|
0
|
|
|
|
|
if (response.bytes == NULL) { |
637
|
0
|
|
|
|
|
|
return response; |
638
|
|
|
|
|
|
|
} |
639
|
|
|
|
|
|
|
|
640
|
0
|
|
|
|
|
|
memcpy(response.bytes, identity, identity_len); |
641
|
0
|
|
|
|
|
|
break; |
642
|
|
|
|
|
|
|
} |
643
|
|
|
|
|
|
|
default: |
644
|
0
|
|
|
|
|
|
amqp_abort("Invalid SASL method: %d", (int)method); |
645
|
|
|
|
|
|
|
} |
646
|
|
|
|
|
|
|
|
647
|
37
|
|
|
|
|
|
return response; |
648
|
|
|
|
|
|
|
} |
649
|
|
|
|
|
|
|
|
650
|
0
|
|
|
|
|
|
/*
 * Report whether the connection has at least one queued frame, i.e. whether
 * `first_queued_frame` is non-NULL. Returns 1 if so, 0 otherwise.
 */
amqp_boolean_t amqp_frames_enqueued(amqp_connection_state_t state) {
  if (NULL == state->first_queued_frame) {
    return 0;
  }
  return 1;
}
653
|
|
|
|
|
|
|
|
654
|
|
|
|
|
|
|
/* |
655
|
|
|
|
|
|
|
* Check to see if we have data in our buffer. If this returns 1, we |
656
|
|
|
|
|
|
|
* will avoid an immediate blocking read in amqp_simple_wait_frame. |
657
|
|
|
|
|
|
|
*/ |
658
|
940
|
|
|
|
|
|
/*
 * Report whether unread bytes remain in the inbound socket buffer, i.e.
 * whether the read offset is still below the buffer limit. Returns 1 if so,
 * 0 otherwise.
 */
amqp_boolean_t amqp_data_in_buffer(amqp_connection_state_t state) {
  if (state->sock_inbound_offset < state->sock_inbound_limit) {
    return 1;
  }
  return 0;
}
661
|
|
|
|
|
|
|
|
662
|
485
|
|
|
|
|
|
static int consume_one_frame(amqp_connection_state_t state, |
663
|
|
|
|
|
|
|
amqp_frame_t *decoded_frame) { |
664
|
|
|
|
|
|
|
int res; |
665
|
|
|
|
|
|
|
|
666
|
|
|
|
|
|
|
amqp_bytes_t buffer; |
667
|
485
|
|
|
|
|
|
buffer.len = state->sock_inbound_limit - state->sock_inbound_offset; |
668
|
485
|
|
|
|
|
|
buffer.bytes = |
669
|
485
|
|
|
|
|
|
((char *)state->sock_inbound_buffer.bytes) + state->sock_inbound_offset; |
670
|
|
|
|
|
|
|
|
671
|
485
|
|
|
|
|
|
res = amqp_handle_input(state, buffer, decoded_frame); |
672
|
485
|
50
|
|
|
|
|
if (res < 0) { |
673
|
0
|
|
|
|
|
|
return res; |
674
|
|
|
|
|
|
|
} |
675
|
|
|
|
|
|
|
|
676
|
485
|
|
|
|
|
|
state->sock_inbound_offset += res; |
677
|
|
|
|
|
|
|
|
678
|
485
|
|
|
|
|
|
return AMQP_STATUS_OK; |
679
|
|
|
|
|
|
|
} |
680
|
|
|
|
|
|
|
|
681
|
395
|
|
|
|
|
|
static int recv_with_timeout(amqp_connection_state_t state, |
682
|
|
|
|
|
|
|
amqp_time_t timeout) { |
683
|
|
|
|
|
|
|
ssize_t res; |
684
|
|
|
|
|
|
|
int fd; |
685
|
|
|
|
|
|
|
|
686
|
|
|
|
|
|
|
start_recv: |
687
|
783
|
|
|
|
|
|
res = amqp_socket_recv(state->socket, state->sock_inbound_buffer.bytes, |
688
|
|
|
|
|
|
|
state->sock_inbound_buffer.len, 0); |
689
|
|
|
|
|
|
|
|
690
|
783
|
100
|
|
|
|
|
if (res < 0) { |
691
|
392
|
|
|
|
|
|
fd = amqp_get_sockfd(state); |
692
|
392
|
50
|
|
|
|
|
if (-1 == fd) { |
693
|
0
|
|
|
|
|
|
return AMQP_STATUS_CONNECTION_CLOSED; |
694
|
|
|
|
|
|
|
} |
695
|
392
|
|
|
|
|
|
switch (res) { |
696
|
|
|
|
|
|
|
default: |
697
|
0
|
|
|
|
|
|
return (int)res; |
698
|
|
|
|
|
|
|
case AMQP_PRIVATE_STATUS_SOCKET_NEEDREAD: |
699
|
392
|
|
|
|
|
|
res = amqp_poll(fd, AMQP_SF_POLLIN, timeout); |
700
|
392
|
|
|
|
|
|
break; |
701
|
|
|
|
|
|
|
case AMQP_PRIVATE_STATUS_SOCKET_NEEDWRITE: |
702
|
0
|
|
|
|
|
|
res = amqp_poll(fd, AMQP_SF_POLLOUT, timeout); |
703
|
0
|
|
|
|
|
|
break; |
704
|
|
|
|
|
|
|
} |
705
|
392
|
100
|
|
|
|
|
if (AMQP_STATUS_OK == res) { |
706
|
388
|
|
|
|
|
|
goto start_recv; |
707
|
|
|
|
|
|
|
} |
708
|
4
|
|
|
|
|
|
return (int)res; |
709
|
|
|
|
|
|
|
} |
710
|
|
|
|
|
|
|
|
711
|
391
|
|
|
|
|
|
state->sock_inbound_limit = res; |
712
|
391
|
|
|
|
|
|
state->sock_inbound_offset = 0; |
713
|
|
|
|
|
|
|
|
714
|
391
|
|
|
|
|
|
res = amqp_time_s_from_now(&state->next_recv_heartbeat, |
715
|
|
|
|
|
|
|
amqp_heartbeat_recv(state)); |
716
|
391
|
50
|
|
|
|
|
if (AMQP_STATUS_OK != res) { |
717
|
0
|
|
|
|
|
|
return (int)res; |
718
|
|
|
|
|
|
|
} |
719
|
391
|
|
|
|
|
|
return AMQP_STATUS_OK; |
720
|
|
|
|
|
|
|
} |
721
|
|
|
|
|
|
|
|
722
|
2
|
|
|
|
|
|
/*
 * Decode everything already sitting in the inbound buffer, queue the
 * resulting frames on the connection, then attempt one receive whose
 * deadline is built from "0 seconds from now".
 *
 * Each decoded frame whose frame_type is non-zero is copied into memory
 * from its channel's pool and appended to the list delimited by
 * `first_queued_frame` / `last_queued_frame`.
 *
 * Returns the result of recv_with_timeout(), or earlier: the non-OK result
 * of consume_one_frame(), AMQP_STATUS_NO_MEMORY when the channel pool or
 * one of the two allocations is unavailable, or the non-OK result of
 * amqp_time_s_from_now().
 */
int amqp_try_recv(amqp_connection_state_t state) {
  amqp_time_t timeout;
  int res;

  while (amqp_data_in_buffer(state)) {
    amqp_frame_t frame;
    amqp_pool_t *channel_pool;
    amqp_frame_t *frame_copy;
    amqp_link_t *link;
    int status = consume_one_frame(state, &frame);

    if (AMQP_STATUS_OK != status) {
      return status;
    }

    if (0 == frame.frame_type) {
      /* Nothing was produced for the queue on this pass. */
      continue;
    }

    channel_pool = amqp_get_or_create_channel_pool(state, frame.channel);
    if (NULL == channel_pool) {
      return AMQP_STATUS_NO_MEMORY;
    }

    frame_copy = amqp_pool_alloc(channel_pool, sizeof(amqp_frame_t));
    link = amqp_pool_alloc(channel_pool, sizeof(amqp_link_t));
    if (NULL == frame_copy || NULL == link) {
      return AMQP_STATUS_NO_MEMORY;
    }

    *frame_copy = frame;
    link->next = NULL;
    link->data = frame_copy;

    /* Append to the tail of the queued-frame list. */
    if (NULL == state->last_queued_frame) {
      state->first_queued_frame = link;
    } else {
      state->last_queued_frame->next = link;
    }
    state->last_queued_frame = link;
  }

  res = amqp_time_s_from_now(&timeout, 0);
  if (AMQP_STATUS_OK != res) {
    return res;
  }

  return recv_with_timeout(state, timeout);
}
770
|
|
|
|
|
|
|
|
771
|
460
|
|
|
|
|
|
static int wait_frame_inner(amqp_connection_state_t state, |
772
|
|
|
|
|
|
|
amqp_frame_t *decoded_frame, |
773
|
|
|
|
|
|
|
amqp_time_t timeout_deadline) { |
774
|
|
|
|
|
|
|
amqp_time_t deadline; |
775
|
|
|
|
|
|
|
int res; |
776
|
|
|
|
|
|
|
|
777
|
|
|
|
|
|
|
for (;;) { |
778
|
859
|
100
|
|
|
|
|
while (amqp_data_in_buffer(state)) { |
779
|
465
|
|
|
|
|
|
res = consume_one_frame(state, decoded_frame); |
780
|
|
|
|
|
|
|
|
781
|
465
|
50
|
|
|
|
|
if (AMQP_STATUS_OK != res) { |
782
|
0
|
|
|
|
|
|
return res; |
783
|
|
|
|
|
|
|
} |
784
|
|
|
|
|
|
|
|
785
|
465
|
100
|
|
|
|
|
if (AMQP_FRAME_HEARTBEAT == decoded_frame->frame_type) { |
786
|
5
|
|
|
|
|
|
amqp_maybe_release_buffers_on_channel(state, 0); |
787
|
5
|
|
|
|
|
|
continue; |
788
|
|
|
|
|
|
|
} |
789
|
|
|
|
|
|
|
|
790
|
460
|
100
|
|
|
|
|
if (decoded_frame->frame_type != 0) { |
791
|
|
|
|
|
|
|
/* Complete frame was read. Return it. */ |
792
|
455
|
|
|
|
|
|
return AMQP_STATUS_OK; |
793
|
|
|
|
|
|
|
} |
794
|
|
|
|
|
|
|
} |
795
|
|
|
|
|
|
|
|
796
|
|
|
|
|
|
|
beginrecv: |
797
|
394
|
|
|
|
|
|
res = amqp_time_has_past(state->next_send_heartbeat); |
798
|
394
|
50
|
|
|
|
|
if (AMQP_STATUS_TIMER_FAILURE == res) { |
799
|
0
|
|
|
|
|
|
return res; |
800
|
394
|
100
|
|
|
|
|
} else if (AMQP_STATUS_TIMEOUT == res) { |
801
|
|
|
|
|
|
|
amqp_frame_t heartbeat; |
802
|
2
|
|
|
|
|
|
heartbeat.channel = 0; |
803
|
2
|
|
|
|
|
|
heartbeat.frame_type = AMQP_FRAME_HEARTBEAT; |
804
|
|
|
|
|
|
|
|
805
|
2
|
|
|
|
|
|
res = amqp_send_frame(state, &heartbeat); |
806
|
2
|
100
|
|
|
|
|
if (AMQP_STATUS_OK != res) { |
807
|
2
|
|
|
|
|
|
return res; |
808
|
|
|
|
|
|
|
} |
809
|
|
|
|
|
|
|
} |
810
|
393
|
|
|
|
|
|
deadline = amqp_time_first(timeout_deadline, |
811
|
|
|
|
|
|
|
amqp_time_first(state->next_recv_heartbeat, |
812
|
|
|
|
|
|
|
state->next_send_heartbeat)); |
813
|
|
|
|
|
|
|
|
814
|
|
|
|
|
|
|
/* TODO this needs to wait for a _frame_ and not anything written from the |
815
|
|
|
|
|
|
|
* socket */ |
816
|
393
|
|
|
|
|
|
res = recv_with_timeout(state, deadline); |
817
|
|
|
|
|
|
|
|
818
|
393
|
100
|
|
|
|
|
if (AMQP_STATUS_TIMEOUT == res) { |
819
|
4
|
50
|
|
|
|
|
if (amqp_time_equal(deadline, state->next_recv_heartbeat)) { |
820
|
0
|
|
|
|
|
|
amqp_socket_close(state->socket, AMQP_SC_FORCE); |
821
|
0
|
|
|
|
|
|
return AMQP_STATUS_HEARTBEAT_TIMEOUT; |
822
|
4
|
50
|
|
|
|
|
} else if (amqp_time_equal(deadline, timeout_deadline)) { |
823
|
4
|
|
|
|
|
|
return AMQP_STATUS_TIMEOUT; |
824
|
0
|
0
|
|
|
|
|
} else if (amqp_time_equal(deadline, state->next_send_heartbeat)) { |
825
|
|
|
|
|
|
|
/* send heartbeat happens before we do recv_with_timeout */ |
826
|
0
|
|
|
|
|
|
goto beginrecv; |
827
|
|
|
|
|
|
|
} else { |
828
|
0
|
|
|
|
|
|
amqp_abort("Internal error: unable to determine timeout reason"); |
829
|
|
|
|
|
|
|
} |
830
|
389
|
50
|
|
|
|
|
} else if (AMQP_STATUS_OK != res) { |
831
|
0
|
|
|
|
|
|
return res; |
832
|
|
|
|
|
|
|
} |
833
|
849
|
|
|
|
|
|
} |
834
|
|
|
|
|
|
|
} |
835
|
|
|
|
|
|
|
|
836
|
0
|
|
|
|
|
|
static amqp_link_t *amqp_create_link_for_frame(amqp_connection_state_t state, |
837
|
|
|
|
|
|
|
amqp_frame_t *frame) { |
838
|
|
|
|
|
|
|
amqp_link_t *link; |
839
|
|
|
|
|
|
|
amqp_frame_t *frame_copy; |
840
|
|
|
|
|
|
|
|
841
|
0
|
|
|
|
|
|
amqp_pool_t *channel_pool = |
842
|
0
|
|
|
|
|
|
amqp_get_or_create_channel_pool(state, frame->channel); |
843
|
|
|
|
|
|
|
|
844
|
0
|
0
|
|
|
|
|
if (NULL == channel_pool) { |
845
|
0
|
|
|
|
|
|
return NULL; |
846
|
|
|
|
|
|
|
} |
847
|
|
|
|
|
|
|
|
848
|
0
|
|
|
|
|
|
link = amqp_pool_alloc(channel_pool, sizeof(amqp_link_t)); |
849
|
0
|
|
|
|
|
|
frame_copy = amqp_pool_alloc(channel_pool, sizeof(amqp_frame_t)); |
850
|
|
|
|
|
|
|
|
851
|
0
|
0
|
|
|
|
|
if (NULL == link || NULL == frame_copy) { |
|
|
0
|
|
|
|
|
|
852
|
0
|
|
|
|
|
|
return NULL; |
853
|
|
|
|
|
|
|
} |
854
|
|
|
|
|
|
|
|
855
|
0
|
|
|
|
|
|
*frame_copy = *frame; |
856
|
0
|
|
|
|
|
|
link->data = frame_copy; |
857
|
|
|
|
|
|
|
|
858
|
0
|
|
|
|
|
|
return link; |
859
|
|
|
|
|
|
|
} |
860
|
|
|
|
|
|
|
|
861
|
0
|
|
|
|
|
|
/*
 * Append a copy of *frame to the tail of the connection's queued-frame list.
 *
 * Returns AMQP_STATUS_OK, or AMQP_STATUS_NO_MEMORY when the link/copy could
 * not be allocated.
 */
int amqp_queue_frame(amqp_connection_state_t state, amqp_frame_t *frame) {
  amqp_link_t *node = amqp_create_link_for_frame(state, frame);

  if (node == NULL) {
    return AMQP_STATUS_NO_MEMORY;
  }

  /* The new node becomes the tail, so nothing follows it. */
  node->next = NULL;

  if (state->first_queued_frame != NULL) {
    state->last_queued_frame->next = node;
  } else {
    state->first_queued_frame = node;
  }
  state->last_queued_frame = node;

  return AMQP_STATUS_OK;
}
878
|
|
|
|
|
|
|
|
879
|
0
|
|
|
|
|
|
/*
 * Push a copy of *frame onto the FRONT of the connection's queued-frame list,
 * so it is the next frame handed out.
 *
 * Returns AMQP_STATUS_OK, or AMQP_STATUS_NO_MEMORY when the link/copy could
 * not be allocated.
 */
int amqp_put_back_frame(amqp_connection_state_t state, amqp_frame_t *frame) {
  amqp_link_t *node = amqp_create_link_for_frame(state, frame);

  if (node == NULL) {
    return AMQP_STATUS_NO_MEMORY;
  }

  /* Old head (NULL for an empty list) follows the new node. */
  node->next = state->first_queued_frame;
  if (state->first_queued_frame == NULL) {
    /* List was empty: the new node is the tail as well as the head. */
    state->last_queued_frame = node;
  }
  state->first_queued_frame = node;

  return AMQP_STATUS_OK;
}
896
|
|
|
|
|
|
|
|
897
|
59
|
|
|
|
|
|
int amqp_simple_wait_frame_on_channel(amqp_connection_state_t state, |
898
|
|
|
|
|
|
|
amqp_channel_t channel, |
899
|
|
|
|
|
|
|
amqp_frame_t *decoded_frame) { |
900
|
|
|
|
|
|
|
amqp_frame_t *frame_ptr; |
901
|
|
|
|
|
|
|
amqp_link_t *cur; |
902
|
|
|
|
|
|
|
int res; |
903
|
|
|
|
|
|
|
|
904
|
59
|
50
|
|
|
|
|
for (cur = state->first_queued_frame; NULL != cur; cur = cur->next) { |
905
|
0
|
|
|
|
|
|
frame_ptr = cur->data; |
906
|
|
|
|
|
|
|
|
907
|
0
|
0
|
|
|
|
|
if (channel == frame_ptr->channel) { |
908
|
0
|
|
|
|
|
|
state->first_queued_frame = cur->next; |
909
|
0
|
0
|
|
|
|
|
if (NULL == state->first_queued_frame) { |
910
|
0
|
|
|
|
|
|
state->last_queued_frame = NULL; |
911
|
|
|
|
|
|
|
} |
912
|
|
|
|
|
|
|
|
913
|
0
|
|
|
|
|
|
*decoded_frame = *frame_ptr; |
914
|
|
|
|
|
|
|
|
915
|
0
|
|
|
|
|
|
return AMQP_STATUS_OK; |
916
|
|
|
|
|
|
|
} |
917
|
|
|
|
|
|
|
} |
918
|
|
|
|
|
|
|
|
919
|
|
|
|
|
|
|
for (;;) { |
920
|
59
|
|
|
|
|
|
res = wait_frame_inner(state, decoded_frame, amqp_time_infinite()); |
921
|
|
|
|
|
|
|
|
922
|
59
|
50
|
|
|
|
|
if (AMQP_STATUS_OK != res) { |
923
|
0
|
|
|
|
|
|
return res; |
924
|
|
|
|
|
|
|
} |
925
|
|
|
|
|
|
|
|
926
|
59
|
50
|
|
|
|
|
if (channel == decoded_frame->channel) { |
927
|
59
|
|
|
|
|
|
return AMQP_STATUS_OK; |
928
|
|
|
|
|
|
|
} else { |
929
|
0
|
|
|
|
|
|
res = amqp_queue_frame(state, decoded_frame); |
930
|
0
|
0
|
|
|
|
|
if (res != AMQP_STATUS_OK) { |
931
|
0
|
|
|
|
|
|
return res; |
932
|
|
|
|
|
|
|
} |
933
|
|
|
|
|
|
|
} |
934
|
0
|
|
|
|
|
|
} |
935
|
|
|
|
|
|
|
} |
936
|
|
|
|
|
|
|
|
937
|
21
|
|
|
|
|
|
/*
 * Fetch the next frame into *decoded_frame by delegating to
 * amqp_simple_wait_frame_noblock() with a NULL timeout.
 *
 * NOTE(review): a NULL timeout presumably means "no deadline" -- confirm
 * against amqp_time_from_now().
 *
 * Returns whatever amqp_simple_wait_frame_noblock() returns.
 */
int amqp_simple_wait_frame(amqp_connection_state_t state,
                           amqp_frame_t *decoded_frame) {
  return amqp_simple_wait_frame_noblock(state, decoded_frame, NULL);
}
941
|
|
|
|
|
|
|
|
942
|
121
|
|
|
|
|
|
int amqp_simple_wait_frame_noblock(amqp_connection_state_t state, |
943
|
|
|
|
|
|
|
amqp_frame_t *decoded_frame, |
944
|
|
|
|
|
|
|
const struct timeval *timeout) { |
945
|
|
|
|
|
|
|
amqp_time_t deadline; |
946
|
|
|
|
|
|
|
|
947
|
121
|
|
|
|
|
|
int res = amqp_time_from_now(&deadline, timeout); |
948
|
121
|
50
|
|
|
|
|
if (AMQP_STATUS_OK != res) { |
949
|
0
|
|
|
|
|
|
return res; |
950
|
|
|
|
|
|
|
} |
951
|
|
|
|
|
|
|
|
952
|
121
|
100
|
|
|
|
|
if (state->first_queued_frame != NULL) { |
953
|
20
|
|
|
|
|
|
amqp_frame_t *f = (amqp_frame_t *)state->first_queued_frame->data; |
954
|
20
|
|
|
|
|
|
state->first_queued_frame = state->first_queued_frame->next; |
955
|
20
|
100
|
|
|
|
|
if (state->first_queued_frame == NULL) { |
956
|
1
|
|
|
|
|
|
state->last_queued_frame = NULL; |
957
|
|
|
|
|
|
|
} |
958
|
20
|
|
|
|
|
|
*decoded_frame = *f; |
959
|
20
|
|
|
|
|
|
return AMQP_STATUS_OK; |
960
|
|
|
|
|
|
|
} else { |
961
|
121
|
|
|
|
|
|
return wait_frame_inner(state, decoded_frame, deadline); |
962
|
|
|
|
|
|
|
} |
963
|
|
|
|
|
|
|
} |
964
|
|
|
|
|
|
|
|
965
|
74
|
|
|
|
|
|
static int amqp_simple_wait_method_list(amqp_connection_state_t state, |
966
|
|
|
|
|
|
|
amqp_channel_t expected_channel, |
967
|
|
|
|
|
|
|
amqp_method_number_t *expected_methods, |
968
|
|
|
|
|
|
|
amqp_time_t deadline, |
969
|
|
|
|
|
|
|
amqp_method_t *output) { |
970
|
|
|
|
|
|
|
amqp_frame_t frame; |
971
|
|
|
|
|
|
|
struct timeval tv; |
972
|
|
|
|
|
|
|
struct timeval *tvp; |
973
|
|
|
|
|
|
|
|
974
|
74
|
|
|
|
|
|
int res = amqp_time_tv_until(deadline, &tv, &tvp); |
975
|
74
|
50
|
|
|
|
|
if (res != AMQP_STATUS_OK) { |
976
|
0
|
|
|
|
|
|
return res; |
977
|
|
|
|
|
|
|
} |
978
|
|
|
|
|
|
|
|
979
|
74
|
|
|
|
|
|
res = amqp_simple_wait_frame_noblock(state, &frame, tvp); |
980
|
74
|
50
|
|
|
|
|
if (AMQP_STATUS_OK != res) { |
981
|
0
|
|
|
|
|
|
return res; |
982
|
|
|
|
|
|
|
} |
983
|
|
|
|
|
|
|
|
984
|
74
|
50
|
|
|
|
|
if (AMQP_FRAME_METHOD != frame.frame_type || |
|
|
50
|
|
|
|
|
|
985
|
74
|
50
|
|
|
|
|
expected_channel != frame.channel || |
986
|
74
|
|
|
|
|
|
!amqp_id_in_reply_list(frame.payload.method.id, expected_methods)) { |
987
|
0
|
|
|
|
|
|
return AMQP_STATUS_WRONG_METHOD; |
988
|
|
|
|
|
|
|
} |
989
|
74
|
|
|
|
|
|
*output = frame.payload.method; |
990
|
74
|
|
|
|
|
|
return AMQP_STATUS_OK; |
991
|
|
|
|
|
|
|
} |
992
|
|
|
|
|
|
|
|
993
|
37
|
|
|
|
|
|
static int simple_wait_method_inner(amqp_connection_state_t state, |
994
|
|
|
|
|
|
|
amqp_channel_t expected_channel, |
995
|
|
|
|
|
|
|
amqp_method_number_t expected_method, |
996
|
|
|
|
|
|
|
amqp_time_t deadline, |
997
|
|
|
|
|
|
|
amqp_method_t *output) { |
998
|
37
|
|
|
|
|
|
amqp_method_number_t expected_methods[] = {expected_method, 0}; |
999
|
37
|
|
|
|
|
|
return amqp_simple_wait_method_list(state, expected_channel, expected_methods, |
1000
|
|
|
|
|
|
|
deadline, output); |
1001
|
|
|
|
|
|
|
} |
1002
|
|
|
|
|
|
|
|
1003
|
0
|
|
|
|
|
|
/*
 * Wait for `expected_method` on `expected_channel` using
 * simple_wait_method_inner() with the deadline amqp_time_infinite().
 *
 * On success the received method is stored in *output. Returns the status
 * produced by simple_wait_method_inner().
 */
int amqp_simple_wait_method(amqp_connection_state_t state,
                            amqp_channel_t expected_channel,
                            amqp_method_number_t expected_method,
                            amqp_method_t *output) {
  return simple_wait_method_inner(state, expected_channel, expected_method,
                                  amqp_time_infinite(), output);
}
1010
|
|
|
|
|
|
|
|
1011
|
306
|
|
|
|
|
|
/*
 * Send method `id` (with its decoded argument structure `decoded`) on
 * `channel` by calling amqp_send_method_inner() with flags AMQP_SF_NONE and
 * the deadline amqp_time_infinite().
 *
 * Returns the status produced by amqp_send_method_inner().
 */
int amqp_send_method(amqp_connection_state_t state, amqp_channel_t channel,
                     amqp_method_number_t id, void *decoded) {
  return amqp_send_method_inner(state, channel, id, decoded, AMQP_SF_NONE,
                                amqp_time_infinite());
}
1016
|
|
|
|
|
|
|
|
1017
|
409
|
|
|
|
|
|
int amqp_send_method_inner(amqp_connection_state_t state, |
1018
|
|
|
|
|
|
|
amqp_channel_t channel, amqp_method_number_t id, |
1019
|
|
|
|
|
|
|
void *decoded, int flags, amqp_time_t deadline) { |
1020
|
|
|
|
|
|
|
amqp_frame_t frame; |
1021
|
|
|
|
|
|
|
|
1022
|
409
|
|
|
|
|
|
frame.frame_type = AMQP_FRAME_METHOD; |
1023
|
409
|
|
|
|
|
|
frame.channel = channel; |
1024
|
409
|
|
|
|
|
|
frame.payload.method.id = id; |
1025
|
409
|
|
|
|
|
|
frame.payload.method.decoded = decoded; |
1026
|
409
|
|
|
|
|
|
return amqp_send_frame_inner(state, &frame, flags, deadline); |
1027
|
|
|
|
|
|
|
} |
1028
|
|
|
|
|
|
|
|
1029
|
674
|
|
|
|
|
|
/*
 * Report whether `expected` occurs in `list`, an array of method numbers
 * terminated by a 0 entry.
 *
 * Returns 1 when found, 0 otherwise.
 */
static int amqp_id_in_reply_list(amqp_method_number_t expected,
                                 amqp_method_number_t *list) {
  amqp_method_number_t *cursor;

  for (cursor = list; 0 != *cursor; ++cursor) {
    if (expected == *cursor) {
      return 1;
    }
  }
  return 0;
}
1039
|
|
|
|
|
|
|
|
1040
|
302
|
|
|
|
|
|
static amqp_rpc_reply_t simple_rpc_inner( |
1041
|
|
|
|
|
|
|
amqp_connection_state_t state, amqp_channel_t channel, |
1042
|
|
|
|
|
|
|
amqp_method_number_t request_id, amqp_method_number_t *expected_reply_ids, |
1043
|
|
|
|
|
|
|
void *decoded_request_method, amqp_time_t deadline) { |
1044
|
|
|
|
|
|
|
int status; |
1045
|
|
|
|
|
|
|
amqp_rpc_reply_t result; |
1046
|
|
|
|
|
|
|
|
1047
|
302
|
|
|
|
|
|
memset(&result, 0, sizeof(result)); |
1048
|
|
|
|
|
|
|
|
1049
|
302
|
|
|
|
|
|
status = amqp_send_method(state, channel, request_id, decoded_request_method); |
1050
|
302
|
100
|
|
|
|
|
if (status < 0) { |
1051
|
2
|
|
|
|
|
|
return amqp_rpc_reply_error(status); |
1052
|
|
|
|
|
|
|
} |
1053
|
|
|
|
|
|
|
|
1054
|
|
|
|
|
|
|
{ |
1055
|
|
|
|
|
|
|
amqp_frame_t frame; |
1056
|
|
|
|
|
|
|
|
1057
|
|
|
|
|
|
|
retry: |
1058
|
300
|
|
|
|
|
|
status = wait_frame_inner(state, &frame, deadline); |
1059
|
300
|
50
|
|
|
|
|
if (status < 0) { |
1060
|
0
|
0
|
|
|
|
|
if (status == AMQP_STATUS_TIMEOUT) { |
1061
|
0
|
|
|
|
|
|
amqp_socket_close(state->socket, AMQP_SC_FORCE); |
1062
|
|
|
|
|
|
|
} |
1063
|
0
|
|
|
|
|
|
return amqp_rpc_reply_error(status); |
1064
|
|
|
|
|
|
|
} |
1065
|
|
|
|
|
|
|
|
1066
|
|
|
|
|
|
|
/* |
1067
|
|
|
|
|
|
|
* We store the frame for later processing unless it's something |
1068
|
|
|
|
|
|
|
* that directly affects us here, namely a method frame that is |
1069
|
|
|
|
|
|
|
* either |
1070
|
|
|
|
|
|
|
* - on the channel we want, and of the expected type, or |
1071
|
|
|
|
|
|
|
* - on the channel we want, and a channel.close frame, or |
1072
|
|
|
|
|
|
|
* - on channel zero, and a connection.close frame. |
1073
|
|
|
|
|
|
|
*/ |
1074
|
300
|
50
|
|
|
|
|
if (!((frame.frame_type == AMQP_FRAME_METHOD) && |
|
|
50
|
|
|
|
|
|
1075
|
300
|
100
|
|
|
|
|
(((frame.channel == channel) && |
1076
|
300
|
|
|
|
|
|
(amqp_id_in_reply_list(frame.payload.method.id, |
1077
|
5
|
50
|
|
|
|
|
expected_reply_ids) || |
1078
|
0
|
0
|
|
|
|
|
(frame.payload.method.id == AMQP_CHANNEL_CLOSE_METHOD))) || |
1079
|
0
|
0
|
|
|
|
|
((frame.channel == 0) && |
1080
|
0
|
|
|
|
|
|
(frame.payload.method.id == AMQP_CONNECTION_CLOSE_METHOD))))) { |
1081
|
|
|
|
|
|
|
amqp_pool_t *channel_pool; |
1082
|
|
|
|
|
|
|
amqp_frame_t *frame_copy; |
1083
|
|
|
|
|
|
|
amqp_link_t *link; |
1084
|
|
|
|
|
|
|
|
1085
|
0
|
|
|
|
|
|
channel_pool = amqp_get_or_create_channel_pool(state, frame.channel); |
1086
|
0
|
0
|
|
|
|
|
if (NULL == channel_pool) { |
1087
|
0
|
|
|
|
|
|
return amqp_rpc_reply_error(AMQP_STATUS_NO_MEMORY); |
1088
|
|
|
|
|
|
|
} |
1089
|
|
|
|
|
|
|
|
1090
|
0
|
|
|
|
|
|
frame_copy = amqp_pool_alloc(channel_pool, sizeof(amqp_frame_t)); |
1091
|
0
|
|
|
|
|
|
link = amqp_pool_alloc(channel_pool, sizeof(amqp_link_t)); |
1092
|
|
|
|
|
|
|
|
1093
|
0
|
0
|
|
|
|
|
if (frame_copy == NULL || link == NULL) { |
|
|
0
|
|
|
|
|
|
1094
|
0
|
|
|
|
|
|
return amqp_rpc_reply_error(AMQP_STATUS_NO_MEMORY); |
1095
|
|
|
|
|
|
|
} |
1096
|
|
|
|
|
|
|
|
1097
|
0
|
|
|
|
|
|
*frame_copy = frame; |
1098
|
|
|
|
|
|
|
|
1099
|
0
|
|
|
|
|
|
link->next = NULL; |
1100
|
0
|
|
|
|
|
|
link->data = frame_copy; |
1101
|
|
|
|
|
|
|
|
1102
|
0
|
0
|
|
|
|
|
if (state->last_queued_frame == NULL) { |
1103
|
0
|
|
|
|
|
|
state->first_queued_frame = link; |
1104
|
|
|
|
|
|
|
} else { |
1105
|
0
|
|
|
|
|
|
state->last_queued_frame->next = link; |
1106
|
|
|
|
|
|
|
} |
1107
|
0
|
|
|
|
|
|
state->last_queued_frame = link; |
1108
|
|
|
|
|
|
|
|
1109
|
0
|
|
|
|
|
|
goto retry; |
1110
|
|
|
|
|
|
|
} |
1111
|
|
|
|
|
|
|
|
1112
|
300
|
100
|
|
|
|
|
result.reply_type = |
1113
|
300
|
|
|
|
|
|
(amqp_id_in_reply_list(frame.payload.method.id, expected_reply_ids)) |
1114
|
|
|
|
|
|
|
? AMQP_RESPONSE_NORMAL |
1115
|
|
|
|
|
|
|
: AMQP_RESPONSE_SERVER_EXCEPTION; |
1116
|
|
|
|
|
|
|
|
1117
|
300
|
|
|
|
|
|
result.reply = frame.payload.method; |
1118
|
302
|
|
|
|
|
|
return result; |
1119
|
|
|
|
|
|
|
} |
1120
|
|
|
|
|
|
|
} |
1121
|
|
|
|
|
|
|
|
1122
|
54
|
|
|
|
|
|
amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state, |
1123
|
|
|
|
|
|
|
amqp_channel_t channel, |
1124
|
|
|
|
|
|
|
amqp_method_number_t request_id, |
1125
|
|
|
|
|
|
|
amqp_method_number_t *expected_reply_ids, |
1126
|
|
|
|
|
|
|
void *decoded_request_method) { |
1127
|
|
|
|
|
|
|
amqp_time_t deadline; |
1128
|
|
|
|
|
|
|
int res; |
1129
|
|
|
|
|
|
|
|
1130
|
54
|
|
|
|
|
|
res = amqp_time_from_now(&deadline, state->rpc_timeout); |
1131
|
54
|
50
|
|
|
|
|
if (res != AMQP_STATUS_OK) { |
1132
|
0
|
|
|
|
|
|
return amqp_rpc_reply_error(res); |
1133
|
|
|
|
|
|
|
} |
1134
|
|
|
|
|
|
|
|
1135
|
54
|
|
|
|
|
|
return simple_rpc_inner(state, channel, request_id, expected_reply_ids, |
1136
|
|
|
|
|
|
|
decoded_request_method, deadline); |
1137
|
|
|
|
|
|
|
} |
1138
|
|
|
|
|
|
|
|
1139
|
211
|
|
|
|
|
|
void *amqp_simple_rpc_decoded(amqp_connection_state_t state, |
1140
|
|
|
|
|
|
|
amqp_channel_t channel, |
1141
|
|
|
|
|
|
|
amqp_method_number_t request_id, |
1142
|
|
|
|
|
|
|
amqp_method_number_t reply_id, |
1143
|
|
|
|
|
|
|
void *decoded_request_method) { |
1144
|
|
|
|
|
|
|
amqp_time_t deadline; |
1145
|
|
|
|
|
|
|
int res; |
1146
|
|
|
|
|
|
|
amqp_method_number_t replies[2]; |
1147
|
|
|
|
|
|
|
|
1148
|
211
|
|
|
|
|
|
res = amqp_time_from_now(&deadline, state->rpc_timeout); |
1149
|
211
|
50
|
|
|
|
|
if (res != AMQP_STATUS_OK) { |
1150
|
0
|
|
|
|
|
|
state->most_recent_api_result = amqp_rpc_reply_error(res); |
1151
|
0
|
|
|
|
|
|
return NULL; |
1152
|
|
|
|
|
|
|
} |
1153
|
|
|
|
|
|
|
|
1154
|
211
|
|
|
|
|
|
replies[0] = reply_id; |
1155
|
211
|
|
|
|
|
|
replies[1] = 0; |
1156
|
|
|
|
|
|
|
|
1157
|
211
|
|
|
|
|
|
state->most_recent_api_result = simple_rpc_inner( |
1158
|
|
|
|
|
|
|
state, channel, request_id, replies, decoded_request_method, deadline); |
1159
|
|
|
|
|
|
|
|
1160
|
211
|
100
|
|
|
|
|
if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL) { |
1161
|
206
|
|
|
|
|
|
return state->most_recent_api_result.reply.decoded; |
1162
|
|
|
|
|
|
|
} else { |
1163
|
211
|
|
|
|
|
|
return NULL; |
1164
|
|
|
|
|
|
|
} |
1165
|
|
|
|
|
|
|
} |
1166
|
|
|
|
|
|
|
|
1167
|
191
|
|
|
|
|
|
/*
 * Return (by value) the reply stored in state->most_recent_api_result, the
 * field that amqp_simple_rpc_decoded() fills in on every call.
 */
amqp_rpc_reply_t amqp_get_rpc_reply(amqp_connection_state_t state) {
  return state->most_recent_api_result;
}
1170
|
|
|
|
|
|
|
|
1171
|
|
|
|
|
|
|
/* |
1172
|
|
|
|
|
|
|
* Merge base and add tables. If the two tables contain an entry with the same |
1173
|
|
|
|
|
|
|
* key, the entry from the add table takes precedence. For entries that are both |
1174
|
|
|
|
|
|
|
* tables with the same key, the table is recursively merged. |
1175
|
|
|
|
|
|
|
*/ |
1176
|
37
|
|
|
|
|
|
int amqp_merge_capabilities(const amqp_table_t *base, const amqp_table_t *add, |
1177
|
|
|
|
|
|
|
amqp_table_t *result, amqp_pool_t *pool) { |
1178
|
|
|
|
|
|
|
int i; |
1179
|
|
|
|
|
|
|
int res; |
1180
|
|
|
|
|
|
|
amqp_pool_t temp_pool; |
1181
|
|
|
|
|
|
|
amqp_table_t temp_result; |
1182
|
37
|
50
|
|
|
|
|
assert(base != NULL); |
1183
|
37
|
50
|
|
|
|
|
assert(result != NULL); |
1184
|
37
|
50
|
|
|
|
|
assert(pool != NULL); |
1185
|
|
|
|
|
|
|
|
1186
|
37
|
50
|
|
|
|
|
if (NULL == add) { |
1187
|
0
|
|
|
|
|
|
return amqp_table_clone(base, result, pool); |
1188
|
|
|
|
|
|
|
} |
1189
|
|
|
|
|
|
|
|
1190
|
37
|
|
|
|
|
|
init_amqp_pool(&temp_pool, 4096); |
1191
|
37
|
|
|
|
|
|
temp_result.num_entries = 0; |
1192
|
37
|
|
|
|
|
|
temp_result.entries = |
1193
|
37
|
|
|
|
|
|
amqp_pool_alloc(&temp_pool, sizeof(amqp_table_entry_t) * |
1194
|
37
|
|
|
|
|
|
(base->num_entries + add->num_entries)); |
1195
|
37
|
50
|
|
|
|
|
if (NULL == temp_result.entries) { |
1196
|
0
|
|
|
|
|
|
res = AMQP_STATUS_NO_MEMORY; |
1197
|
0
|
|
|
|
|
|
goto error_out; |
1198
|
|
|
|
|
|
|
} |
1199
|
259
|
100
|
|
|
|
|
for (i = 0; i < base->num_entries; ++i) { |
1200
|
222
|
|
|
|
|
|
temp_result.entries[temp_result.num_entries] = base->entries[i]; |
1201
|
222
|
|
|
|
|
|
temp_result.num_entries++; |
1202
|
|
|
|
|
|
|
} |
1203
|
37
|
50
|
|
|
|
|
for (i = 0; i < add->num_entries; ++i) { |
1204
|
0
|
|
|
|
|
|
amqp_table_entry_t *e = |
1205
|
0
|
|
|
|
|
|
amqp_table_get_entry_by_key(&temp_result, add->entries[i].key); |
1206
|
0
|
0
|
|
|
|
|
if (NULL != e) { |
1207
|
0
|
0
|
|
|
|
|
if (AMQP_FIELD_KIND_TABLE == add->entries[i].value.kind && |
|
|
0
|
|
|
|
|
|
1208
|
0
|
|
|
|
|
|
AMQP_FIELD_KIND_TABLE == e->value.kind) { |
1209
|
0
|
|
|
|
|
|
amqp_table_entry_t *be = |
1210
|
0
|
|
|
|
|
|
amqp_table_get_entry_by_key(base, add->entries[i].key); |
1211
|
|
|
|
|
|
|
|
1212
|
0
|
|
|
|
|
|
res = amqp_merge_capabilities(&be->value.value.table, |
1213
|
0
|
|
|
|
|
|
&add->entries[i].value.value.table, |
1214
|
|
|
|
|
|
|
&e->value.value.table, &temp_pool); |
1215
|
0
|
0
|
|
|
|
|
if (AMQP_STATUS_OK != res) { |
1216
|
0
|
|
|
|
|
|
goto error_out; |
1217
|
|
|
|
|
|
|
} |
1218
|
|
|
|
|
|
|
} else { |
1219
|
0
|
|
|
|
|
|
e->value = add->entries[i].value; |
1220
|
|
|
|
|
|
|
} |
1221
|
|
|
|
|
|
|
} else { |
1222
|
0
|
|
|
|
|
|
temp_result.entries[temp_result.num_entries] = add->entries[i]; |
1223
|
0
|
|
|
|
|
|
temp_result.num_entries++; |
1224
|
|
|
|
|
|
|
} |
1225
|
|
|
|
|
|
|
} |
1226
|
37
|
|
|
|
|
|
res = amqp_table_clone(&temp_result, result, pool); |
1227
|
|
|
|
|
|
|
error_out: |
1228
|
37
|
|
|
|
|
|
empty_amqp_pool(&temp_pool); |
1229
|
37
|
|
|
|
|
|
return res; |
1230
|
|
|
|
|
|
|
} |
1231
|
|
|
|
|
|
|
|
1232
|
37
|
|
|
|
|
|
static amqp_rpc_reply_t amqp_login_inner(amqp_connection_state_t state, |
1233
|
|
|
|
|
|
|
char const *vhost, int channel_max, |
1234
|
|
|
|
|
|
|
int frame_max, int heartbeat, |
1235
|
|
|
|
|
|
|
const amqp_table_t *client_properties, |
1236
|
|
|
|
|
|
|
const struct timeval *timeout, |
1237
|
|
|
|
|
|
|
amqp_sasl_method_enum sasl_method, |
1238
|
|
|
|
|
|
|
va_list vl) { |
1239
|
|
|
|
|
|
|
int res; |
1240
|
|
|
|
|
|
|
amqp_method_t method; |
1241
|
|
|
|
|
|
|
|
1242
|
|
|
|
|
|
|
uint16_t client_channel_max; |
1243
|
|
|
|
|
|
|
uint32_t client_frame_max; |
1244
|
|
|
|
|
|
|
uint16_t client_heartbeat; |
1245
|
|
|
|
|
|
|
|
1246
|
|
|
|
|
|
|
uint16_t server_channel_max; |
1247
|
|
|
|
|
|
|
uint32_t server_frame_max; |
1248
|
|
|
|
|
|
|
uint16_t server_heartbeat; |
1249
|
|
|
|
|
|
|
|
1250
|
|
|
|
|
|
|
amqp_rpc_reply_t result; |
1251
|
|
|
|
|
|
|
amqp_time_t deadline; |
1252
|
|
|
|
|
|
|
|
1253
|
37
|
50
|
|
|
|
|
if (channel_max < 0 || channel_max > UINT16_MAX) { |
|
|
50
|
|
|
|
|
|
1254
|
0
|
|
|
|
|
|
return amqp_rpc_reply_error(AMQP_STATUS_INVALID_PARAMETER); |
1255
|
|
|
|
|
|
|
} |
1256
|
37
|
|
|
|
|
|
client_channel_max = (uint16_t)channel_max; |
1257
|
|
|
|
|
|
|
|
1258
|
37
|
50
|
|
|
|
|
if (frame_max < 0) { |
1259
|
0
|
|
|
|
|
|
return amqp_rpc_reply_error(AMQP_STATUS_INVALID_PARAMETER); |
1260
|
|
|
|
|
|
|
} |
1261
|
37
|
|
|
|
|
|
client_frame_max = (uint32_t)frame_max; |
1262
|
|
|
|
|
|
|
|
1263
|
37
|
50
|
|
|
|
|
if (heartbeat < 0 || heartbeat > UINT16_MAX) { |
|
|
50
|
|
|
|
|
|
1264
|
0
|
|
|
|
|
|
return amqp_rpc_reply_error(AMQP_STATUS_INVALID_PARAMETER); |
1265
|
|
|
|
|
|
|
} |
1266
|
37
|
|
|
|
|
|
client_heartbeat = (uint16_t)heartbeat; |
1267
|
|
|
|
|
|
|
|
1268
|
37
|
|
|
|
|
|
res = amqp_time_from_now(&deadline, timeout); |
1269
|
37
|
50
|
|
|
|
|
if (AMQP_STATUS_OK != res) { |
1270
|
0
|
|
|
|
|
|
goto error_res; |
1271
|
|
|
|
|
|
|
} |
1272
|
|
|
|
|
|
|
|
1273
|
37
|
|
|
|
|
|
res = send_header_inner(state, deadline); |
1274
|
37
|
50
|
|
|
|
|
if (AMQP_STATUS_OK != res) { |
1275
|
0
|
|
|
|
|
|
goto error_res; |
1276
|
|
|
|
|
|
|
} |
1277
|
|
|
|
|
|
|
|
1278
|
37
|
|
|
|
|
|
res = simple_wait_method_inner(state, 0, AMQP_CONNECTION_START_METHOD, |
1279
|
|
|
|
|
|
|
deadline, &method); |
1280
|
37
|
50
|
|
|
|
|
if (AMQP_STATUS_OK != res) { |
1281
|
0
|
|
|
|
|
|
goto error_res; |
1282
|
|
|
|
|
|
|
} |
1283
|
|
|
|
|
|
|
|
1284
|
|
|
|
|
|
|
{ |
1285
|
37
|
|
|
|
|
|
amqp_connection_start_t *s = (amqp_connection_start_t *)method.decoded; |
1286
|
37
|
50
|
|
|
|
|
if ((s->version_major != AMQP_PROTOCOL_VERSION_MAJOR) || |
|
|
50
|
|
|
|
|
|
1287
|
37
|
|
|
|
|
|
(s->version_minor != AMQP_PROTOCOL_VERSION_MINOR)) { |
1288
|
0
|
|
|
|
|
|
res = AMQP_STATUS_INCOMPATIBLE_AMQP_VERSION; |
1289
|
0
|
|
|
|
|
|
goto error_res; |
1290
|
|
|
|
|
|
|
} |
1291
|
|
|
|
|
|
|
|
1292
|
37
|
|
|
|
|
|
res = amqp_table_clone(&s->server_properties, &state->server_properties, |
1293
|
|
|
|
|
|
|
&state->properties_pool); |
1294
|
|
|
|
|
|
|
|
1295
|
37
|
50
|
|
|
|
|
if (AMQP_STATUS_OK != res) { |
1296
|
0
|
|
|
|
|
|
goto error_res; |
1297
|
|
|
|
|
|
|
} |
1298
|
|
|
|
|
|
|
|
1299
|
|
|
|
|
|
|
/* TODO: check that our chosen SASL mechanism is in the list of |
1300
|
|
|
|
|
|
|
acceptable mechanisms. Or even let the application choose from |
1301
|
|
|
|
|
|
|
the list! */ |
1302
|
37
|
50
|
|
|
|
|
if (!sasl_mechanism_in_list(s->mechanisms, sasl_method)) { |
1303
|
0
|
|
|
|
|
|
res = AMQP_STATUS_BROKER_UNSUPPORTED_SASL_METHOD; |
1304
|
0
|
|
|
|
|
|
goto error_res; |
1305
|
|
|
|
|
|
|
} |
1306
|
|
|
|
|
|
|
} |
1307
|
|
|
|
|
|
|
|
1308
|
|
|
|
|
|
|
{ |
1309
|
|
|
|
|
|
|
amqp_table_entry_t default_properties[6]; |
1310
|
|
|
|
|
|
|
amqp_table_t default_table; |
1311
|
|
|
|
|
|
|
amqp_table_entry_t client_capabilities[2]; |
1312
|
|
|
|
|
|
|
amqp_table_t client_capabilities_table; |
1313
|
|
|
|
|
|
|
amqp_connection_start_ok_t s; |
1314
|
|
|
|
|
|
|
amqp_pool_t *channel_pool; |
1315
|
|
|
|
|
|
|
amqp_bytes_t response_bytes; |
1316
|
|
|
|
|
|
|
|
1317
|
37
|
|
|
|
|
|
channel_pool = amqp_get_or_create_channel_pool(state, 0); |
1318
|
37
|
50
|
|
|
|
|
if (NULL == channel_pool) { |
1319
|
0
|
|
|
|
|
|
res = AMQP_STATUS_NO_MEMORY; |
1320
|
0
|
|
|
|
|
|
goto error_res; |
1321
|
|
|
|
|
|
|
} |
1322
|
|
|
|
|
|
|
|
1323
|
37
|
|
|
|
|
|
response_bytes = sasl_response(channel_pool, sasl_method, vl); |
1324
|
37
|
50
|
|
|
|
|
if (response_bytes.bytes == NULL) { |
1325
|
0
|
|
|
|
|
|
res = AMQP_STATUS_NO_MEMORY; |
1326
|
0
|
|
|
|
|
|
goto error_res; |
1327
|
|
|
|
|
|
|
} |
1328
|
|
|
|
|
|
|
|
1329
|
37
|
|
|
|
|
|
client_capabilities[0] = |
1330
|
|
|
|
|
|
|
amqp_table_construct_bool_entry("authentication_failure_close", 1); |
1331
|
37
|
|
|
|
|
|
client_capabilities[1] = |
1332
|
|
|
|
|
|
|
amqp_table_construct_bool_entry("exchange_exchange_bindings", 1); |
1333
|
|
|
|
|
|
|
|
1334
|
37
|
|
|
|
|
|
client_capabilities_table.entries = client_capabilities; |
1335
|
37
|
|
|
|
|
|
client_capabilities_table.num_entries = |
1336
|
|
|
|
|
|
|
sizeof(client_capabilities) / sizeof(amqp_table_entry_t); |
1337
|
|
|
|
|
|
|
|
1338
|
37
|
|
|
|
|
|
default_properties[0] = |
1339
|
|
|
|
|
|
|
amqp_table_construct_utf8_entry("product", "rabbitmq-c"); |
1340
|
37
|
|
|
|
|
|
default_properties[1] = |
1341
|
|
|
|
|
|
|
amqp_table_construct_utf8_entry("version", AMQP_VERSION_STRING); |
1342
|
37
|
|
|
|
|
|
default_properties[2] = |
1343
|
|
|
|
|
|
|
amqp_table_construct_utf8_entry("platform", AMQ_PLATFORM); |
1344
|
37
|
|
|
|
|
|
default_properties[3] = |
1345
|
|
|
|
|
|
|
amqp_table_construct_utf8_entry("copyright", AMQ_COPYRIGHT); |
1346
|
37
|
|
|
|
|
|
default_properties[4] = amqp_table_construct_utf8_entry( |
1347
|
|
|
|
|
|
|
"information", "See https://github.com/alanxz/rabbitmq-c"); |
1348
|
37
|
|
|
|
|
|
default_properties[5] = amqp_table_construct_table_entry( |
1349
|
|
|
|
|
|
|
"capabilities", &client_capabilities_table); |
1350
|
|
|
|
|
|
|
|
1351
|
37
|
|
|
|
|
|
default_table.entries = default_properties; |
1352
|
37
|
|
|
|
|
|
default_table.num_entries = |
1353
|
|
|
|
|
|
|
sizeof(default_properties) / sizeof(amqp_table_entry_t); |
1354
|
|
|
|
|
|
|
|
1355
|
37
|
|
|
|
|
|
res = amqp_merge_capabilities(&default_table, client_properties, |
1356
|
|
|
|
|
|
|
&state->client_properties, channel_pool); |
1357
|
37
|
50
|
|
|
|
|
if (AMQP_STATUS_OK != res) { |
1358
|
0
|
|
|
|
|
|
goto error_res; |
1359
|
|
|
|
|
|
|
} |
1360
|
|
|
|
|
|
|
|
1361
|
37
|
|
|
|
|
|
s.client_properties = state->client_properties; |
1362
|
37
|
|
|
|
|
|
s.mechanism = sasl_method_name(sasl_method); |
1363
|
37
|
|
|
|
|
|
s.response = response_bytes; |
1364
|
37
|
|
|
|
|
|
s.locale = amqp_cstring_bytes("en_US"); |
1365
|
|
|
|
|
|
|
|
1366
|
37
|
|
|
|
|
|
res = amqp_send_method_inner(state, 0, AMQP_CONNECTION_START_OK_METHOD, &s, |
1367
|
|
|
|
|
|
|
AMQP_SF_NONE, deadline); |
1368
|
37
|
50
|
|
|
|
|
if (res < 0) { |
1369
|
0
|
|
|
|
|
|
goto error_res; |
1370
|
|
|
|
|
|
|
} |
1371
|
|
|
|
|
|
|
} |
1372
|
|
|
|
|
|
|
|
1373
|
37
|
|
|
|
|
|
amqp_release_buffers(state); |
1374
|
|
|
|
|
|
|
|
1375
|
|
|
|
|
|
|
{ |
1376
|
37
|
|
|
|
|
|
amqp_method_number_t expected[] = {AMQP_CONNECTION_TUNE_METHOD, |
1377
|
|
|
|
|
|
|
AMQP_CONNECTION_CLOSE_METHOD, 0}; |
1378
|
|
|
|
|
|
|
|
1379
|
37
|
|
|
|
|
|
res = amqp_simple_wait_method_list(state, 0, expected, deadline, &method); |
1380
|
37
|
50
|
|
|
|
|
if (AMQP_STATUS_OK != res) { |
1381
|
0
|
|
|
|
|
|
goto error_res; |
1382
|
|
|
|
|
|
|
} |
1383
|
|
|
|
|
|
|
} |
1384
|
|
|
|
|
|
|
|
1385
|
37
|
50
|
|
|
|
|
if (AMQP_CONNECTION_CLOSE_METHOD == method.id) { |
1386
|
0
|
|
|
|
|
|
result.reply_type = AMQP_RESPONSE_SERVER_EXCEPTION; |
1387
|
0
|
|
|
|
|
|
result.reply = method; |
1388
|
0
|
|
|
|
|
|
result.library_error = 0; |
1389
|
0
|
|
|
|
|
|
goto out; |
1390
|
|
|
|
|
|
|
} |
1391
|
|
|
|
|
|
|
|
1392
|
|
|
|
|
|
|
{ |
1393
|
37
|
|
|
|
|
|
amqp_connection_tune_t *s = (amqp_connection_tune_t *)method.decoded; |
1394
|
37
|
|
|
|
|
|
server_channel_max = s->channel_max; |
1395
|
37
|
|
|
|
|
|
server_frame_max = s->frame_max; |
1396
|
37
|
|
|
|
|
|
server_heartbeat = s->heartbeat; |
1397
|
|
|
|
|
|
|
} |
1398
|
|
|
|
|
|
|
|
1399
|
37
|
50
|
|
|
|
|
if (server_channel_max != 0 && |
|
|
50
|
|
|
|
|
|
1400
|
37
|
50
|
|
|
|
|
(server_channel_max < client_channel_max || client_channel_max == 0)) { |
1401
|
37
|
|
|
|
|
|
client_channel_max = server_channel_max; |
1402
|
0
|
0
|
|
|
|
|
} else if (server_channel_max == 0 && client_channel_max == 0) { |
|
|
0
|
|
|
|
|
|
1403
|
0
|
|
|
|
|
|
client_channel_max = UINT16_MAX; |
1404
|
|
|
|
|
|
|
} |
1405
|
|
|
|
|
|
|
|
1406
|
37
|
50
|
|
|
|
|
if (server_frame_max != 0 && server_frame_max < client_frame_max) { |
|
|
50
|
|
|
|
|
|
1407
|
37
|
|
|
|
|
|
client_frame_max = server_frame_max; |
1408
|
|
|
|
|
|
|
} |
1409
|
|
|
|
|
|
|
|
1410
|
37
|
50
|
|
|
|
|
if (server_heartbeat != 0 && server_heartbeat < client_heartbeat) { |
|
|
50
|
|
|
|
|
|
1411
|
0
|
|
|
|
|
|
client_heartbeat = server_heartbeat; |
1412
|
|
|
|
|
|
|
} |
1413
|
|
|
|
|
|
|
|
1414
|
37
|
|
|
|
|
|
res = amqp_tune_connection(state, client_channel_max, client_frame_max, |
1415
|
|
|
|
|
|
|
client_heartbeat); |
1416
|
37
|
50
|
|
|
|
|
if (res < 0) { |
1417
|
0
|
|
|
|
|
|
goto error_res; |
1418
|
|
|
|
|
|
|
} |
1419
|
|
|
|
|
|
|
|
1420
|
|
|
|
|
|
|
{ |
1421
|
|
|
|
|
|
|
amqp_connection_tune_ok_t s; |
1422
|
37
|
|
|
|
|
|
s.frame_max = client_frame_max; |
1423
|
37
|
|
|
|
|
|
s.channel_max = client_channel_max; |
1424
|
37
|
|
|
|
|
|
s.heartbeat = client_heartbeat; |
1425
|
|
|
|
|
|
|
|
1426
|
37
|
|
|
|
|
|
res = amqp_send_method_inner(state, 0, AMQP_CONNECTION_TUNE_OK_METHOD, &s, |
1427
|
|
|
|
|
|
|
AMQP_SF_NONE, deadline); |
1428
|
37
|
50
|
|
|
|
|
if (res < 0) { |
1429
|
0
|
|
|
|
|
|
goto error_res; |
1430
|
|
|
|
|
|
|
} |
1431
|
|
|
|
|
|
|
} |
1432
|
|
|
|
|
|
|
|
1433
|
37
|
|
|
|
|
|
amqp_release_buffers(state); |
1434
|
|
|
|
|
|
|
|
1435
|
|
|
|
|
|
|
{ |
1436
|
37
|
|
|
|
|
|
amqp_method_number_t replies[] = {AMQP_CONNECTION_OPEN_OK_METHOD, 0}; |
1437
|
|
|
|
|
|
|
amqp_connection_open_t s; |
1438
|
37
|
|
|
|
|
|
s.virtual_host = amqp_cstring_bytes(vhost); |
1439
|
37
|
|
|
|
|
|
s.capabilities = amqp_empty_bytes; |
1440
|
37
|
|
|
|
|
|
s.insist = 1; |
1441
|
|
|
|
|
|
|
|
1442
|
37
|
|
|
|
|
|
result = simple_rpc_inner(state, 0, AMQP_CONNECTION_OPEN_METHOD, replies, |
1443
|
|
|
|
|
|
|
&s, deadline); |
1444
|
37
|
50
|
|
|
|
|
if (result.reply_type != AMQP_RESPONSE_NORMAL) { |
1445
|
0
|
|
|
|
|
|
goto out; |
1446
|
|
|
|
|
|
|
} |
1447
|
|
|
|
|
|
|
} |
1448
|
|
|
|
|
|
|
|
1449
|
37
|
|
|
|
|
|
result.reply_type = AMQP_RESPONSE_NORMAL; |
1450
|
37
|
|
|
|
|
|
result.reply.id = 0; |
1451
|
37
|
|
|
|
|
|
result.reply.decoded = NULL; |
1452
|
37
|
|
|
|
|
|
result.library_error = 0; |
1453
|
37
|
|
|
|
|
|
amqp_maybe_release_buffers(state); |
1454
|
|
|
|
|
|
|
|
1455
|
|
|
|
|
|
|
out: |
1456
|
37
|
|
|
|
|
|
return result; |
1457
|
|
|
|
|
|
|
|
1458
|
|
|
|
|
|
|
error_res: |
1459
|
0
|
|
|
|
|
|
amqp_socket_close(state->socket, AMQP_SC_FORCE); |
1460
|
0
|
|
|
|
|
|
result = amqp_rpc_reply_error(res); |
1461
|
|
|
|
|
|
|
|
1462
|
37
|
|
|
|
|
|
goto out; |
1463
|
|
|
|
|
|
|
} |
1464
|
|
|
|
|
|
|
|
1465
|
0
|
|
|
|
|
|
/*
 * Log in to the broker using the library's default client properties only.
 *
 * Forwards to amqp_login_inner() with amqp_empty_table as the extra client
 * properties and state->handshake_timeout as the timeout. The variadic
 * arguments following `sasl_method` are passed on for building the SASL
 * response.
 *
 * Returns the reply produced by amqp_login_inner().
 */
amqp_rpc_reply_t amqp_login(amqp_connection_state_t state, char const *vhost,
                            int channel_max, int frame_max, int heartbeat,
                            int sasl_method, ...) {
  amqp_rpc_reply_t reply;
  va_list sasl_args;

  va_start(sasl_args, sasl_method);
  reply = amqp_login_inner(state, vhost, channel_max, frame_max, heartbeat,
                           &amqp_empty_table, state->handshake_timeout,
                           sasl_method, sasl_args);
  va_end(sasl_args);

  return reply;
}
1481
|
|
|
|
|
|
|
|
1482
|
37
|
|
|
|
|
|
amqp_rpc_reply_t amqp_login_with_properties( |
1483
|
|
|
|
|
|
|
amqp_connection_state_t state, char const *vhost, int channel_max, |
1484
|
|
|
|
|
|
|
int frame_max, int heartbeat, const amqp_table_t *client_properties, |
1485
|
|
|
|
|
|
|
int sasl_method, ...) { |
1486
|
|
|
|
|
|
|
va_list vl; |
1487
|
|
|
|
|
|
|
amqp_rpc_reply_t ret; |
1488
|
|
|
|
|
|
|
|
1489
|
37
|
|
|
|
|
|
va_start(vl, sasl_method); |
1490
|
|
|
|
|
|
|
|
1491
|
37
|
|
|
|
|
|
ret = amqp_login_inner(state, vhost, channel_max, frame_max, heartbeat, |
1492
|
37
|
|
|
|
|
|
client_properties, state->handshake_timeout, |
1493
|
|
|
|
|
|
|
sasl_method, vl); |
1494
|
|
|
|
|
|
|
|
1495
|
37
|
|
|
|
|
|
va_end(vl); |
1496
|
|
|
|
|
|
|
|
1497
|
37
|
|
|
|
|
|
return ret; |
1498
|
|
|
|
|
|
|
} |