File Coverage

amqp_api.c
Criterion Covered Total %
statement 90 142 63.3
branch 21 52 40.3
condition n/a
subroutine n/a
pod n/a
total 111 194 57.2


line stmt bran cond sub pod time code
1             /*
2             * ***** BEGIN LICENSE BLOCK *****
3             * Version: MIT
4             *
5             * Portions created by Alan Antonuk are Copyright (c) 2012-2013
6             * Alan Antonuk. All Rights Reserved.
7             *
8             * Portions created by VMware are Copyright (c) 2007-2012 VMware, Inc.
9             * All Rights Reserved.
10             *
11             * Portions created by Tony Garnock-Jones are Copyright (c) 2009-2010
12             * VMware, Inc. and Tony Garnock-Jones. All Rights Reserved.
13             *
14             * Permission is hereby granted, free of charge, to any person
15             * obtaining a copy of this software and associated documentation
16             * files (the "Software"), to deal in the Software without
17             * restriction, including without limitation the rights to use, copy,
18             * modify, merge, publish, distribute, sublicense, and/or sell copies
19             * of the Software, and to permit persons to whom the Software is
20             * furnished to do so, subject to the following conditions:
21             *
22             * The above copyright notice and this permission notice shall be
23             * included in all copies or substantial portions of the Software.
24             *
25             * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
26             * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
27             * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
28             * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
29             * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
30             * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
31             * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
32             * SOFTWARE.
33             * ***** END LICENSE BLOCK *****
34             */
35              
36             #ifdef HAVE_CONFIG_H
37             #include "config.h"
38             #endif
39              
40             #ifdef _MSC_VER
41             /* MSVC complains about sprintf being deprecated in favor of sprintf_s */
42             #define _CRT_SECURE_NO_WARNINGS
43             /* MSVC complains about strdup being deprecated in favor of _strdup */
44             #define _CRT_NONSTDC_NO_DEPRECATE
45             #endif
46              
47             #include "amqp_private.h"
48             #include "amqp_time.h"
49             #include <stdarg.h>
50             #include <stdint.h>
51             #include <stdio.h>
52             #include <stdlib.h>
53             #include <string.h>
54              
/*
 * Status codes are zero or negative. After negation, the low byte is the
 * index into a category's string table and the next byte is the category.
 */
#define ERROR_MASK (0x00FF)
#define ERROR_CATEGORY_MASK (0xFF00)

enum error_category_enum_ { EC_base = 0, EC_tcp = 1, EC_ssl = 2 };

static const char *const base_error_strings[] = {
    /* AMQP_STATUS_OK 0x0 */
    "operation completed successfully",
    /* AMQP_STATUS_NO_MEMORY -0x0001 */
    "could not allocate memory",
    /* AMQP_STATUS_BAD_AMQP_DATA -0x0002 */
    "invalid AMQP data",
    /* AMQP_STATUS_UNKNOWN_CLASS -0x0003 */
    "unknown AMQP class id",
    /* AMQP_STATUS_UNKNOWN_METHOD -0x0004 */
    "unknown AMQP method id",
    /* AMQP_STATUS_HOSTNAME_RESOLUTION_FAILED -0x0005 */
    "hostname lookup failed",
    /* AMQP_STATUS_INCOMPATIBLE_AMQP_VERSION -0x0006 */
    "incompatible AMQP version",
    /* AMQP_STATUS_CONNECTION_CLOSED -0x0007 */
    "connection closed unexpectedly",
    /* AMQP_STATUS_BAD_AMQP_URL -0x0008 */
    "could not parse AMQP URL",
    /* AMQP_STATUS_SOCKET_ERROR -0x0009 */
    "a socket error occurred",
    /* AMQP_STATUS_INVALID_PARAMETER -0x000A */
    "invalid parameter",
    /* AMQP_STATUS_TABLE_TOO_BIG -0x000B */
    "table too large for buffer",
    /* AMQP_STATUS_WRONG_METHOD -0x000C */
    "unexpected method received",
    /* AMQP_STATUS_TIMEOUT -0x000D */
    "request timed out",
    /* AMQP_STATUS_TIMER_FAILURE -0x000E */
    "system timer has failed",
    /* AMQP_STATUS_HEARTBEAT_TIMEOUT -0x000F */
    "heartbeat timeout, connection closed",
    /* AMQP_STATUS_UNEXPECTED_STATE -0x0010 */
    "unexpected protocol state",
    /* AMQP_STATUS_SOCKET_CLOSED -0x0011 */
    "socket is closed",
    /* AMQP_STATUS_SOCKET_INUSE -0x0012 */
    "socket already open",
    /* AMQP_STATUS_BROKER_UNSUPPORTED_SASL_METHOD -0x0013 */
    "unsupported sasl method requested",
    /* AMQP_STATUS_UNSUPPORTED -0x0014 */
    "parameter value is unsupported"};

