File Coverage

amqp_connection.c
Criterion Covered Total %
statement 207 267 77.5
branch 50 86 58.1
condition n/a
subroutine n/a
pod n/a
total 257 353 72.8


line stmt bran cond sub pod time code
1             /*
2             * ***** BEGIN LICENSE BLOCK *****
3             * Version: MIT
4             *
5             * Portions created by Alan Antonuk are Copyright (c) 2012-2014
6             * Alan Antonuk. All Rights Reserved.
7             *
8             * Portions created by VMware are Copyright (c) 2007-2012 VMware, Inc.
9             * All Rights Reserved.
10             *
11             * Portions created by Tony Garnock-Jones are Copyright (c) 2009-2010
12             * VMware, Inc. and Tony Garnock-Jones. All Rights Reserved.
13             *
14             * Permission is hereby granted, free of charge, to any person
15             * obtaining a copy of this software and associated documentation
16             * files (the "Software"), to deal in the Software without
17             * restriction, including without limitation the rights to use, copy,
18             * modify, merge, publish, distribute, sublicense, and/or sell copies
19             * of the Software, and to permit persons to whom the Software is
20             * furnished to do so, subject to the following conditions:
21             *
22             * The above copyright notice and this permission notice shall be
23             * included in all copies or substantial portions of the Software.
24             *
25             * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
26             * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
27             * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
28             * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
29             * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
30             * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
31             * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
32             * SOFTWARE.
33             * ***** END LICENSE BLOCK *****
34             */
35              
36             #ifdef HAVE_CONFIG_H
37             #include "config.h"
38             #endif
39              
40             #ifdef _MSC_VER
41             #define _CRT_SECURE_NO_WARNINGS
42             #endif
43              
44             #include "amqp_private.h"
45             #include "amqp_tcp_socket.h"
46             #include "amqp_time.h"
47             #include <errno.h>
48             #include <stdint.h>
49             #include <stdio.h>
50             #include <stdlib.h>
51             #include <string.h>
52              
53             #ifndef AMQP_INITIAL_FRAME_POOL_PAGE_SIZE
54             #define AMQP_INITIAL_FRAME_POOL_PAGE_SIZE 65536
55             #endif
56              
57             #ifndef AMQP_INITIAL_INBOUND_SOCK_BUFFER_SIZE
58             #define AMQP_INITIAL_INBOUND_SOCK_BUFFER_SIZE 131072
59             #endif
60              
61             #ifndef AMQP_DEFAULT_LOGIN_TIMEOUT_SEC
62             #define AMQP_DEFAULT_LOGIN_TIMEOUT_SEC 12
63             #endif
64              
/*
 * Check that a connection is in the state the caller requires and call
 * amqp_abort() with a diagnostic message when it is not.
 *
 * statevec: the amqp_connection_state_t to inspect (evaluated exactly once).
 * statenum: the amqp_connection_state_enum value expected (evaluated once).
 *
 * The body is wrapped in do { ... } while (0) so that the expansion followed
 * by a semicolon forms exactly one statement; this keeps call sites such as
 * `if (x) ENFORCE_STATE(s, n); else ...` well-formed.
 */
#define ENFORCE_STATE(statevec, statenum)                                   \
  do {                                                                      \
    amqp_connection_state_t _check_state = (statevec);                      \
    amqp_connection_state_enum _wanted_state = (statenum);                  \
    if (_check_state->state != _wanted_state) {                             \
      amqp_abort(                                                           \
          "Programming error: invalid AMQP connection state: expected %d, " \
          "got %d",                                                         \
          _wanted_state, _check_state->state);                              \
    }                                                                       \
  } while (0)
