File Coverage

amqp_socket.c
Criterion Covered Total %
statement 107 626 17.0
branch 37 316 11.7
condition n/a
subroutine n/a
pod n/a
total 144 942 15.2


line stmt bran cond sub pod time code
1             // Copyright 2007 - 2021, Alan Antonuk and the rabbitmq-c contributors.
2             // SPDX-License-Identifier: mit
3              
4             #ifdef HAVE_CONFIG_H
5             #include "config.h"
6             #endif
7              
8             #ifdef _MSC_VER
9             #define _CRT_SECURE_NO_WARNINGS
10             #endif
11              
12             #include "amqp_private.h"
13             #include "amqp_socket.h"
14             #include "amqp_table.h"
15             #include "amqp_time.h"
16              
17             #include
18             #include
19             #include
20             #include
21             #include
22             #include
23             #include
24              
25             #include
26              
27             #if ((defined(_WIN32)) || (defined(__MINGW32__)) || (defined(__MINGW64__)))
28             #ifndef WIN32_LEAN_AND_MEAN
29             #define WIN32_LEAN_AND_MEAN
30             #endif
31             #include
32             #include
33             #else
34             #include
35             /* On older BSD types.h must come before net includes */
36             #include
37             #include
38             #ifdef HAVE_SELECT
39             #include
40             #endif
41             #include
42             #include
43             #include
44             #include
45             #ifdef HAVE_POLL
46             #include
47             #endif
48             #include
49             #endif
50              
51             static int amqp_id_in_reply_list(amqp_method_number_t expected,
52             amqp_method_number_t *list);
53              
54 1           static int amqp_os_socket_init(void) {
55             #ifdef _WIN32
56             static int called_wsastartup = 0;
57             if (!called_wsastartup) {
58             WSADATA data;
59             int res = WSAStartup(0x0202, &data);
60             if (res) {
61             return AMQP_STATUS_TCP_SOCKETLIB_INIT_ERROR;
62             }
63              
64             called_wsastartup = 1;
65             }
66             return AMQP_STATUS_OK;
67              
68             #else
69 1           return AMQP_STATUS_OK;
70             #endif
71             }
72              
73 0           int amqp_os_socket_error(void) {
74             #ifdef _WIN32
75             return WSAGetLastError();
76             #else
77 0           return errno;
78             #endif
79             }
80              
81 0           int amqp_os_socket_close(int sockfd) {
82             #ifdef _WIN32
83             return closesocket(sockfd);
84             #else
85 0           return close(sockfd);
86             #endif
87             }
88              
89 1           ssize_t amqp_socket_send(amqp_socket_t *self, const void *buf, size_t len,
90             int flags) {
91 1 50         assert(self);
92 1 50         assert(self->klass->send);
93 1           return self->klass->send(self, buf, len, flags);
94             }
95              
96 0           ssize_t amqp_socket_recv(amqp_socket_t *self, void *buf, size_t len,
97             int flags) {
98 0 0         assert(self);
99 0 0         assert(self->klass->recv);
100 0           return self->klass->recv(self, buf, len, flags);
101             }
102              
103 0           int amqp_socket_open(amqp_socket_t *self, const char *host, int port) {
104 0 0         assert(self);
105 0 0         assert(self->klass->open);
106 0           return self->klass->open(self, host, port, NULL);
107             }
108              
109 1           int amqp_socket_open_noblock(amqp_socket_t *self, const char *host, int port,
110             const struct timeval *timeout) {
111 1 50         assert(self);
112 1 50         assert(self->klass->open);
113 1           return self->klass->open(self, host, port, timeout);
114             }
115              
116 0           int amqp_socket_close(amqp_socket_t *self, amqp_socket_close_enum force) {
117 0 0         assert(self);
118 0 0         assert(self->klass->close);
119 0           return self->klass->close(self, force);
120             }
121              
122 34           void amqp_socket_delete(amqp_socket_t *self) {
123 34 100         if (self) {
124 1 50         assert(self->klass->delete);
125 1           self->klass->delete (self);
126             }
127 34           }
128              
129 1           int amqp_socket_get_sockfd(amqp_socket_t *self) {
130 1 50         assert(self);
131 1 50         assert(self->klass->get_sockfd);
132 1           return self->klass->get_sockfd(self);
133             }
134              
135 1           int amqp_poll(int fd, int event, amqp_time_t deadline) {
136             #ifdef HAVE_POLL
137             struct pollfd pfd;
138             int res;
139             int timeout_ms;
140              
141             /* Function should only ever be called with one of these two */
142 1 50         assert(event == AMQP_SF_POLLIN || event == AMQP_SF_POLLOUT);
    50          
143              
144 1           start_poll:
145 1           pfd.fd = fd;
146 1           switch (event) {
147 0           case AMQP_SF_POLLIN:
148 0           pfd.events = POLLIN;
149 0           break;
150 1           case AMQP_SF_POLLOUT:
151 1           pfd.events = POLLOUT;
152 1           break;
153             }
154              
155 1           timeout_ms = amqp_time_ms_until(deadline);
156 1 50         if (-1 > timeout_ms) {
157 0           return timeout_ms;
158             }
159              
160 1           res = poll(&pfd, 1, timeout_ms);
161              
162 1 50         if (0 < res) {
163             /* TODO: optimize this a bit by returning the AMQP_STATUS_SOCKET_ERROR or
164             * equivalent when pdf.revent is POLLHUP or POLLERR, so an extra syscall
165             * doesn't need to be made. */
166 0           return AMQP_STATUS_OK;
167 1 50         } else if (0 == res) {
168 1           return AMQP_STATUS_TIMEOUT;
169             } else {
170 0 0         switch (amqp_os_socket_error()) {
171 0           case EINTR:
172 0           goto start_poll;
173 0           default:
174 0           return AMQP_STATUS_SOCKET_ERROR;
175             }
176             }
177             return AMQP_STATUS_OK;
178             #elif defined(HAVE_SELECT)
179             fd_set fds;
180             fd_set exceptfds;
181             fd_set *exceptfdsp;
182             int res;
183             struct timeval tv;
184             struct timeval *tvp;
185              
186             assert((0 != (event & AMQP_SF_POLLIN)) || (0 != (event & AMQP_SF_POLLOUT)));
187             #ifndef _WIN32
188             /* On Win32 connect() failure is indicated through the exceptfds, it does not
189             * make any sense to allow POLLERR on any other platform or condition */
190             assert(0 == (event & AMQP_SF_POLLERR));
191             #endif
192              
193             start_select:
194             FD_ZERO(&fds);
195             FD_SET(fd, &fds);
196              
197             if (event & AMQP_SF_POLLERR) {
198             FD_ZERO(&exceptfds);
199             FD_SET(fd, &exceptfds);
200             exceptfdsp = &exceptfds;
201             } else {
202             exceptfdsp = NULL;
203             }
204              
205             res = amqp_time_tv_until(deadline, &tv, &tvp);
206             if (res != AMQP_STATUS_OK) {
207             return res;
208             }
209              
210             if (event & AMQP_SF_POLLIN) {
211             res = select(fd + 1, &fds, NULL, exceptfdsp, tvp);
212             } else if (event & AMQP_SF_POLLOUT) {
213             res = select(fd + 1, NULL, &fds, exceptfdsp, tvp);
214             }
215              
216             if (0 < res) {
217             return AMQP_STATUS_OK;
218             } else if (0 == res) {
219             return AMQP_STATUS_TIMEOUT;
220             } else {
221             switch (amqp_os_socket_error()) {
222             case EINTR:
223             goto start_select;
224             default:
225             return AMQP_STATUS_SOCKET_ERROR;
226             }
227             }
228             #else
229             #error "poll() or select() is needed to compile rabbitmq-c"
230             #endif
231             }
232              
233 1           static ssize_t do_poll(amqp_connection_state_t state, ssize_t res,
234             amqp_time_t deadline) {
235 1           int fd = amqp_get_sockfd(state);
236 1 50         if (-1 == fd) {
237 1           return AMQP_STATUS_SOCKET_CLOSED;
238             }
239 0           switch (res) {
240 0           case AMQP_PRIVATE_STATUS_SOCKET_NEEDREAD:
241 0           res = amqp_poll(fd, AMQP_SF_POLLIN, deadline);
242 0           break;
243 0           case AMQP_PRIVATE_STATUS_SOCKET_NEEDWRITE:
244 0           res = amqp_poll(fd, AMQP_SF_POLLOUT, deadline);
245 0           break;
246             }
247 0           return res;
248             }
249              
250 1           ssize_t amqp_try_send(amqp_connection_state_t state, const void *buf,
251             size_t len, amqp_time_t deadline, int flags) {
252             ssize_t res;
253 1           void *buf_left = (void *)buf;
254             /* Assume that len is not going to be larger than ssize_t can hold. */
255 1           ssize_t len_left = (size_t)len;
256              
257 0           start_send:
258 1           res = amqp_socket_send(state->socket, buf_left, len_left, flags);
259              
260 1 50         if (res > 0) {
261 0           len_left -= res;
262 0           buf_left = (char *)buf_left + res;
263 0 0         if (0 == len_left) {
264 0           return (ssize_t)len;
265             }
266 0           goto start_send;
267             }
268 1           res = do_poll(state, res, deadline);
269 1 50         if (AMQP_STATUS_OK == res) {
270 0           goto start_send;
271             }
272 1 50         if (AMQP_STATUS_TIMEOUT == res) {
273 0           return (ssize_t)len - len_left;
274             }
275 1           return res;
276             }
277              
278 0           int amqp_open_socket(char const *hostname, int portnumber) {
279 0           return amqp_open_socket_inner(hostname, portnumber, amqp_time_infinite());
280             }
281              
282 1           int amqp_open_socket_noblock(char const *hostname, int portnumber,
283             const struct timeval *timeout) {
284             amqp_time_t deadline;
285 1           int res = amqp_time_from_now(&deadline, timeout);
286 1 50         if (AMQP_STATUS_OK != res) {
287 0           return res;
288             }
289 1           return amqp_open_socket_inner(hostname, portnumber, deadline);
290             }
291              
292             #ifdef _WIN32
293             static int connect_socket(struct addrinfo *addr, amqp_time_t deadline) {
294             int one = 1;
295             SOCKET sockfd;
296             int last_error;
297              
298             /*
299             * This cast is to squash warnings on Win64, see:
300             * http://stackoverflow.com/questions/1953639/is-it-safe-to-cast-socket-to-int-under-win64
301             */
302              
303             sockfd = (int)socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol);
304             if (INVALID_SOCKET == sockfd) {
305             return AMQP_STATUS_SOCKET_ERROR;
306             }
307              
308             /* Set the socket to be non-blocking */
309             if (SOCKET_ERROR == ioctlsocket(sockfd, FIONBIO, &one)) {
310             last_error = AMQP_STATUS_SOCKET_ERROR;
311             goto err;
312             }
313              
314             /* Disable nagle */
315             if (SOCKET_ERROR == setsockopt(sockfd, IPPROTO_TCP, TCP_NODELAY,
316             (const char *)&one, sizeof(one))) {
317             last_error = AMQP_STATUS_SOCKET_ERROR;
318             goto err;
319             }
320              
321             /* Enable TCP keepalives */
322             if (SOCKET_ERROR == setsockopt(sockfd, SOL_SOCKET, SO_KEEPALIVE,
323             (const char *)&one, sizeof(one))) {
324             last_error = AMQP_STATUS_SOCKET_ERROR;
325             goto err;
326             }
327              
328             if (SOCKET_ERROR != connect(sockfd, addr->ai_addr, (int)addr->ai_addrlen)) {
329             return (int)sockfd;
330             }
331              
332             if (WSAEWOULDBLOCK != WSAGetLastError()) {
333             last_error = AMQP_STATUS_SOCKET_ERROR;
334             goto err;
335             }
336              
337             last_error =
338             amqp_poll((int)sockfd, AMQP_SF_POLLOUT | AMQP_SF_POLLERR, deadline);
339             if (AMQP_STATUS_OK != last_error) {
340             goto err;
341             }
342              
343             {
344             int result;
345             int result_len = sizeof(result);
346              
347             if (SOCKET_ERROR == getsockopt(sockfd, SOL_SOCKET, SO_ERROR,
348             (char *)&result, &result_len) ||
349             result != 0) {
350             last_error = AMQP_STATUS_SOCKET_ERROR;
351             goto err;
352             }
353             }
354              
355             return (int)sockfd;
356              
357             err:
358             closesocket(sockfd);
359             return last_error;
360             }
361             #else
362 1           static int connect_socket(struct addrinfo *addr, amqp_time_t deadline) {
363 1           int one = 1;
364             int sockfd;
365             int flags;
366             int last_error;
367              
368 1           sockfd = socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol);
369 1 50         if (-1 == sockfd) {
370 0           return AMQP_STATUS_SOCKET_ERROR;
371             }
372              
373             /* Enable CLOEXEC on socket */
374 1           flags = fcntl(sockfd, F_GETFD);
375 1 50         if (flags == -1 || fcntl(sockfd, F_SETFD, (long)(flags | FD_CLOEXEC)) == -1) {
    50          
376 0           last_error = AMQP_STATUS_SOCKET_ERROR;
377 0           goto err;
378             }
379              
380             /* Set the socket as non-blocking */
381 1           flags = fcntl(sockfd, F_GETFL);
382 1 50         if (flags == -1 || fcntl(sockfd, F_SETFL, (long)(flags | O_NONBLOCK)) == -1) {
    50          
383 0           last_error = AMQP_STATUS_SOCKET_ERROR;
384 0           goto err;
385             }
386              
387             #ifdef SO_NOSIGPIPE
388             /* Turn off SIGPIPE on platforms that support it, BSD, MacOSX */
389             if (0 != setsockopt(sockfd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one))) {
390             last_error = AMQP_STATUS_SOCKET_ERROR;
391             goto err;
392             }
393             #endif /* SO_NOSIGPIPE */
394              
395             /* Disable nagle */
396 1 50         if (0 != setsockopt(sockfd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one))) {
397 0           last_error = AMQP_STATUS_SOCKET_ERROR;
398 0           goto err;
399             }
400              
401             /* Enable TCP keepalives */
402 1 50         if (0 != setsockopt(sockfd, SOL_SOCKET, SO_KEEPALIVE, &one, sizeof(one))) {
403 0           last_error = AMQP_STATUS_SOCKET_ERROR;
404 0           goto err;
405             }
406              
407 1 50         if (0 == connect(sockfd, addr->ai_addr, addr->ai_addrlen)) {
408 0           return sockfd;
409             }
410              
411 1 50         if (EINPROGRESS != errno) {
412 0           last_error = AMQP_STATUS_SOCKET_ERROR;
413 0           goto err;
414             }
415              
416 1           last_error = amqp_poll(sockfd, AMQP_SF_POLLOUT, deadline);
417 1 50         if (AMQP_STATUS_OK != last_error) {
418 1           goto err;
419             }
420              
421             {
422             int result;
423 0           socklen_t result_len = sizeof(result);
424              
425 0 0         if (-1 == getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &result, &result_len) ||
426 0 0         result != 0) {
427 0           last_error = AMQP_STATUS_SOCKET_ERROR;
428 0           goto err;
429             }
430             }
431              
432 0           return sockfd;
433              
434 1           err:
435 1           close(sockfd);
436 1           return last_error;
437             }
438             #endif
439              
440 1           int amqp_open_socket_inner(char const *hostname, int portnumber,
441             amqp_time_t deadline) {
442             struct addrinfo hint;
443             struct addrinfo *address_list;
444             struct addrinfo *addr;
445             char portnumber_string[33];
446 1           int sockfd = -1;
447             int last_error;
448              
449 1           last_error = amqp_os_socket_init();
450 1 50         if (AMQP_STATUS_OK != last_error) {
451 0           return last_error;
452             }
453              
454 1           memset(&hint, 0, sizeof(hint));
455 1           hint.ai_family = PF_UNSPEC; /* PF_INET or PF_INET6 */
456 1           hint.ai_socktype = SOCK_STREAM;
457 1           hint.ai_protocol = IPPROTO_TCP;
458              
459 1           (void)sprintf(portnumber_string, "%d", portnumber);
460              
461 1           last_error = getaddrinfo(hostname, portnumber_string, &hint, &address_list);
462 1 50         if (0 != last_error) {
463 0           return AMQP_STATUS_HOSTNAME_RESOLUTION_FAILED;
464             }
465              
466 1 50         for (addr = address_list; addr; addr = addr->ai_next) {
467 1           sockfd = connect_socket(addr, deadline);
468              
469 1 50         if (sockfd >= 0) {
470 0           last_error = AMQP_STATUS_OK;
471 0           break;
472 1 50         } else if (sockfd == AMQP_STATUS_TIMEOUT) {
473 1           last_error = sockfd;
474 1           break;
475             }
476             }
477              
478 1           freeaddrinfo(address_list);
479 1 50         if (last_error != AMQP_STATUS_OK || sockfd == -1) {
    0          
480 1           return last_error;
481             }
482 0           return sockfd;
483             }
484              
485 0           static int send_header_inner(amqp_connection_state_t state,
486             amqp_time_t deadline) {
487             ssize_t res;
488             static const uint8_t header[8] = {'A',
489             'M',
490             'Q',
491             'P',
492             0,
493             AMQP_PROTOCOL_VERSION_MAJOR,
494             AMQP_PROTOCOL_VERSION_MINOR,
495             AMQP_PROTOCOL_VERSION_REVISION};
496 0           res = amqp_try_send(state, header, sizeof(header), deadline, AMQP_SF_NONE);
497 0 0         if (sizeof(header) == res) {
498 0           return AMQP_STATUS_OK;
499             }
500 0           return (int)res;
501             }
502              
503 0           int amqp_send_header(amqp_connection_state_t state) {
504 0           return send_header_inner(state, amqp_time_infinite());
505             }
506              
507 0           static amqp_bytes_t sasl_method_name(amqp_sasl_method_enum method) {
508             amqp_bytes_t res;
509              
510 0           switch (method) {
511 0           case AMQP_SASL_METHOD_PLAIN:
512 0           res = amqp_cstring_bytes("PLAIN");
513 0           break;
514 0           case AMQP_SASL_METHOD_EXTERNAL:
515 0           res = amqp_cstring_bytes("EXTERNAL");
516 0           break;
517              
518 0           default:
519 0           amqp_abort("Invalid SASL method: %d", (int)method);
520             }
521              
522 0           return res;
523             }
524              
525 0           static int bytes_equal(amqp_bytes_t l, amqp_bytes_t r) {
526 0 0         if (l.len == r.len) {
527 0 0         if (l.bytes && r.bytes) {
    0          
528 0 0         if (0 == memcmp(l.bytes, r.bytes, l.len)) {
529 0           return 1;
530             }
531             }
532             }
533 0           return 0;
534             }
535              
536 0           int sasl_mechanism_in_list(amqp_bytes_t mechanisms,
537             amqp_sasl_method_enum method) {
538             amqp_bytes_t mechanism;
539             amqp_bytes_t supported_mechanism;
540             uint8_t *start;
541             uint8_t *end;
542             uint8_t *current;
543              
544 0 0         assert(NULL != mechanisms.bytes);
545              
546 0           mechanism = sasl_method_name(method);
547              
548 0           start = (uint8_t *)mechanisms.bytes;
549 0           current = start;
550 0           end = start + mechanisms.len;
551              
552 0 0         for (; current != end; start = current + 1) {
553             /* HACK: SASL states that we should be parsing this string as a UTF-8
554             * string, which we're plainly not doing here. At this point its not worth
555             * dragging an entire UTF-8 parser for this one case, and this should work
556             * most of the time */
557 0           current = memchr(start, ' ', end - start);
558 0 0         if (NULL == current) {
559 0           current = end;
560             }
561 0           supported_mechanism.bytes = start;
562 0           supported_mechanism.len = current - start;
563 0 0         if (bytes_equal(mechanism, supported_mechanism)) {
564 0           return 1;
565             }
566             }
567              
568 0           return 0;
569             }
570              
571 0           static amqp_bytes_t sasl_response(amqp_pool_t *pool,
572             amqp_sasl_method_enum method, va_list args) {
573             amqp_bytes_t response;
574              
575 0           switch (method) {
576 0           case AMQP_SASL_METHOD_PLAIN: {
577 0           char *username = va_arg(args, char *);
578 0           size_t username_len = strlen(username);
579 0           char *password = va_arg(args, char *);
580 0           size_t password_len = strlen(password);
581             char *response_buf;
582              
583 0           amqp_pool_alloc_bytes(pool, strlen(username) + strlen(password) + 2,
584             &response);
585 0 0         if (response.bytes == NULL)
586             /* We never request a zero-length block, because of the +2
587             above, so a NULL here really is ENOMEM. */
588             {
589 0           return response;
590             }
591              
592 0           response_buf = response.bytes;
593 0           response_buf[0] = 0;
594 0           memcpy(response_buf + 1, username, username_len);
595 0           response_buf[username_len + 1] = 0;
596 0           memcpy(response_buf + username_len + 2, password, password_len);
597 0           break;
598             }
599 0           case AMQP_SASL_METHOD_EXTERNAL: {
600 0           char *identity = va_arg(args, char *);
601 0           size_t identity_len = strlen(identity);
602              
603 0           amqp_pool_alloc_bytes(pool, identity_len, &response);
604 0 0         if (response.bytes == NULL) {
605 0           return response;
606             }
607              
608 0           memcpy(response.bytes, identity, identity_len);
609 0           break;
610             }
611 0           default:
612 0           amqp_abort("Invalid SASL method: %d", (int)method);
613             }
614              
615 0           return response;
616             }
617              
618 0           amqp_boolean_t amqp_frames_enqueued(amqp_connection_state_t state) {
619 0           return (state->first_queued_frame != NULL);
620             }
621              
622             /*
623             * Check to see if we have data in our buffer. If this returns 1, we
624             * will avoid an immediate blocking read in amqp_simple_wait_frame.
625             */
626 1           amqp_boolean_t amqp_data_in_buffer(amqp_connection_state_t state) {
627 1           return (state->sock_inbound_offset < state->sock_inbound_limit);
628             }
629              
630 0           static int consume_one_frame(amqp_connection_state_t state,
631             amqp_frame_t *decoded_frame) {
632             int res;
633              
634             amqp_bytes_t buffer;
635 0           buffer.len = state->sock_inbound_limit - state->sock_inbound_offset;
636 0           buffer.bytes =
637 0           ((char *)state->sock_inbound_buffer.bytes) + state->sock_inbound_offset;
638              
639 0           res = amqp_handle_input(state, buffer, decoded_frame);
640 0 0         if (res < 0) {
641 0           return res;
642             }
643              
644 0           state->sock_inbound_offset += res;
645              
646 0           return AMQP_STATUS_OK;
647             }
648              
649 0           static int recv_with_timeout(amqp_connection_state_t state,
650             amqp_time_t timeout) {
651             ssize_t res;
652             int fd;
653              
654 0           start_recv:
655 0           res = amqp_socket_recv(state->socket, state->sock_inbound_buffer.bytes,
656             state->sock_inbound_buffer.len, 0);
657              
658 0 0         if (res < 0) {
659 0           fd = amqp_get_sockfd(state);
660 0 0         if (-1 == fd) {
661 0           return AMQP_STATUS_CONNECTION_CLOSED;
662             }
663 0           switch (res) {
664 0           default:
665 0           return (int)res;
666 0           case AMQP_PRIVATE_STATUS_SOCKET_NEEDREAD:
667 0           res = amqp_poll(fd, AMQP_SF_POLLIN, timeout);
668 0           break;
669 0           case AMQP_PRIVATE_STATUS_SOCKET_NEEDWRITE:
670 0           res = amqp_poll(fd, AMQP_SF_POLLOUT, timeout);
671 0           break;
672             }
673 0 0         if (AMQP_STATUS_OK == res) {
674 0           goto start_recv;
675             }
676 0           return (int)res;
677             }
678              
679 0           state->sock_inbound_limit = res;
680 0           state->sock_inbound_offset = 0;
681              
682 0           res = amqp_time_s_from_now(&state->next_recv_heartbeat,
683             amqp_heartbeat_recv(state));
684 0 0         if (AMQP_STATUS_OK != res) {
685 0           return (int)res;
686             }
687 0           return AMQP_STATUS_OK;
688             }
689              
690 0           int amqp_try_recv(amqp_connection_state_t state) {
691             amqp_time_t timeout;
692             int res;
693              
694 0 0         while (amqp_data_in_buffer(state)) {
695             amqp_frame_t frame;
696 0           res = consume_one_frame(state, &frame);
697              
698 0 0         if (AMQP_STATUS_OK != res) {
699 0           return res;
700             }
701              
702 0 0         if (frame.frame_type != 0) {
703             amqp_pool_t *channel_pool;
704             amqp_frame_t *frame_copy;
705             amqp_link_t *link;
706              
707 0           channel_pool = amqp_get_or_create_channel_pool(state, frame.channel);
708 0 0         if (NULL == channel_pool) {
709 0           return AMQP_STATUS_NO_MEMORY;
710             }
711              
712 0           frame_copy = amqp_pool_alloc(channel_pool, sizeof(amqp_frame_t));
713 0           link = amqp_pool_alloc(channel_pool, sizeof(amqp_link_t));
714              
715 0 0         if (frame_copy == NULL || link == NULL) {
    0          
716 0           return AMQP_STATUS_NO_MEMORY;
717             }
718              
719 0           *frame_copy = frame;
720              
721 0           link->next = NULL;
722 0           link->data = frame_copy;
723              
724 0 0         if (state->last_queued_frame == NULL) {
725 0           state->first_queued_frame = link;
726             } else {
727 0           state->last_queued_frame->next = link;
728             }
729 0           state->last_queued_frame = link;
730             }
731             }
732 0           res = amqp_time_from_now(&timeout, &(struct timeval){0});
733 0 0         if (AMQP_STATUS_OK != res) {
734 0           return res;
735             }
736              
737 0           return recv_with_timeout(state, timeout);
738             }
739              
740 0           static int wait_frame_inner(amqp_connection_state_t state,
741             amqp_frame_t *decoded_frame,
742             amqp_time_t timeout_deadline) {
743             amqp_time_t deadline;
744             int res;
745              
746             for (;;) {
747 0 0         while (amqp_data_in_buffer(state)) {
748 0           res = consume_one_frame(state, decoded_frame);
749              
750 0 0         if (AMQP_STATUS_OK != res) {
751 0           return res;
752             }
753              
754 0 0         if (AMQP_FRAME_HEARTBEAT == decoded_frame->frame_type) {
755 0           amqp_maybe_release_buffers_on_channel(state, 0);
756 0           continue;
757             }
758              
759 0 0         if (decoded_frame->frame_type != 0) {
760             /* Complete frame was read. Return it. */
761 0           return AMQP_STATUS_OK;
762             }
763             }
764              
765 0           beginrecv:
766 0           res = amqp_time_has_past(state->next_send_heartbeat);
767 0 0         if (AMQP_STATUS_TIMER_FAILURE == res) {
768 0           return res;
769 0 0         } else if (AMQP_STATUS_TIMEOUT == res) {
770             amqp_frame_t heartbeat;
771 0           heartbeat.channel = 0;
772 0           heartbeat.frame_type = AMQP_FRAME_HEARTBEAT;
773              
774 0           res = amqp_send_frame(state, &heartbeat);
775 0 0         if (AMQP_STATUS_OK != res) {
776 0           return res;
777             }
778             }
779 0           deadline = amqp_time_first(timeout_deadline,
780             amqp_time_first(state->next_recv_heartbeat,
781             state->next_send_heartbeat));
782              
783             /* TODO this needs to wait for a _frame_ and not anything written from the
784             * socket */
785 0           res = recv_with_timeout(state, deadline);
786              
787 0 0         if (AMQP_STATUS_TIMEOUT == res) {
788 0 0         if (amqp_time_equal(deadline, state->next_recv_heartbeat)) {
789 0           amqp_socket_close(state->socket, AMQP_SC_FORCE);
790 0           return AMQP_STATUS_HEARTBEAT_TIMEOUT;
791 0 0         } else if (amqp_time_equal(deadline, timeout_deadline)) {
792 0           return AMQP_STATUS_TIMEOUT;
793 0 0         } else if (amqp_time_equal(deadline, state->next_send_heartbeat)) {
794             /* send heartbeat happens before we do recv_with_timeout */
795 0           goto beginrecv;
796             } else {
797 0           amqp_abort("Internal error: unable to determine timeout reason");
798             }
799 0 0         } else if (AMQP_STATUS_OK != res) {
800 0           return res;
801             }
802             }
803             }
804              
805 0           static amqp_link_t *amqp_create_link_for_frame(amqp_connection_state_t state,
806             amqp_frame_t *frame) {
807             amqp_link_t *link;
808             amqp_frame_t *frame_copy;
809              
810             amqp_pool_t *channel_pool =
811 0           amqp_get_or_create_channel_pool(state, frame->channel);
812              
813 0 0         if (NULL == channel_pool) {
814 0           return NULL;
815             }
816              
817 0           link = amqp_pool_alloc(channel_pool, sizeof(amqp_link_t));
818 0           frame_copy = amqp_pool_alloc(channel_pool, sizeof(amqp_frame_t));
819              
820 0 0         if (NULL == link || NULL == frame_copy) {
    0          
821 0           return NULL;
822             }
823              
824 0           *frame_copy = *frame;
825 0           link->data = frame_copy;
826              
827 0           return link;
828             }
829              
830 0           int amqp_queue_frame(amqp_connection_state_t state, amqp_frame_t *frame) {
831 0           amqp_link_t *link = amqp_create_link_for_frame(state, frame);
832 0 0         if (NULL == link) {
833 0           return AMQP_STATUS_NO_MEMORY;
834             }
835              
836 0 0         if (NULL == state->first_queued_frame) {
837 0           state->first_queued_frame = link;
838             } else {
839 0           state->last_queued_frame->next = link;
840             }
841              
842 0           link->next = NULL;
843 0           state->last_queued_frame = link;
844              
845 0           return AMQP_STATUS_OK;
846             }
847              
848 0           int amqp_put_back_frame(amqp_connection_state_t state, amqp_frame_t *frame) {
849 0           amqp_link_t *link = amqp_create_link_for_frame(state, frame);
850 0 0         if (NULL == link) {
851 0           return AMQP_STATUS_NO_MEMORY;
852             }
853              
854 0 0         if (NULL == state->first_queued_frame) {
855 0           state->first_queued_frame = link;
856 0           state->last_queued_frame = link;
857 0           link->next = NULL;
858             } else {
859 0           link->next = state->first_queued_frame;
860 0           state->first_queued_frame = link;
861             }
862              
863 0           return AMQP_STATUS_OK;
864             }
865              
866 0           int amqp_simple_wait_frame_on_channel(amqp_connection_state_t state,
867             amqp_channel_t channel,
868             amqp_frame_t *decoded_frame) {
869             amqp_frame_t *frame_ptr;
870             amqp_link_t *cur;
871             int res;
872              
873 0 0         for (cur = state->first_queued_frame; NULL != cur; cur = cur->next) {
874 0           frame_ptr = cur->data;
875              
876 0 0         if (channel == frame_ptr->channel) {
877 0           state->first_queued_frame = cur->next;
878 0 0         if (NULL == state->first_queued_frame) {
879 0           state->last_queued_frame = NULL;
880             }
881              
882 0           *decoded_frame = *frame_ptr;
883              
884 0           return AMQP_STATUS_OK;
885             }
886             }
887              
888             for (;;) {
889 0           res = wait_frame_inner(state, decoded_frame, amqp_time_infinite());
890              
891 0 0         if (AMQP_STATUS_OK != res) {
892 0           return res;
893             }
894              
895 0 0         if (channel == decoded_frame->channel) {
896 0           return AMQP_STATUS_OK;
897             } else {
898 0           res = amqp_queue_frame(state, decoded_frame);
899 0 0         if (res != AMQP_STATUS_OK) {
900 0           return res;
901             }
902             }
903             }
904             }
905              
906 0           int amqp_simple_wait_frame(amqp_connection_state_t state,
907             amqp_frame_t *decoded_frame) {
908 0           return amqp_simple_wait_frame_noblock(state, decoded_frame, NULL);
909             }
910              
911 0           int amqp_simple_wait_frame_noblock(amqp_connection_state_t state,
912             amqp_frame_t *decoded_frame,
913             const struct timeval *timeout) {
914             amqp_time_t deadline;
915              
916 0           int res = amqp_time_from_now(&deadline, timeout);
917 0 0         if (AMQP_STATUS_OK != res) {
918 0           return res;
919             }
920              
921 0 0         if (state->first_queued_frame != NULL) {
922 0           amqp_frame_t *f = (amqp_frame_t *)state->first_queued_frame->data;
923 0           state->first_queued_frame = state->first_queued_frame->next;
924 0 0         if (state->first_queued_frame == NULL) {
925 0           state->last_queued_frame = NULL;
926             }
927 0           *decoded_frame = *f;
928 0           return AMQP_STATUS_OK;
929             } else {
930 0           return wait_frame_inner(state, decoded_frame, deadline);
931             }
932             }
933              
934 0           static int amqp_simple_wait_method_list(amqp_connection_state_t state,
935             amqp_channel_t expected_channel,
936             amqp_method_number_t *expected_methods,
937             amqp_time_t deadline,
938             amqp_method_t *output) {
939             amqp_frame_t frame;
940             struct timeval tv;
941             struct timeval *tvp;
942              
943 0           int res = amqp_time_tv_until(deadline, &tv, &tvp);
944 0 0         if (res != AMQP_STATUS_OK) {
945 0           return res;
946             }
947              
948 0           res = amqp_simple_wait_frame_noblock(state, &frame, tvp);
949 0 0         if (AMQP_STATUS_OK != res) {
950 0           return res;
951             }
952              
953 0 0         if (AMQP_FRAME_METHOD != frame.frame_type ||
954 0           expected_channel != frame.channel ||
955 0           !amqp_id_in_reply_list(frame.payload.method.id, expected_methods)) {
956 0           return AMQP_STATUS_WRONG_METHOD;
957             }
958 0           *output = frame.payload.method;
959 0           return AMQP_STATUS_OK;
960             }
961              
962 0           static int simple_wait_method_inner(amqp_connection_state_t state,
963             amqp_channel_t expected_channel,
964             amqp_method_number_t expected_method,
965             amqp_time_t deadline,
966             amqp_method_t *output) {
967             amqp_method_number_t expected_methods[2];
968 0           expected_methods[0] = expected_method;
969 0           expected_methods[1] = 0;
970 0           return amqp_simple_wait_method_list(state, expected_channel, expected_methods,
971             deadline, output);
972             }
973              
974 0           int amqp_simple_wait_method(amqp_connection_state_t state,
975             amqp_channel_t expected_channel,
976             amqp_method_number_t expected_method,
977             amqp_method_t *output) {
978 0           return simple_wait_method_inner(state, expected_channel, expected_method,
979             amqp_time_infinite(), output);
980             }
981              
982 1           int amqp_send_method(amqp_connection_state_t state, amqp_channel_t channel,
983             amqp_method_number_t id, void *decoded) {
984 1           return amqp_send_method_inner(state, channel, id, decoded, AMQP_SF_NONE,
985             amqp_time_infinite());
986             }
987              
988 1           int amqp_send_method_inner(amqp_connection_state_t state,
989             amqp_channel_t channel, amqp_method_number_t id,
990             void *decoded, int flags, amqp_time_t deadline) {
991             amqp_frame_t frame;
992              
993 1           frame.frame_type = AMQP_FRAME_METHOD;
994 1           frame.channel = channel;
995 1           frame.payload.method.id = id;
996 1           frame.payload.method.decoded = decoded;
997 1           return amqp_send_frame_inner(state, &frame, flags, deadline);
998             }
999              
1000 0           static int amqp_id_in_reply_list(amqp_method_number_t expected,
1001             amqp_method_number_t *list) {
1002 0 0         while (*list != 0) {
1003 0 0         if (*list == expected) {
1004 0           return 1;
1005             }
1006 0           list++;
1007             }
1008 0           return 0;
1009             }
1010              
1011 1           static amqp_rpc_reply_t simple_rpc_inner(
1012             amqp_connection_state_t state, amqp_channel_t channel,
1013             amqp_method_number_t request_id, amqp_method_number_t *expected_reply_ids,
1014             void *decoded_request_method, amqp_time_t deadline) {
1015             int status;
1016             amqp_rpc_reply_t result;
1017              
1018 1           memset(&result, 0, sizeof(result));
1019              
1020 1           status = amqp_send_method(state, channel, request_id, decoded_request_method);
1021 1 50         if (status < 0) {
1022 1           return amqp_rpc_reply_error(status);
1023             }
1024              
1025             {
1026             amqp_frame_t frame;
1027              
1028 0           retry:
1029 0           status = wait_frame_inner(state, &frame, deadline);
1030 0 0         if (status != AMQP_STATUS_OK) {
1031 0 0         if (status == AMQP_STATUS_TIMEOUT) {
1032 0           amqp_socket_close(state->socket, AMQP_SC_FORCE);
1033             }
1034 0           return amqp_rpc_reply_error(status);
1035             }
1036              
1037             /*
1038             * We store the frame for later processing unless it's something
1039             * that directly affects us here, namely a method frame that is
1040             * either
1041             * - on the channel we want, and of the expected type, or
1042             * - on the channel we want, and a channel.close frame, or
1043             * - on channel zero, and a connection.close frame.
1044             */
1045 0 0         if (!((frame.frame_type == AMQP_FRAME_METHOD) &&
1046 0           (((frame.channel == channel) &&
1047 0           (amqp_id_in_reply_list(frame.payload.method.id,
1048 0           expected_reply_ids) ||
1049 0 0         (frame.payload.method.id == AMQP_CHANNEL_CLOSE_METHOD))) ||
1050 0 0         ((frame.channel == 0) &&
1051 0 0         (frame.payload.method.id == AMQP_CONNECTION_CLOSE_METHOD))))) {
1052             amqp_pool_t *channel_pool;
1053             amqp_frame_t *frame_copy;
1054             amqp_link_t *link;
1055              
1056 0           channel_pool = amqp_get_or_create_channel_pool(state, frame.channel);
1057 0 0         if (NULL == channel_pool) {
1058 0           return amqp_rpc_reply_error(AMQP_STATUS_NO_MEMORY);
1059             }
1060              
1061 0           frame_copy = amqp_pool_alloc(channel_pool, sizeof(amqp_frame_t));
1062 0           link = amqp_pool_alloc(channel_pool, sizeof(amqp_link_t));
1063              
1064 0 0         if (frame_copy == NULL || link == NULL) {
    0          
1065 0           return amqp_rpc_reply_error(AMQP_STATUS_NO_MEMORY);
1066             }
1067              
1068 0           *frame_copy = frame;
1069              
1070 0           link->next = NULL;
1071 0           link->data = frame_copy;
1072              
1073 0 0         if (state->last_queued_frame == NULL) {
1074 0           state->first_queued_frame = link;
1075             } else {
1076 0           state->last_queued_frame->next = link;
1077             }
1078 0           state->last_queued_frame = link;
1079              
1080 0           goto retry;
1081             }
1082              
1083 0           result.reply_type =
1084 0           (amqp_id_in_reply_list(frame.payload.method.id, expected_reply_ids))
1085             ? AMQP_RESPONSE_NORMAL
1086 0 0         : AMQP_RESPONSE_SERVER_EXCEPTION;
1087              
1088 0           result.reply = frame.payload.method;
1089 0           return result;
1090             }
1091             }
1092              
1093 1           amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state,
1094             amqp_channel_t channel,
1095             amqp_method_number_t request_id,
1096             amqp_method_number_t *expected_reply_ids,
1097             void *decoded_request_method) {
1098             amqp_time_t deadline;
1099             int res;
1100              
1101 1           res = amqp_time_from_now(&deadline, state->rpc_timeout);
1102 1 50         if (res != AMQP_STATUS_OK) {
1103 0           return amqp_rpc_reply_error(res);
1104             }
1105              
1106 1           return simple_rpc_inner(state, channel, request_id, expected_reply_ids,
1107             decoded_request_method, deadline);
1108             }
1109              
1110 0           void *amqp_simple_rpc_decoded(amqp_connection_state_t state,
1111             amqp_channel_t channel,
1112             amqp_method_number_t request_id,
1113             amqp_method_number_t reply_id,
1114             void *decoded_request_method) {
1115             amqp_time_t deadline;
1116             int res;
1117             amqp_method_number_t replies[2];
1118              
1119 0           res = amqp_time_from_now(&deadline, state->rpc_timeout);
1120 0 0         if (res != AMQP_STATUS_OK) {
1121 0           state->most_recent_api_result = amqp_rpc_reply_error(res);
1122 0           return NULL;
1123             }
1124              
1125 0           replies[0] = reply_id;
1126 0           replies[1] = 0;
1127              
1128 0           state->most_recent_api_result = simple_rpc_inner(
1129             state, channel, request_id, replies, decoded_request_method, deadline);
1130              
1131 0 0         if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL) {
1132 0           return state->most_recent_api_result.reply.decoded;
1133             } else {
1134 0           return NULL;
1135             }
1136             }
1137              
1138 0           amqp_rpc_reply_t amqp_get_rpc_reply(amqp_connection_state_t state) {
1139 0           return state->most_recent_api_result;
1140             }
1141              
1142             /*
1143             * Merge base and add tables. If the two tables contain an entry with the same
1144             * key, the entry from the add table takes precedence. For entries that are both
1145             * tables with the same key, the table is recursively merged.
1146             */
1147 0           int amqp_merge_capabilities(const amqp_table_t *base, const amqp_table_t *add,
1148             amqp_table_t *result, amqp_pool_t *pool) {
1149             int i;
1150             int res;
1151             amqp_pool_t temp_pool;
1152             amqp_table_t temp_result;
1153 0 0         assert(base != NULL);
1154 0 0         assert(result != NULL);
1155 0 0         assert(pool != NULL);
1156              
1157 0 0         if (NULL == add) {
1158 0           return amqp_table_clone(base, result, pool);
1159             }
1160              
1161 0           init_amqp_pool(&temp_pool, 4096);
1162 0           temp_result.num_entries = 0;
1163 0           temp_result.entries =
1164 0           amqp_pool_alloc(&temp_pool, sizeof(amqp_table_entry_t) *
1165 0           (base->num_entries + add->num_entries));
1166 0 0         if (NULL == temp_result.entries) {
1167 0           res = AMQP_STATUS_NO_MEMORY;
1168 0           goto error_out;
1169             }
1170 0 0         for (i = 0; i < base->num_entries; ++i) {
1171 0           temp_result.entries[temp_result.num_entries] = base->entries[i];
1172 0           temp_result.num_entries++;
1173             }
1174 0 0         for (i = 0; i < add->num_entries; ++i) {
1175             amqp_table_entry_t *e =
1176 0           amqp_table_get_entry_by_key(&temp_result, add->entries[i].key);
1177 0 0         if (NULL != e) {
1178 0 0         if (AMQP_FIELD_KIND_TABLE == add->entries[i].value.kind &&
1179 0 0         AMQP_FIELD_KIND_TABLE == e->value.kind) {
1180             amqp_table_entry_t *be =
1181 0           amqp_table_get_entry_by_key(base, add->entries[i].key);
1182              
1183 0           res = amqp_merge_capabilities(&be->value.value.table,
1184 0           &add->entries[i].value.value.table,
1185             &e->value.value.table, &temp_pool);
1186 0 0         if (AMQP_STATUS_OK != res) {
1187 0           goto error_out;
1188             }
1189             } else {
1190 0           e->value = add->entries[i].value;
1191             }
1192             } else {
1193 0           temp_result.entries[temp_result.num_entries] = add->entries[i];
1194 0           temp_result.num_entries++;
1195             }
1196             }
1197 0           res = amqp_table_clone(&temp_result, result, pool);
1198 0           error_out:
1199 0           empty_amqp_pool(&temp_pool);
1200 0           return res;
1201             }
1202              
1203 0           static amqp_rpc_reply_t amqp_login_inner(amqp_connection_state_t state,
1204             char const *vhost, int channel_max,
1205             int frame_max, int heartbeat,
1206             const amqp_table_t *client_properties,
1207             const struct timeval *timeout,
1208             amqp_sasl_method_enum sasl_method,
1209             va_list vl) {
1210             int res;
1211             amqp_method_t method;
1212              
1213             uint16_t client_channel_max;
1214             uint32_t client_frame_max;
1215             uint16_t client_heartbeat;
1216              
1217             uint16_t server_channel_max;
1218             uint32_t server_frame_max;
1219             uint16_t server_heartbeat;
1220              
1221             amqp_rpc_reply_t result;
1222             amqp_time_t deadline;
1223              
1224 0 0         if (channel_max < 0 || channel_max > UINT16_MAX) {
    0          
1225 0           return amqp_rpc_reply_error(AMQP_STATUS_INVALID_PARAMETER);
1226             }
1227 0           client_channel_max = (uint16_t)channel_max;
1228              
1229 0 0         if (frame_max < 0) {
1230 0           return amqp_rpc_reply_error(AMQP_STATUS_INVALID_PARAMETER);
1231             }
1232 0           client_frame_max = (uint32_t)frame_max;
1233              
1234 0 0         if (heartbeat < 0 || heartbeat > UINT16_MAX) {
    0          
1235 0           return amqp_rpc_reply_error(AMQP_STATUS_INVALID_PARAMETER);
1236             }
1237 0           client_heartbeat = (uint16_t)heartbeat;
1238              
1239 0           res = amqp_time_from_now(&deadline, timeout);
1240 0 0         if (AMQP_STATUS_OK != res) {
1241 0           goto error_res;
1242             }
1243              
1244 0           res = send_header_inner(state, deadline);
1245 0 0         if (AMQP_STATUS_OK != res) {
1246 0           goto error_res;
1247             }
1248              
1249 0           res = simple_wait_method_inner(state, 0, AMQP_CONNECTION_START_METHOD,
1250             deadline, &method);
1251 0 0         if (AMQP_STATUS_OK != res) {
1252 0           goto error_res;
1253             }
1254              
1255             {
1256 0           amqp_connection_start_t *s = (amqp_connection_start_t *)method.decoded;
1257 0 0         if ((s->version_major != AMQP_PROTOCOL_VERSION_MAJOR) ||
1258 0 0         (s->version_minor != AMQP_PROTOCOL_VERSION_MINOR)) {
1259 0           res = AMQP_STATUS_INCOMPATIBLE_AMQP_VERSION;
1260 0           goto error_res;
1261             }
1262              
1263 0           res = amqp_table_clone(&s->server_properties, &state->server_properties,
1264             &state->properties_pool);
1265              
1266 0 0         if (AMQP_STATUS_OK != res) {
1267 0           goto error_res;
1268             }
1269              
1270             /* TODO: check that our chosen SASL mechanism is in the list of
1271             acceptable mechanisms. Or even let the application choose from
1272             the list! */
1273 0 0         if (!sasl_mechanism_in_list(s->mechanisms, sasl_method)) {
1274 0           res = AMQP_STATUS_BROKER_UNSUPPORTED_SASL_METHOD;
1275 0           goto error_res;
1276             }
1277             }
1278              
1279             {
1280             amqp_table_entry_t default_properties[6];
1281             amqp_table_t default_table;
1282             amqp_table_entry_t client_capabilities[2];
1283             amqp_table_t client_capabilities_table;
1284             amqp_connection_start_ok_t s;
1285             amqp_pool_t *channel_pool;
1286             amqp_bytes_t response_bytes;
1287              
1288 0           channel_pool = amqp_get_or_create_channel_pool(state, 0);
1289 0 0         if (NULL == channel_pool) {
1290 0           res = AMQP_STATUS_NO_MEMORY;
1291 0           goto error_res;
1292             }
1293              
1294 0           response_bytes = sasl_response(channel_pool, sasl_method, vl);
1295 0 0         if (response_bytes.bytes == NULL) {
1296 0           res = AMQP_STATUS_NO_MEMORY;
1297 0           goto error_res;
1298             }
1299              
1300             client_capabilities[0] =
1301 0           amqp_table_construct_bool_entry("authentication_failure_close", 1);
1302             client_capabilities[1] =
1303 0           amqp_table_construct_bool_entry("exchange_exchange_bindings", 1);
1304              
1305 0           client_capabilities_table.entries = client_capabilities;
1306 0           client_capabilities_table.num_entries =
1307             sizeof(client_capabilities) / sizeof(amqp_table_entry_t);
1308              
1309             default_properties[0] =
1310 0           amqp_table_construct_utf8_entry("product", "rabbitmq-c");
1311             default_properties[1] =
1312 0           amqp_table_construct_utf8_entry("version", AMQP_VERSION_STRING);
1313             default_properties[2] =
1314 0           amqp_table_construct_utf8_entry("platform", AMQ_PLATFORM);
1315             default_properties[3] =
1316 0           amqp_table_construct_utf8_entry("copyright", AMQ_COPYRIGHT);
1317 0           default_properties[4] = amqp_table_construct_utf8_entry(
1318             "information", "See https://github.com/alanxz/rabbitmq-c");
1319 0           default_properties[5] = amqp_table_construct_table_entry(
1320             "capabilities", &client_capabilities_table);
1321              
1322 0           default_table.entries = default_properties;
1323 0           default_table.num_entries =
1324             sizeof(default_properties) / sizeof(amqp_table_entry_t);
1325              
1326 0           res = amqp_merge_capabilities(&default_table, client_properties,
1327             &state->client_properties, channel_pool);
1328 0 0         if (AMQP_STATUS_OK != res) {
1329 0           goto error_res;
1330             }
1331              
1332 0           s.client_properties = state->client_properties;
1333 0           s.mechanism = sasl_method_name(sasl_method);
1334 0           s.response = response_bytes;
1335 0           s.locale = amqp_cstring_bytes("en_US");
1336              
1337 0           res = amqp_send_method_inner(state, 0, AMQP_CONNECTION_START_OK_METHOD, &s,
1338             AMQP_SF_NONE, deadline);
1339 0 0         if (res < 0) {
1340 0           goto error_res;
1341             }
1342             }
1343              
1344 0           amqp_release_buffers(state);
1345              
1346             {
1347 0           amqp_method_number_t expected[] = {AMQP_CONNECTION_TUNE_METHOD,
1348             AMQP_CONNECTION_CLOSE_METHOD, 0};
1349              
1350 0           res = amqp_simple_wait_method_list(state, 0, expected, deadline, &method);
1351 0 0         if (AMQP_STATUS_OK != res) {
1352 0           goto error_res;
1353             }
1354             }
1355              
1356 0 0         if (AMQP_CONNECTION_CLOSE_METHOD == method.id) {
1357 0           result.reply_type = AMQP_RESPONSE_SERVER_EXCEPTION;
1358 0           result.reply = method;
1359 0           result.library_error = 0;
1360 0           goto out;
1361             }
1362              
1363             {
1364 0           amqp_connection_tune_t *s = (amqp_connection_tune_t *)method.decoded;
1365 0           server_channel_max = s->channel_max;
1366 0           server_frame_max = s->frame_max;
1367 0           server_heartbeat = s->heartbeat;
1368             }
1369              
1370 0 0         if (server_channel_max != 0 &&
    0          
1371 0 0         (server_channel_max < client_channel_max || client_channel_max == 0)) {
1372 0           client_channel_max = server_channel_max;
1373 0 0         } else if (server_channel_max == 0 && client_channel_max == 0) {
    0          
1374 0           client_channel_max = UINT16_MAX;
1375             }
1376              
1377 0 0         if (server_frame_max != 0 && server_frame_max < client_frame_max) {
    0          
1378 0           client_frame_max = server_frame_max;
1379             }
1380              
1381 0 0         if (server_heartbeat != 0 && server_heartbeat < client_heartbeat) {
    0          
1382 0           client_heartbeat = server_heartbeat;
1383             }
1384              
1385 0           res = amqp_tune_connection(state, client_channel_max, client_frame_max,
1386             client_heartbeat);
1387 0 0         if (res < 0) {
1388 0           goto error_res;
1389             }
1390              
1391             {
1392             amqp_connection_tune_ok_t s;
1393 0           s.frame_max = client_frame_max;
1394 0           s.channel_max = client_channel_max;
1395 0           s.heartbeat = client_heartbeat;
1396              
1397 0           res = amqp_send_method_inner(state, 0, AMQP_CONNECTION_TUNE_OK_METHOD, &s,
1398             AMQP_SF_NONE, deadline);
1399 0 0         if (res < 0) {
1400 0           goto error_res;
1401             }
1402             }
1403              
1404 0           amqp_release_buffers(state);
1405              
1406             {
1407 0           amqp_method_number_t replies[] = {AMQP_CONNECTION_OPEN_OK_METHOD, 0};
1408             amqp_connection_open_t s;
1409 0           s.virtual_host = amqp_cstring_bytes(vhost);
1410 0           s.capabilities = amqp_empty_bytes;
1411 0           s.insist = 1;
1412              
1413 0           result = simple_rpc_inner(state, 0, AMQP_CONNECTION_OPEN_METHOD, replies,
1414             &s, deadline);
1415 0 0         if (result.reply_type != AMQP_RESPONSE_NORMAL) {
1416 0           goto out;
1417             }
1418             }
1419              
1420 0           result.reply_type = AMQP_RESPONSE_NORMAL;
1421 0           result.reply.id = 0;
1422 0           result.reply.decoded = NULL;
1423 0           result.library_error = 0;
1424 0           amqp_maybe_release_buffers(state);
1425              
1426 0           out:
1427 0           return result;
1428              
1429 0           error_res:
1430 0           amqp_socket_close(state->socket, AMQP_SC_FORCE);
1431 0           result = amqp_rpc_reply_error(res);
1432              
1433 0           goto out;
1434             }
1435              
1436 0           amqp_rpc_reply_t amqp_login(amqp_connection_state_t state, char const *vhost,
1437             int channel_max, int frame_max, int heartbeat,
1438             amqp_sasl_method_enum sasl_method, ...) {
1439              
1440             va_list vl;
1441             amqp_rpc_reply_t ret;
1442              
1443 0           va_start(vl, sasl_method);
1444              
1445 0           ret = amqp_login_inner(state, vhost, channel_max, frame_max, heartbeat,
1446 0           &amqp_empty_table, state->handshake_timeout,
1447             sasl_method, vl);
1448              
1449 0           va_end(vl);
1450              
1451 0           return ret;
1452             }
1453              
1454 0           amqp_rpc_reply_t amqp_login_with_properties(
1455             amqp_connection_state_t state, char const *vhost, int channel_max,
1456             int frame_max, int heartbeat, const amqp_table_t *client_properties,
1457             amqp_sasl_method_enum sasl_method, ...) {
1458             va_list vl;
1459             amqp_rpc_reply_t ret;
1460              
1461 0           va_start(vl, sasl_method);
1462              
1463 0           ret = amqp_login_inner(state, vhost, channel_max, frame_max, heartbeat,
1464 0           client_properties, state->handshake_timeout,
1465             sasl_method, vl);
1466              
1467 0           va_end(vl);
1468              
1469 0           return ret;
1470             }