static const char *const tcp_error_strings[] = {
    /* AMQP_STATUS_TCP_ERROR -0x0100 */
    "a socket error occurred",
    /* AMQP_STATUS_TCP_SOCKETLIB_INIT_ERROR -0x0101 */
    "socket library initialization failed"};

static const char *const ssl_error_strings[] = {
    /* AMQP_STATUS_SSL_ERROR -0x0200 */
    "a SSL error occurred",
    /* AMQP_STATUS_SSL_HOSTNAME_VERIFY_FAILED -0x0201 */
    "SSL hostname verification failed",
    /* AMQP_STATUS_SSL_PEER_VERIFY_FAILED -0x0202 */
    "SSL peer cert verification failed",
    /* AMQP_STATUS_SSL_CONNECTION_FAILED -0x0203 */
    "SSL handshake failed"};

static const char *const unknown_error_string = "(unknown error)";

/* Return table[index], or the "unknown" string when index is out of range. */
static const char *error_string_from_table(const char *const *table,
                                           size_t count, size_t index) {
  return (index < count) ? table[index] : unknown_error_string;
}

/**
 * Map a status code to a static, human-readable message.
 *
 * @param code status code; expected to be 0 or a negative value whose
 *             magnitude fits in 16 bits (category byte + index byte).
 * @return pointer to a string with static storage duration; never NULL.
 *         Codes that are positive, out of the 16-bit range, or that do not
 *         correspond to a table entry yield "(unknown error)".
 */
const char *amqp_error_string2(int code) {
  size_t magnitude;
  size_t category;
  size_t error;

  /* Reject anything outside [-0xFFFF, 0] up front. This avoids negating
   * INT_MIN (undefined behaviour) and stops out-of-range codes from aliasing
   * onto valid entries once the high bits are masked away. */
  if (code > 0 || code < -(ERROR_CATEGORY_MASK | ERROR_MASK)) {
    return unknown_error_string;
  }

  magnitude = (size_t)(-code);
  category = (magnitude & ERROR_CATEGORY_MASK) >> 8;
  error = magnitude & ERROR_MASK;

  switch (category) {
    case EC_base:
      return error_string_from_table(
          base_error_strings,
          sizeof(base_error_strings) / sizeof(base_error_strings[0]), error);

    case EC_tcp:
      return error_string_from_table(
          tcp_error_strings,
          sizeof(tcp_error_strings) / sizeof(tcp_error_strings[0]), error);

    case EC_ssl:
      return error_string_from_table(
          ssl_error_strings,
          sizeof(ssl_error_strings) / sizeof(ssl_error_strings[0]), error);

    default:
      return unknown_error_string;
  }
}
160              
/**
 * Deprecated variant of amqp_error_string2() that returns a heap copy.
 *
 * @param code status code of either sign.
 * @return the result of strdup() on the message for the code; the caller
 *         owns the copy. May be NULL if strdup() fails.
 */
char *amqp_error_string(int code) {
  /* Historically some callers had to flip the sign of a return value to get
   * the right error code. All error codes are negative now, so a positive
   * input is folded onto its negative counterpart to keep legacy callers
   * working. Only this deprecated entry point does that.
   */
  const int normalized = (code > 0) ? -code : code;

  return strdup(amqp_error_string2(normalized));
}
174              
/**
 * Print a printf-style message plus a trailing newline to stderr, then
 * terminate the process via abort(). Does not return.
 *
 * @param fmt printf-style format string, followed by its arguments.
 */
