File Coverage

amqp_connection.c
Criterion Covered Total %
statement 80 283 28.2
branch 16 86 18.6
condition n/a
subroutine n/a
pod n/a
total 96 369 26.0


line stmt bran cond sub pod time code
1             // Copyright 2007 - 2021, Alan Antonuk and the rabbitmq-c contributors.
2             // SPDX-License-Identifier: mit
3              
4             #ifdef HAVE_CONFIG_H
5             #include "config.h"
6             #endif
7              
8             #ifdef _MSC_VER
9             #define _CRT_SECURE_NO_WARNINGS
10             #endif
11              
12             #include "amqp_private.h"
13             #include "amqp_time.h"
14             #include "rabbitmq-c/tcp_socket.h"
15             #include <errno.h>
16             #include <stdint.h>
17             #include <stdio.h>
18             #include <stdlib.h>
19             #include <string.h>
20              
21             #ifndef AMQP_INITIAL_FRAME_POOL_PAGE_SIZE
22             #define AMQP_INITIAL_FRAME_POOL_PAGE_SIZE 65536
23             #endif
24              
25             #ifndef AMQP_INITIAL_INBOUND_SOCK_BUFFER_SIZE
26             #define AMQP_INITIAL_INBOUND_SOCK_BUFFER_SIZE 131072
27             #endif
28              
29             #ifndef AMQP_DEFAULT_LOGIN_TIMEOUT_SEC
30             #define AMQP_DEFAULT_LOGIN_TIMEOUT_SEC 12
31             #endif
32              
/*
 * Abort (via amqp_abort) when a connection is not in the expected state.
 *
 * statevec: connection handle; evaluated exactly once.
 * statenum: the amqp_connection_state_enum value the connection must be in;
 *           evaluated exactly once.
 *
 * The body is wrapped in do { ... } while (0) so that the expansion is a
 * single statement that requires its trailing semicolon; this keeps the macro
 * safe when used as the body of an unbraced if/else.
 */
#define ENFORCE_STATE(statevec, statenum)                                   \
  do {                                                                      \
    amqp_connection_state_t _check_state = (statevec);                      \
    amqp_connection_state_enum _wanted_state = (statenum);                  \
    if (_check_state->state != _wanted_state) {                             \
      amqp_abort(                                                           \
          "Programming error: invalid AMQP connection state: expected %d, " \
          "got %d",                                                         \
          _wanted_state, _check_state->state);                              \
    }                                                                       \
  } while (0)