75              
76 30           amqp_connection_state_t amqp_new_connection(void) {
77             int res;
78 30           amqp_connection_state_t state = (amqp_connection_state_t)calloc(
79             1, sizeof(struct amqp_connection_state_t_));
80              
81 30 50         if (state == NULL) {
82 0           return NULL;
83             }
84              
85 30           res = amqp_tune_connection(state, 0, AMQP_INITIAL_FRAME_POOL_PAGE_SIZE, 0);
86 30 50         if (0 != res) {
87 0           goto out_nomem;
88             }
89              
90 30           state->inbound_buffer.bytes = state->header_buffer;
91 30           state->inbound_buffer.len = sizeof(state->header_buffer);
92              
93 30           state->state = CONNECTION_STATE_INITIAL;
94             /* the server protocol version response is 8 bytes, which conveniently
95             is also the minimum frame size */
96 30           state->target_size = 8;
97              
98 30           state->sock_inbound_buffer.len = AMQP_INITIAL_INBOUND_SOCK_BUFFER_SIZE;
99 30           state->sock_inbound_buffer.bytes =
100 30           malloc(AMQP_INITIAL_INBOUND_SOCK_BUFFER_SIZE);
101 30 50         if (state->sock_inbound_buffer.bytes == NULL) {
102 0           goto out_nomem;
103             }
104              
105 30           init_amqp_pool(&state->properties_pool, 512);
106              
107             /* Use address of the internal_handshake_timeout object by default. */
108 30           state->internal_handshake_timeout.tv_sec = AMQP_DEFAULT_LOGIN_TIMEOUT_SEC;
109 30           state->internal_handshake_timeout.tv_usec = 0;
110 30           state->handshake_timeout = &state->internal_handshake_timeout;
111              
112 30           return state;
113              
114             out_nomem:
115 0           free(state->sock_inbound_buffer.bytes);
116 0           free(state);
117 0           return NULL;
118             }
119              
120 780           int amqp_get_sockfd(amqp_connection_state_t state) {
121 780 50         return state->socket ? amqp_socket_get_sockfd(state->socket) : -1;
122             }
123              
124 0           void amqp_set_sockfd(amqp_connection_state_t state, int sockfd) {
125 0           amqp_socket_t *socket = amqp_tcp_socket_new(state);
126 0 0         if (!socket) {
127 0           amqp_abort("%s", strerror(errno));
128             }
129 0           amqp_tcp_socket_set_sockfd(socket, sockfd);
130 0           }
131              
132 44           void amqp_set_socket(amqp_connection_state_t state, amqp_socket_t *socket) {
133 44           amqp_socket_delete(state->socket);
134 44           state->socket = socket;
135 44           }
136              
137 445           amqp_socket_t *amqp_get_socket(amqp_connection_state_t state) {
138 445           return state->socket;
139             }
140              
141 67           int amqp_tune_connection(amqp_connection_state_t state, int channel_max,
142             int frame_max, int heartbeat) {
143             void *newbuf;
144             int res;
145              
146 67 50         ENFORCE_STATE(state, CONNECTION_STATE_IDLE);
147              
148 67           state->channel_max = channel_max;
149 67           state->frame_max = frame_max;
150              
151 67           state->heartbeat = heartbeat;
152 67 50         if (0 > state->heartbeat) {
153 0           state->heartbeat = 0;
154             }
155              
156 67           res = amqp_time_s_from_now(&state->next_send_heartbeat,
157             amqp_heartbeat_send(state));
158 67 50         if (AMQP_STATUS_OK != res) {
159 0           return res;
160             }
161 67           res = amqp_time_s_from_now(&state->next_recv_heartbeat,
162             amqp_heartbeat_recv(state));
163 67 50         if (AMQP_STATUS_OK != res) {
164 0           return res;
165             }
166              
167 67           state->outbound_buffer.len = frame_max;
168 67           newbuf = realloc(state->outbound_buffer.bytes, frame_max);
169 67 50         if (newbuf == NULL) {
170 0           return AMQP_STATUS_NO_MEMORY;
171             }
172 67           state->outbound_buffer.bytes = newbuf;
173              
174 67           return AMQP_STATUS_OK;
175             }
176              
177 1           int amqp_get_channel_max(amqp_connection_state_t state) {
178 1           return state->channel_max;
179             }
180              
181 0           int amqp_get_frame_max(amqp_connection_state_t state) {
182 0           return state->frame_max;
183             }
184              
185 0           int amqp_get_heartbeat(amqp_connection_state_t state) {
186 0           return state->heartbeat;
187             }
188              
189 30           int amqp_destroy_connection(amqp_connection_state_t state) {
190 30           int status = AMQP_STATUS_OK;
191 30 50         if (state) {
192             int i;
193 510 100         for (i = 0; i < POOL_TABLE_SIZE; ++i) {
194 480           amqp_pool_table_entry_t *entry = state->pool_table[i];
195 532 100         while (NULL != entry) {
196 52           amqp_pool_table_entry_t *todelete = entry;
197 52           empty_amqp_pool(&entry->pool);
198 52           entry = entry->next;
199 52           free(todelete);
200             }
201             }
202              
203 30           free(state->outbound_buffer.bytes);
204 30           free(state->sock_inbound_buffer.bytes);
205 30           amqp_socket_delete(state->socket);
206 30           empty_amqp_pool(&state->properties_pool);
207 30           free(state);
208             }
209 30           return status;
210             }
211              
212 480           static void return_to_idle(amqp_connection_state_t state) {
213 480           state->inbound_buffer.len = sizeof(state->header_buffer);
214 480           state->inbound_buffer.bytes = state->header_buffer;
215 480           state->inbound_offset = 0;
216 480           state->target_size = HEADER_SIZE;
217 480           state->state = CONNECTION_STATE_IDLE;
218 480           }
219              
220 968           static size_t consume_data(amqp_connection_state_t state,
221             amqp_bytes_t *received_data) {
222             /* how much data is available and will fit? */
223 968           size_t bytes_consumed = state->target_size - state->inbound_offset;
224 968 100         if (received_data->len < bytes_consumed) {
225 8           bytes_consumed = received_data->len;
226             }
227              
228 968           memcpy(amqp_offset(state->inbound_buffer.bytes, state->inbound_offset),
229 968           received_data->bytes, bytes_consumed);
230 968           state->inbound_offset += bytes_consumed;
231 968           received_data->bytes = amqp_offset(received_data->bytes, bytes_consumed);
232 968           received_data->len -= bytes_consumed;
233              
234 968           return bytes_consumed;
235             }
236              
237 488           int amqp_handle_input(amqp_connection_state_t state, amqp_bytes_t received_data,
238             amqp_frame_t *decoded_frame) {
239             size_t bytes_consumed;
240             void *raw_frame;
241              
242             /* Returning frame_type of zero indicates either insufficient input,
243             or a complete, ignored frame was read. */
244 488           decoded_frame->frame_type = 0;
245              
246 488 50         if (received_data.len == 0) {
247 0           return AMQP_STATUS_OK;
248             }
249              
250 488 100         if (state->state == CONNECTION_STATE_IDLE) {
251 453           state->state = CONNECTION_STATE_HEADER;
252             }
253              
254 488           bytes_consumed = consume_data(state, &received_data);
255              
256             /* do we have target_size data yet? if not, return with the
257             expectation that more will arrive */
258 488 100         if (state->inbound_offset < state->target_size) {
259 1           return (int)bytes_consumed;
260             }
261              
262 487           raw_frame = state->inbound_buffer.bytes;
263              
264 487           switch (state->state) {
265             case CONNECTION_STATE_INITIAL:
266             /* check for a protocol header from the server */
267 27 50         if (memcmp(raw_frame, "AMQP", 4) == 0) {
268 0           decoded_frame->frame_type = AMQP_PSEUDOFRAME_PROTOCOL_HEADER;
269 0           decoded_frame->channel = 0;
270              
271 0           decoded_frame->payload.protocol_header.transport_high =
272 0           amqp_d8(amqp_offset(raw_frame, 4));
273 0           decoded_frame->payload.protocol_header.transport_low =
274 0           amqp_d8(amqp_offset(raw_frame, 5));
275 0           decoded_frame->payload.protocol_header.protocol_version_major =
276 0           amqp_d8(amqp_offset(raw_frame, 6));
277 0           decoded_frame->payload.protocol_header.protocol_version_minor =
278 0           amqp_d8(amqp_offset(raw_frame, 7));
279              
280 0           return_to_idle(state);
281 0           return (int)bytes_consumed;
282             }
283              
284             /* it's not a protocol header; fall through to process it as a
285             regular frame header */
286              
287             case CONNECTION_STATE_HEADER: {
288             amqp_channel_t channel;
289             amqp_pool_t *channel_pool;
290             uint32_t frame_size;
291              
292 480           channel = amqp_d16(amqp_offset(raw_frame, 1));
293              
294             /* frame length is 3 bytes in */
295 480           frame_size = amqp_d32(amqp_offset(raw_frame, 3));
296             /* To prevent the target_size calculation below from overflowing, check
297             * that the stated frame_size is smaller than a signed 32-bit. Given
298             * the library only allows configuring frame_max as an int32_t, and
299             * frame_size is uint32_t, the math below is safe from overflow. */
300 480 50         if (frame_size >= INT32_MAX) {
301 0           return AMQP_STATUS_BAD_AMQP_DATA;
302             }
303              
304 480           state->target_size = frame_size + HEADER_SIZE + FOOTER_SIZE;
305 480 50         if ((size_t)state->frame_max < state->target_size) {
306 0           return AMQP_STATUS_BAD_AMQP_DATA;
307             }
308              
309 480           channel_pool = amqp_get_or_create_channel_pool(state, channel);
310 480 50         if (NULL == channel_pool) {
311 0           return AMQP_STATUS_NO_MEMORY;
312             }
313              
314 480           amqp_pool_alloc_bytes(channel_pool, state->target_size,
315             &state->inbound_buffer);
316 480 50         if (NULL == state->inbound_buffer.bytes) {
317 0           return AMQP_STATUS_NO_MEMORY;
318             }
319 480           memcpy(state->inbound_buffer.bytes, state->header_buffer, HEADER_SIZE);
320 480           raw_frame = state->inbound_buffer.bytes;
321              
322 480           state->state = CONNECTION_STATE_BODY;
323              
324 480           bytes_consumed += consume_data(state, &received_data);
325              
326             /* do we have target_size data yet? if not, return with the
327             expectation that more will arrive */
328 480 100         if (state->inbound_offset < state->target_size) {
329 7           return (int)bytes_consumed;
330             }
331             }
332             /* fall through to process body */
333              
334             case CONNECTION_STATE_BODY: {
335             amqp_bytes_t encoded;
336             int res;
337             amqp_pool_t *channel_pool;
338              
339             /* Check frame end marker (footer) */
340 480 50         if (amqp_d8(amqp_offset(raw_frame, state->target_size - 1)) !=
341             AMQP_FRAME_END) {
342 0           return AMQP_STATUS_BAD_AMQP_DATA;
343             }
344              
345 480           decoded_frame->frame_type = amqp_d8(amqp_offset(raw_frame, 0));
346 480           decoded_frame->channel = amqp_d16(amqp_offset(raw_frame, 1));
347              
348 480           channel_pool =
349 480           amqp_get_or_create_channel_pool(state, decoded_frame->channel);
350 480 50         if (NULL == channel_pool) {
351 0           return AMQP_STATUS_NO_MEMORY;
352             }
353              
354 480           switch (decoded_frame->frame_type) {
355             case AMQP_FRAME_METHOD:
356 396           decoded_frame->payload.method.id =
357 396           amqp_d32(amqp_offset(raw_frame, HEADER_SIZE));
358 396           encoded.bytes = amqp_offset(raw_frame, HEADER_SIZE + 4);
359 396           encoded.len = state->target_size - HEADER_SIZE - 4 - FOOTER_SIZE;
360              
361 396           res = amqp_decode_method(decoded_frame->payload.method.id,
362             channel_pool, encoded,
363             &decoded_frame->payload.method.decoded);
364 396 50         if (res < 0) {
365 0           return res;
366             }
367              
368 396           break;
369              
370             case AMQP_FRAME_HEADER:
371 25           decoded_frame->payload.properties.class_id =
372 25           amqp_d16(amqp_offset(raw_frame, HEADER_SIZE));
373             /* unused 2-byte weight field goes here */
374 25           decoded_frame->payload.properties.body_size =
375 25           amqp_d64(amqp_offset(raw_frame, HEADER_SIZE + 4));
376 25           encoded.bytes = amqp_offset(raw_frame, HEADER_SIZE + 12);
377 25           encoded.len = state->target_size - HEADER_SIZE - 12 - FOOTER_SIZE;
378 25           decoded_frame->payload.properties.raw = encoded;
379              
380 25           res = amqp_decode_properties(
381 25           decoded_frame->payload.properties.class_id, channel_pool, encoded,
382             &decoded_frame->payload.properties.decoded);
383 25 50         if (res < 0) {
384 0           return res;
385             }
386              
387 25           break;
388              
389             case AMQP_FRAME_BODY:
390 34           decoded_frame->payload.body_fragment.len =
391 34           state->target_size - HEADER_SIZE - FOOTER_SIZE;
392 34           decoded_frame->payload.body_fragment.bytes =
393 34           amqp_offset(raw_frame, HEADER_SIZE);
394 34           break;
395              
396             case AMQP_FRAME_HEARTBEAT:
397 25           break;
398              
399             default:
400             /* Ignore the frame */
401 0           decoded_frame->frame_type = 0;
402 0           break;
403             }
404              
405 480           return_to_idle(state);
406 480           return (int)bytes_consumed;
407             }
408              
409             default:
410 0           amqp_abort("Internal error: invalid amqp_connection_state_t->state %d",
411 0           state->state);
412             }
413             }
414              
415 217           amqp_boolean_t amqp_release_buffers_ok(amqp_connection_state_t state) {
416 217           return (state->state == CONNECTION_STATE_IDLE);
417             }
418              
419 291           void amqp_release_buffers(amqp_connection_state_t state) {
420             int i;
421 291 50         ENFORCE_STATE(state, CONNECTION_STATE_IDLE);
422              
423 4947 100         for (i = 0; i < POOL_TABLE_SIZE; ++i) {
424 4656           amqp_pool_table_entry_t *entry = state->pool_table[i];
425              
426 5130 100         for (; NULL != entry; entry = entry->next) {
427 474           amqp_maybe_release_buffers_on_channel(state, entry->channel);
428             }
429             }
430 291           }
431              
432 37           void amqp_maybe_release_buffers(amqp_connection_state_t state) {
433 37 50         if (amqp_release_buffers_ok(state)) {
434 37           amqp_release_buffers(state);
435             }
436 37           }
437              
438 479           void amqp_maybe_release_buffers_on_channel(amqp_connection_state_t state,
439             amqp_channel_t channel) {
440             amqp_link_t *queued_link;
441             amqp_pool_t *pool;
442 479 50         if (CONNECTION_STATE_IDLE != state->state) {
443 0           return;
444             }
445              
446 479           queued_link = state->first_queued_frame;
447              
448 499 100         while (NULL != queued_link) {
449 21           amqp_frame_t *frame = queued_link->data;
450 21 100         if (channel == frame->channel) {
451 1           return;
452             }
453              
454 20           queued_link = queued_link->next;
455             }
456              
457 478           pool = amqp_get_channel_pool(state, channel);
458              
459 478 50         if (pool != NULL) {
460 478           recycle_amqp_pool(pool);
461             }
462             }
463              
464 486           static int amqp_frame_to_bytes(const amqp_frame_t *frame, amqp_bytes_t buffer,
465             amqp_bytes_t *encoded) {
466 486           void *out_frame = buffer.bytes;
467             size_t out_frame_len;
468             int res;
469              
470 486           amqp_e8(frame->frame_type, amqp_offset(out_frame, 0));
471 486           amqp_e16(frame->channel, amqp_offset(out_frame, 1));
472              
473 486           switch (frame->frame_type) {
474             case AMQP_FRAME_BODY: {
475 37           const amqp_bytes_t *body = &frame->payload.body_fragment;
476              
477 37           memcpy(amqp_offset(out_frame, HEADER_SIZE), body->bytes, body->len);
478              
479 37           out_frame_len = body->len;
480 37           break;
481             }
482             case AMQP_FRAME_METHOD: {
483             amqp_bytes_t method_encoded;
484              
485 409           amqp_e32(frame->payload.method.id, amqp_offset(out_frame, HEADER_SIZE));
486              
487 409           method_encoded.bytes = amqp_offset(out_frame, HEADER_SIZE + 4);
488 409           method_encoded.len = buffer.len - HEADER_SIZE - 4 - FOOTER_SIZE;
489              
490 409           res = amqp_encode_method(frame->payload.method.id,
491             frame->payload.method.decoded, method_encoded);
492 409 50         if (res < 0) {
493 0           return res;
494             }
495              
496 409           out_frame_len = res + 4;
497 409           break;
498             }
499              
500             case AMQP_FRAME_HEADER: {
501             amqp_bytes_t properties_encoded;
502              
503 28           amqp_e16(frame->payload.properties.class_id,
504             amqp_offset(out_frame, HEADER_SIZE));
505 28           amqp_e16(0, amqp_offset(out_frame, HEADER_SIZE + 2)); /* "weight" */
506 28           amqp_e64(frame->payload.properties.body_size,
507             amqp_offset(out_frame, HEADER_SIZE + 4));
508              
509 28           properties_encoded.bytes = amqp_offset(out_frame, HEADER_SIZE + 12);
510 28           properties_encoded.len = buffer.len - HEADER_SIZE - 12 - FOOTER_SIZE;
511              
512 28           res = amqp_encode_properties(frame->payload.properties.class_id,
513             frame->payload.properties.decoded,
514             properties_encoded);
515 28 50         if (res < 0) {
516 0           return res;
517             }
518              
519 28           out_frame_len = res + 12;
520 28           break;
521             }
522              
523             case AMQP_FRAME_HEARTBEAT:
524 12           out_frame_len = 0;
525 12           break;
526              
527             default:
528 0           return AMQP_STATUS_INVALID_PARAMETER;
529             }
530              
531 486           amqp_e32((uint32_t)out_frame_len, amqp_offset(out_frame, 3));
532 486           amqp_e8(AMQP_FRAME_END, amqp_offset(out_frame, HEADER_SIZE + out_frame_len));
533              
534 486           encoded->bytes = out_frame;
535 486           encoded->len = out_frame_len + HEADER_SIZE + FOOTER_SIZE;
536              
537 486           return AMQP_STATUS_OK;
538             }
539              
540 12           int amqp_send_frame(amqp_connection_state_t state, const amqp_frame_t *frame) {
541 12           return amqp_send_frame_inner(state, frame, AMQP_SF_NONE,
542             amqp_time_infinite());
543             }
544              
545 486           int amqp_send_frame_inner(amqp_connection_state_t state,
546             const amqp_frame_t *frame, int flags,
547             amqp_time_t deadline) {
548             int res;
549             ssize_t sent;
550             amqp_bytes_t encoded;
551             amqp_time_t next_timeout;
552              
553             /* TODO: if the AMQP_SF_MORE socket optimization can be shown to work
554             * correctly, then this could be un-done so that body-frames are sent as 3
555             * send calls, getting rid of the copy of the body content, some testing
556             * would need to be done to see if this would actually a win for performance.
557             * */
558 486           res = amqp_frame_to_bytes(frame, state->outbound_buffer, &encoded);
559 486 50         if (AMQP_STATUS_OK != res) {
560 0           return res;
561             }
562              
563             start_send:
564              
565 486           next_timeout = amqp_time_first(deadline, state->next_recv_heartbeat);
566              
567 486           sent = amqp_try_send(state, encoded.bytes, encoded.len, next_timeout, flags);
568 486 100         if (0 > sent) {
569 4           return (int)sent;
570             }
571              
572             /* A partial send has occurred, because of a heartbeat timeout (so try recv
573             * something) or common timeout (so return AMQP_STATUS_TIMEOUT) */
574 482 50         if ((ssize_t)encoded.len != sent) {
575 0 0         if (amqp_time_equal(next_timeout, deadline)) {
576             /* timeout of method was received, so return from method*/
577 0           return AMQP_STATUS_TIMEOUT;
578             }
579              
580 0           res = amqp_try_recv(state);
581              
582 0 0         if (AMQP_STATUS_TIMEOUT == res) {
583 0           return AMQP_STATUS_HEARTBEAT_TIMEOUT;
584 0 0         } else if (AMQP_STATUS_OK != res) {
585 0           return res;
586             }
587              
588 0           encoded.bytes = (uint8_t *)encoded.bytes + sent;
589 0           encoded.len -= sent;
590 0           goto start_send;
591             }
592              
593 482           res = amqp_time_s_from_now(&state->next_send_heartbeat,
594             amqp_heartbeat_send(state));
595 486           return res;
596             }
597              
598 1           amqp_table_t *amqp_get_server_properties(amqp_connection_state_t state) {
599 1           return &state->server_properties;
600             }
601              
602 1           amqp_table_t *amqp_get_client_properties(amqp_connection_state_t state) {
603 1           return &state->client_properties;
604             }