void amqp_abort(const char *fmt, ...) {
  va_list args;

  va_start(args, fmt);
  vfprintf(stderr, fmt, args);
  va_end(args);

  fputc('\n', stderr);
  abort();
}
183              
/* Shared constants, each initialised to {0, NULL}. Judging by their names they are the library's "empty" bytes / table / array values. */
184             const amqp_bytes_t amqp_empty_bytes = {0, NULL};
185             const amqp_table_t amqp_empty_table = {0, NULL};
186             const amqp_array_t amqp_empty_array = {0, NULL};
187              
188 29           int amqp_basic_publish(amqp_connection_state_t state, amqp_channel_t channel,
189             amqp_bytes_t exchange, amqp_bytes_t routing_key,
190             amqp_boolean_t mandatory, amqp_boolean_t immediate,
191             amqp_basic_properties_t const *properties,
192             amqp_bytes_t body) {
193             amqp_frame_t f;
194             size_t body_offset;
195 29           size_t usable_body_payload_size =
196 29           state->frame_max - (HEADER_SIZE + FOOTER_SIZE);
197             int res;
198             int flagz;
199              
200             amqp_basic_publish_t m;
201             amqp_basic_properties_t default_properties;
202              
203 29           m.exchange = exchange;
204 29           m.routing_key = routing_key;
205 29           m.mandatory = mandatory;
206 29           m.immediate = immediate;
207 29           m.ticket = 0;
208              
209             /* TODO(alanxz): this heartbeat check is happening in the wrong place, it
210             * should really be done in amqp_try_send/writev */
211 29           res = amqp_time_has_past(state->next_recv_heartbeat);
212 29 50         if (AMQP_STATUS_TIMER_FAILURE == res) {
213 0           return res;
214 29 100         } else if (AMQP_STATUS_TIMEOUT == res) {
215 2           res = amqp_try_recv(state);
216 2 50         if (AMQP_STATUS_TIMEOUT == res) {
217 0           return AMQP_STATUS_HEARTBEAT_TIMEOUT;
218 2 50         } else if (AMQP_STATUS_OK != res) {
219 0           return res;
220             }
221             }
222              
223 29           res = amqp_send_method_inner(state, channel, AMQP_BASIC_PUBLISH_METHOD, &m,
224             AMQP_SF_MORE, amqp_time_infinite());
225 29 100         if (res < 0) {
226 1           return res;
227             }
228              
229 28 50         if (properties == NULL) {
230 0           memset(&default_properties, 0, sizeof(default_properties));
231 0           properties = &default_properties;
232             }
233              
234 28           f.frame_type = AMQP_FRAME_HEADER;
235 28           f.channel = channel;
236 28           f.payload.properties.class_id = AMQP_BASIC_CLASS;
237 28           f.payload.properties.body_size = body.len;
238 28           f.payload.properties.decoded = (void *)properties;
239              
240 28 50         if (body.len > 0) {
241 28           flagz = AMQP_SF_MORE;
242             } else {
243 0           flagz = AMQP_SF_NONE;
244             }
245 28           res = amqp_send_frame_inner(state, &f, flagz, amqp_time_infinite());
246 28 50         if (res < 0) {
247 0           return res;
248             }
249              
250 28           body_offset = 0;
251 65 100         while (body_offset < body.len) {
252 37           size_t remaining = body.len - body_offset;
253              
254 37 50         if (remaining == 0) {
255 0           break;
256             }
257              
258 37           f.frame_type = AMQP_FRAME_BODY;
259 37           f.channel = channel;
260 37           f.payload.body_fragment.bytes = amqp_offset(body.bytes, body_offset);
261 37 100         if (remaining >= usable_body_payload_size) {
262 9           f.payload.body_fragment.len = usable_body_payload_size;
263 9           flagz = AMQP_SF_MORE;
264             } else {
265 28           f.payload.body_fragment.len = remaining;
266 28           flagz = AMQP_SF_NONE;
267             }
268              
269 37           body_offset += f.payload.body_fragment.len;
270 37           res = amqp_send_frame_inner(state, &f, flagz, amqp_time_infinite());
271 37 50         if (res < 0) {
272 0           return res;
273             }
274             }
275              
276 29           return AMQP_STATUS_OK;
277             }
278              
279 1           amqp_rpc_reply_t amqp_channel_close(amqp_connection_state_t state,
280             amqp_channel_t channel, int code) {
281             char codestr[13];
282 1           amqp_method_number_t replies[2] = {AMQP_CHANNEL_CLOSE_OK_METHOD, 0};
283             amqp_channel_close_t req;
284              
285 1 50         if (code < 0 || code > UINT16_MAX) {
    50          
286 0           return amqp_rpc_reply_error(AMQP_STATUS_INVALID_PARAMETER);
287             }
288              
289 1           req.reply_code = (uint16_t)code;
290 1           req.reply_text.bytes = codestr;
291 1           req.reply_text.len = sprintf(codestr, "%d", code);
292 1           req.class_id = 0;
293 1           req.method_id = 0;
294              
295 1           return amqp_simple_rpc(state, channel, AMQP_CHANNEL_CLOSE_METHOD, replies,
296             &req);
297             }
298              
299 31           amqp_rpc_reply_t amqp_connection_close(amqp_connection_state_t state,
300             int code) {
301             char codestr[13];
302 31           amqp_method_number_t replies[2] = {AMQP_CONNECTION_CLOSE_OK_METHOD, 0};
303             amqp_channel_close_t req;
304              
305 31 50         if (code < 0 || code > UINT16_MAX) {
    50          
306 0           return amqp_rpc_reply_error(AMQP_STATUS_INVALID_PARAMETER);
307             }
308              
309 31           req.reply_code = (uint16_t)code;
310 31           req.reply_text.bytes = codestr;
311 31           req.reply_text.len = sprintf(codestr, "%d", code);
312 31           req.class_id = 0;
313 31           req.method_id = 0;
314              
315 31           return amqp_simple_rpc(state, 0, AMQP_CONNECTION_CLOSE_METHOD, replies, &req);
316             }
317              
318 2           int amqp_basic_ack(amqp_connection_state_t state, amqp_channel_t channel,
319             uint64_t delivery_tag, amqp_boolean_t multiple) {
320             amqp_basic_ack_t m;
321 2           m.delivery_tag = delivery_tag;
322 2           m.multiple = multiple;
323 2           return amqp_send_method(state, channel, AMQP_BASIC_ACK_METHOD, &m);
324             }
325              
326 22           amqp_rpc_reply_t amqp_basic_get(amqp_connection_state_t state,
327             amqp_channel_t channel, amqp_bytes_t queue,
328             amqp_boolean_t no_ack) {
329 22           amqp_method_number_t replies[] = {AMQP_BASIC_GET_OK_METHOD,
330             AMQP_BASIC_GET_EMPTY_METHOD, 0};
331             amqp_basic_get_t req;
332 22           req.ticket = 0;
333 22           req.queue = queue;
334 22           req.no_ack = no_ack;
335              
336 22           state->most_recent_api_result =
337 22           amqp_simple_rpc(state, channel, AMQP_BASIC_GET_METHOD, replies, &req);
338 22           return state->most_recent_api_result;
339             }
340              
341 1           int amqp_basic_reject(amqp_connection_state_t state, amqp_channel_t channel,
342             uint64_t delivery_tag, amqp_boolean_t requeue) {
343             amqp_basic_reject_t req;
344 1           req.delivery_tag = delivery_tag;
345 1           req.requeue = requeue;
346 1           return amqp_send_method(state, channel, AMQP_BASIC_REJECT_METHOD, &req);
347             }
348              
349 1           int amqp_basic_nack(amqp_connection_state_t state, amqp_channel_t channel,
350             uint64_t delivery_tag, amqp_boolean_t multiple,
351             amqp_boolean_t requeue) {
352             amqp_basic_nack_t req;
353 1           req.delivery_tag = delivery_tag;
354 1           req.multiple = multiple;
355 1           req.requeue = requeue;
356 1           return amqp_send_method(state, channel, AMQP_BASIC_NACK_METHOD, &req);
357             }
358              
359 0           struct timeval *amqp_get_handshake_timeout(amqp_connection_state_t state) {
360 0           return state->handshake_timeout;
361             }
362              
363 0           int amqp_set_handshake_timeout(amqp_connection_state_t state,
364             const struct timeval *timeout) {
365 0 0         if (timeout) {
366 0 0         if (timeout->tv_sec < 0 || timeout->tv_usec < 0) {
    0          
367 0           return AMQP_STATUS_INVALID_PARAMETER;
368             }
369 0           state->internal_handshake_timeout = *timeout;
370 0           state->handshake_timeout = &state->internal_handshake_timeout;
371             } else {
372 0           state->handshake_timeout = NULL;
373             }
374              
375 0           return AMQP_STATUS_OK;
376             }
377              
378 0           struct timeval *amqp_get_rpc_timeout(amqp_connection_state_t state) {
379 0           return state->rpc_timeout;
380             }
381              
382 0           int amqp_set_rpc_timeout(amqp_connection_state_t state,
383             const struct timeval *timeout) {
384 0 0         if (timeout) {
385 0 0         if (timeout->tv_sec < 0 || timeout->tv_usec < 0) {
    0          
386 0           return AMQP_STATUS_INVALID_PARAMETER;
387             }
388 0           state->rpc_timeout = &state->internal_rpc_timeout;
389 0           *state->rpc_timeout = *timeout;
390             } else {
391 0           state->rpc_timeout = NULL;
392             }
393 0           return AMQP_STATUS_OK;
394             }