43              
44 33           amqp_connection_state_t amqp_new_connection(void) {
45             int res;
46 33           amqp_connection_state_t state = (amqp_connection_state_t)calloc(
47             1, sizeof(struct amqp_connection_state_t_));
48              
49 33 50         if (state == NULL) {
50 0           return NULL;
51             }
52              
53 33           res = amqp_tune_connection(state, 0, AMQP_INITIAL_FRAME_POOL_PAGE_SIZE, 0);
54 33 50         if (0 != res) {
55 0           goto out_nomem;
56             }
57              
58 33           state->inbound_buffer.bytes = state->header_buffer;
59 33           state->inbound_buffer.len = sizeof(state->header_buffer);
60              
61 33           state->state = CONNECTION_STATE_INITIAL;
62             /* the server protocol version response is 8 bytes, which conveniently
63             is also the minimum frame size */
64 33           state->target_size = 8;
65              
66 33           state->sock_inbound_buffer.len = AMQP_INITIAL_INBOUND_SOCK_BUFFER_SIZE;
67 33           state->sock_inbound_buffer.bytes =
68 33           malloc(AMQP_INITIAL_INBOUND_SOCK_BUFFER_SIZE);
69 33 50         if (state->sock_inbound_buffer.bytes == NULL) {
70 0           goto out_nomem;
71             }
72              
73 33           init_amqp_pool(&state->properties_pool, 512);
74              
75             /* Use address of the internal_handshake_timeout object by default. */
76 33           state->internal_handshake_timeout.tv_sec = AMQP_DEFAULT_LOGIN_TIMEOUT_SEC;
77 33           state->internal_handshake_timeout.tv_usec = 0;
78 33           state->handshake_timeout = &state->internal_handshake_timeout;
79              
80 33           return state;
81              
82 0           out_nomem:
83 0           free(state->sock_inbound_buffer.bytes);
84 0           free(state);
85 0           return NULL;
86             }
87              
88 1           int amqp_get_sockfd(amqp_connection_state_t state) {
89 1 50         return state->socket ? amqp_socket_get_sockfd(state->socket) : -1;
90             }
91              
92 0           void amqp_set_sockfd(amqp_connection_state_t state, int sockfd) {
93 0           amqp_socket_t *socket = amqp_tcp_socket_new(state);
94 0 0         if (!socket) {
95 0           amqp_abort("%s", strerror(errno));
96             }
97 0           amqp_tcp_socket_set_sockfd(socket, sockfd);
98 0           }
99              
100 1           void amqp_set_socket(amqp_connection_state_t state, amqp_socket_t *socket) {
101 1           amqp_socket_delete(state->socket);
102 1           state->socket = socket;
103 1           }
104              
105 34           amqp_socket_t *amqp_get_socket(amqp_connection_state_t state) {
106 34           return state->socket;
107             }
108              
109 33           int amqp_tune_connection(amqp_connection_state_t state, int channel_max,
110             int frame_max, int heartbeat) {
111             void *newbuf;
112             int res;
113              
114 33 50         ENFORCE_STATE(state, CONNECTION_STATE_IDLE);
115              
116 33           state->channel_max = channel_max;
117 33           state->frame_max = frame_max;
118              
119 33           state->heartbeat = heartbeat;
120 33 50         if (0 > state->heartbeat) {
121 0           state->heartbeat = 0;
122             }
123              
124 33           res = amqp_time_s_from_now(&state->next_send_heartbeat,
125             amqp_heartbeat_send(state));
126 33 50         if (AMQP_STATUS_OK != res) {
127 0           return res;
128             }
129 33           res = amqp_time_s_from_now(&state->next_recv_heartbeat,
130             amqp_heartbeat_recv(state));
131 33 50         if (AMQP_STATUS_OK != res) {
132 0           return res;
133             }
134              
135 33           state->outbound_buffer.len = frame_max;
136 33           newbuf = realloc(state->outbound_buffer.bytes, frame_max);
137 33 50         if (newbuf == NULL) {
138 0           return AMQP_STATUS_NO_MEMORY;
139             }
140 33           state->outbound_buffer.bytes = newbuf;
141              
142 33           return AMQP_STATUS_OK;
143             }
144              
145 0           int amqp_get_channel_max(amqp_connection_state_t state) {
146 0           return state->channel_max;
147             }
148              
149 0           int amqp_get_frame_max(amqp_connection_state_t state) {
150 0           return state->frame_max;
151             }
152              
153 0           int amqp_get_heartbeat(amqp_connection_state_t state) {
154 0           return state->heartbeat;
155             }
156              
157 33           int amqp_destroy_connection(amqp_connection_state_t state) {
158 33           int status = AMQP_STATUS_OK;
159 33 50         if (state) {
160             int i;
161 561 100         for (i = 0; i < POOL_TABLE_SIZE; ++i) {
162 528           amqp_pool_table_entry_t *entry = state->pool_table[i];
163 528 50         while (NULL != entry) {
164 0           amqp_pool_table_entry_t *todelete = entry;
165 0           empty_amqp_pool(&entry->pool);
166 0           entry = entry->next;
167 0           free(todelete);
168             }
169             }
170              
171 33           free(state->outbound_buffer.bytes);
172 33           free(state->sock_inbound_buffer.bytes);
173 33           amqp_socket_delete(state->socket);
174 33           empty_amqp_pool(&state->properties_pool);
175 33           free(state);
176             }
177 33           return status;
178             }
179              
180 0           static void return_to_idle(amqp_connection_state_t state) {
181 0           state->inbound_buffer.len = sizeof(state->header_buffer);
182 0           state->inbound_buffer.bytes = state->header_buffer;
183 0           state->inbound_offset = 0;
184 0           state->target_size = HEADER_SIZE;
185 0           state->state = CONNECTION_STATE_IDLE;
186 0           }
187              
188 0           static size_t consume_data(amqp_connection_state_t state,
189             amqp_bytes_t *received_data) {
190             /* how much data is available and will fit? */
191 0           size_t bytes_consumed = state->target_size - state->inbound_offset;
192 0 0         if (received_data->len < bytes_consumed) {
193 0           bytes_consumed = received_data->len;
194             }
195              
196 0           memcpy(amqp_offset(state->inbound_buffer.bytes, state->inbound_offset),
197 0           received_data->bytes, bytes_consumed);
198 0           state->inbound_offset += bytes_consumed;
199 0           received_data->bytes = amqp_offset(received_data->bytes, bytes_consumed);
200 0           received_data->len -= bytes_consumed;
201              
202 0           return bytes_consumed;
203             }
204              
205 0           int amqp_handle_input(amqp_connection_state_t state, amqp_bytes_t received_data,
206             amqp_frame_t *decoded_frame) {
207             size_t bytes_consumed;
208             void *raw_frame;
209              
210             /* Returning frame_type of zero indicates either insufficient input,
211             or a complete, ignored frame was read. */
212 0           decoded_frame->frame_type = 0;
213              
214 0 0         if (received_data.len == 0) {
215 0           return AMQP_STATUS_OK;
216             }
217              
218 0 0         if (state->state == CONNECTION_STATE_IDLE) {
219 0           state->state = CONNECTION_STATE_HEADER;
220             }
221              
222 0           bytes_consumed = consume_data(state, &received_data);
223              
224             /* do we have target_size data yet? if not, return with the
225             expectation that more will arrive */
226 0 0         if (state->inbound_offset < state->target_size) {
227 0           return (int)bytes_consumed;
228             }
229              
230 0           raw_frame = state->inbound_buffer.bytes;
231              
232 0           switch (state->state) {
233 0           case CONNECTION_STATE_INITIAL:
234             /* check for a protocol header from the server */
235 0 0         if (memcmp(raw_frame, "AMQP", 4) == 0) {
236 0           decoded_frame->frame_type = AMQP_PSEUDOFRAME_PROTOCOL_HEADER;
237 0           decoded_frame->channel = 0;
238              
239 0           decoded_frame->payload.protocol_header.transport_high =
240 0           amqp_d8(amqp_offset(raw_frame, 4));
241 0           decoded_frame->payload.protocol_header.transport_low =
242 0           amqp_d8(amqp_offset(raw_frame, 5));
243 0           decoded_frame->payload.protocol_header.protocol_version_major =
244 0           amqp_d8(amqp_offset(raw_frame, 6));
245 0           decoded_frame->payload.protocol_header.protocol_version_minor =
246 0           amqp_d8(amqp_offset(raw_frame, 7));
247              
248 0           return_to_idle(state);
249 0           return (int)bytes_consumed;
250             }
251              
252             /* it's not a protocol header; fall through to process it as a
253             regular frame header */
254              
255             case CONNECTION_STATE_HEADER: {
256             amqp_channel_t channel;
257             amqp_pool_t *channel_pool;
258             uint32_t frame_size;
259              
260 0           channel = amqp_d16(amqp_offset(raw_frame, 1));
261              
262             /* frame length is 3 bytes in */
263 0           frame_size = amqp_d32(amqp_offset(raw_frame, 3));
264             /* To prevent the target_size calculation below from overflowing, check
265             * that the stated frame_size is smaller than a signed 32-bit. Given
266             * the library only allows configuring frame_max as an int32_t, and
267             * frame_size is uint32_t, the math below is safe from overflow. */
268 0 0         if (frame_size >= INT32_MAX) {
269 0           return AMQP_STATUS_BAD_AMQP_DATA;
270             }
271              
272 0           frame_size = frame_size + HEADER_SIZE + FOOTER_SIZE;
273 0 0         if ((size_t)state->frame_max < frame_size) {
274 0           return AMQP_STATUS_BAD_AMQP_DATA;
275             }
276              
277 0           channel_pool = amqp_get_or_create_channel_pool(state, channel);
278 0 0         if (NULL == channel_pool) {
279 0           return AMQP_STATUS_NO_MEMORY;
280             }
281              
282 0           amqp_pool_alloc_bytes(channel_pool, frame_size, &state->inbound_buffer);
283 0 0         if (NULL == state->inbound_buffer.bytes) {
284 0           return AMQP_STATUS_NO_MEMORY;
285             }
286 0           memcpy(state->inbound_buffer.bytes, state->header_buffer, HEADER_SIZE);
287 0           raw_frame = state->inbound_buffer.bytes;
288              
289 0           state->state = CONNECTION_STATE_BODY;
290 0           state->target_size = frame_size;
291 0           bytes_consumed += consume_data(state, &received_data);
292              
293             /* do we have target_size data yet? if not, return with the
294             expectation that more will arrive */
295 0 0         if (state->inbound_offset < state->target_size) {
296 0           return (int)bytes_consumed;
297             }
298             }
299             /* fall through to process body */
300              
301             case CONNECTION_STATE_BODY: {
302             amqp_bytes_t encoded;
303             int res;
304             amqp_pool_t *channel_pool;
305              
306             /* Check frame end marker (footer) */
307 0 0         if (amqp_d8(amqp_offset(raw_frame, state->target_size - 1)) !=
308             AMQP_FRAME_END) {
309 0           return AMQP_STATUS_BAD_AMQP_DATA;
310             }
311              
312 0           decoded_frame->frame_type = amqp_d8(amqp_offset(raw_frame, 0));
313 0           decoded_frame->channel = amqp_d16(amqp_offset(raw_frame, 1));
314              
315             channel_pool =
316 0           amqp_get_or_create_channel_pool(state, decoded_frame->channel);
317 0 0         if (NULL == channel_pool) {
318 0           return AMQP_STATUS_NO_MEMORY;
319             }
320              
321 0           switch (decoded_frame->frame_type) {
322 0           case AMQP_FRAME_METHOD:
323 0           decoded_frame->payload.method.id =
324 0           amqp_d32(amqp_offset(raw_frame, HEADER_SIZE));
325 0           encoded.bytes = amqp_offset(raw_frame, HEADER_SIZE + 4);
326 0           encoded.len = state->target_size - HEADER_SIZE - 4 - FOOTER_SIZE;
327              
328 0           res = amqp_decode_method(decoded_frame->payload.method.id,
329             channel_pool, encoded,
330             &decoded_frame->payload.method.decoded);
331 0 0         if (res < 0) {
332 0           return res;
333             }
334              
335 0           break;
336              
337 0           case AMQP_FRAME_HEADER:
338 0           decoded_frame->payload.properties.class_id =
339 0           amqp_d16(amqp_offset(raw_frame, HEADER_SIZE));
340             /* unused 2-byte weight field goes here */
341 0           decoded_frame->payload.properties.body_size =
342 0           amqp_d64(amqp_offset(raw_frame, HEADER_SIZE + 4));
343 0           encoded.bytes = amqp_offset(raw_frame, HEADER_SIZE + 12);
344 0           encoded.len = state->target_size - HEADER_SIZE - 12 - FOOTER_SIZE;
345 0           decoded_frame->payload.properties.raw = encoded;
346              
347 0           res = amqp_decode_properties(
348 0           decoded_frame->payload.properties.class_id, channel_pool, encoded,
349             &decoded_frame->payload.properties.decoded);
350 0 0         if (res < 0) {
351 0           return res;
352             }
353              
354 0           break;
355              
356 0           case AMQP_FRAME_BODY:
357 0           decoded_frame->payload.body_fragment.len =
358 0           state->target_size - HEADER_SIZE - FOOTER_SIZE;
359 0           decoded_frame->payload.body_fragment.bytes =
360 0           amqp_offset(raw_frame, HEADER_SIZE);
361 0           break;
362              
363 0           case AMQP_FRAME_HEARTBEAT:
364 0           break;
365              
366 0           default:
367             /* Ignore the frame */
368 0           decoded_frame->frame_type = 0;
369 0           break;
370             }
371              
372 0           return_to_idle(state);
373 0           return (int)bytes_consumed;
374             }
375              
376 0           default:
377 0           amqp_abort("Internal error: invalid amqp_connection_state_t->state %d",
378 0           state->state);
379             }
380             }
381              
382 0           amqp_boolean_t amqp_release_buffers_ok(amqp_connection_state_t state) {
383 0           return (state->state == CONNECTION_STATE_IDLE);
384             }
385              
386 0           void amqp_release_buffers(amqp_connection_state_t state) {
387             int i;
388 0 0         ENFORCE_STATE(state, CONNECTION_STATE_IDLE);
389              
390 0 0         for (i = 0; i < POOL_TABLE_SIZE; ++i) {
391 0           amqp_pool_table_entry_t *entry = state->pool_table[i];
392              
393 0 0         for (; NULL != entry; entry = entry->next) {
394 0           amqp_maybe_release_buffers_on_channel(state, entry->channel);
395             }
396             }
397 0           }
398              
399 0           void amqp_maybe_release_buffers(amqp_connection_state_t state) {
400 0 0         if (amqp_release_buffers_ok(state)) {
401 0           amqp_release_buffers(state);
402             }
403 0           }
404              
405 0           void amqp_maybe_release_buffers_on_channel(amqp_connection_state_t state,
406             amqp_channel_t channel) {
407             amqp_link_t *queued_link;
408             amqp_pool_t *pool;
409 0 0         if (CONNECTION_STATE_IDLE != state->state) {
410 0           return;
411             }
412              
413 0           queued_link = state->first_queued_frame;
414              
415 0 0         while (NULL != queued_link) {
416 0           amqp_frame_t *frame = queued_link->data;
417 0 0         if (channel == frame->channel) {
418 0           return;
419             }
420              
421 0           queued_link = queued_link->next;
422             }
423              
424 0           pool = amqp_get_channel_pool(state, channel);
425              
426 0 0         if (pool != NULL) {
427 0           recycle_amqp_pool(pool);
428             }
429             }
430              
431 1           static int amqp_frame_to_bytes(const amqp_frame_t *frame, amqp_bytes_t buffer,
432             amqp_bytes_t *encoded) {
433 1           void *out_frame = buffer.bytes;
434             size_t out_frame_len;
435             int res;
436              
437 1           amqp_e8(frame->frame_type, amqp_offset(out_frame, 0));
438 1           amqp_e16(frame->channel, amqp_offset(out_frame, 1));
439              
440 1           switch (frame->frame_type) {
441 0           case AMQP_FRAME_BODY: {
442 0           const amqp_bytes_t *body = &frame->payload.body_fragment;
443              
444 0           memcpy(amqp_offset(out_frame, HEADER_SIZE), body->bytes, body->len);
445              
446 0           out_frame_len = body->len;
447 0           break;
448             }
449 1           case AMQP_FRAME_METHOD: {
450             amqp_bytes_t method_encoded;
451              
452 1           amqp_e32(frame->payload.method.id, amqp_offset(out_frame, HEADER_SIZE));
453              
454 1           method_encoded.bytes = amqp_offset(out_frame, HEADER_SIZE + 4);
455 1           method_encoded.len = buffer.len - HEADER_SIZE - 4 - FOOTER_SIZE;
456              
457 1           res = amqp_encode_method(frame->payload.method.id,
458 1           frame->payload.method.decoded, method_encoded);
459 1 50         if (res < 0) {
460 0           return res;
461             }
462              
463 1           out_frame_len = res + 4;
464 1           break;
465             }
466              
467 0           case AMQP_FRAME_HEADER: {
468             amqp_bytes_t properties_encoded;
469              
470 0           amqp_e16(frame->payload.properties.class_id,
471             amqp_offset(out_frame, HEADER_SIZE));
472 0           amqp_e16(0, amqp_offset(out_frame, HEADER_SIZE + 2)); /* "weight" */
473 0           amqp_e64(frame->payload.properties.body_size,
474             amqp_offset(out_frame, HEADER_SIZE + 4));
475              
476 0           properties_encoded.bytes = amqp_offset(out_frame, HEADER_SIZE + 12);
477 0           properties_encoded.len = buffer.len - HEADER_SIZE - 12 - FOOTER_SIZE;
478              
479 0           res = amqp_encode_properties(frame->payload.properties.class_id,
480 0           frame->payload.properties.decoded,
481             properties_encoded);
482 0 0         if (res < 0) {
483 0           return res;
484             }
485              
486 0           out_frame_len = res + 12;
487 0           break;
488             }
489              
490 0           case AMQP_FRAME_HEARTBEAT:
491 0           out_frame_len = 0;
492 0           break;
493              
494 0           default:
495 0           return AMQP_STATUS_INVALID_PARAMETER;
496             }
497              
498 1           amqp_e32((uint32_t)out_frame_len, amqp_offset(out_frame, 3));
499 1           amqp_e8(AMQP_FRAME_END, amqp_offset(out_frame, HEADER_SIZE + out_frame_len));
500              
501 1           encoded->bytes = out_frame;
502 1           encoded->len = out_frame_len + HEADER_SIZE + FOOTER_SIZE;
503              
504 1           return AMQP_STATUS_OK;
505             }
506              
507 0           int amqp_send_frame(amqp_connection_state_t state, const amqp_frame_t *frame) {
508 0           return amqp_send_frame_inner(state, frame, AMQP_SF_NONE,
509             amqp_time_infinite());
510             }
511              
512 1           int amqp_send_frame_inner(amqp_connection_state_t state,
513             const amqp_frame_t *frame, int flags,
514             amqp_time_t deadline) {
515             int res;
516             ssize_t sent;
517             amqp_bytes_t encoded;
518             amqp_time_t next_timeout;
519              
520             /* TODO: if the AMQP_SF_MORE socket optimization can be shown to work
521             * correctly, then this could be un-done so that body-frames are sent as 3
522             * send calls, getting rid of the copy of the body content, some testing
523             * would need to be done to see if this would actually a win for performance.
524             * */
525 1           res = amqp_frame_to_bytes(frame, state->outbound_buffer, &encoded);
526 1 50         if (AMQP_STATUS_OK != res) {
527 0           return res;
528             }
529              
530 1           start_send:
531              
532 1           next_timeout = amqp_time_first(deadline, state->next_recv_heartbeat);
533              
534 1           sent = amqp_try_send(state, encoded.bytes, encoded.len, next_timeout, flags);
535 1 50         if (0 > sent) {
536 1           return (int)sent;
537             }
538              
539             /* A partial send has occurred, because of a heartbeat timeout (so try recv
540             * something) or common timeout (so return AMQP_STATUS_TIMEOUT) */
541 0 0         if ((ssize_t)encoded.len != sent) {
542 0 0         if (amqp_time_equal(next_timeout, deadline)) {
543             /* timeout of method was received, so return from method*/
544 0           return AMQP_STATUS_TIMEOUT;
545             }
546              
547 0           res = amqp_try_recv(state);
548              
549 0 0         if (AMQP_STATUS_TIMEOUT == res) {
550 0           return AMQP_STATUS_HEARTBEAT_TIMEOUT;
551 0 0         } else if (AMQP_STATUS_OK != res) {
552 0           return res;
553             }
554              
555 0           encoded.bytes = (uint8_t *)encoded.bytes + sent;
556 0           encoded.len -= sent;
557 0           goto start_send;
558             }
559              
560 0           res = amqp_time_s_from_now(&state->next_send_heartbeat,
561             amqp_heartbeat_send(state));
562 0           return res;
563             }
564              
565 0           amqp_table_t *amqp_get_server_properties(amqp_connection_state_t state) {
566 0           return &state->server_properties;
567             }
568              
569 0           amqp_table_t *amqp_get_client_properties(amqp_connection_state_t state) {
570 0           return &state->client_properties;
571             }