File Coverage

amqp_api.c
Criterion Covered Total %
statement 18 180 10.0
branch 3 62 4.8
condition n/a
subroutine n/a
pod n/a
total 21 242 8.6


line stmt bran cond sub pod time code
1             // Copyright 2007 - 2021, Alan Antonuk and the rabbitmq-c contributors.
2             // SPDX-License-Identifier: mit
3              
4             #ifdef HAVE_CONFIG_H
5             #include "config.h"
6             #endif
7              
8             #ifdef _MSC_VER
9             /* MSVC complains about sprintf being deprecated in favor of sprintf_s */
10             #define _CRT_SECURE_NO_WARNINGS
11             /* MSVC complains about strdup being deprecated in favor of _strdup */
12             #define _CRT_NONSTDC_NO_DEPRECATE
13             #endif
14              
15             #include "amqp_private.h"
16             #include "amqp_time.h"
17             #include
18             #include
19             #include
20             #include
21             #include
22              
23             #define ERROR_MASK (0x00FF)
24             #define ERROR_CATEGORY_MASK (0xFF00)
25              
26             enum error_category_enum_ { EC_base = 0, EC_tcp = 1, EC_ssl = 2 };
27              
28             static const char *base_error_strings[] = {
29             /* AMQP_STATUS_OK 0x0 */
30             "operation completed successfully",
31             /* AMQP_STATUS_NO_MEMORY -0x0001 */
32             "could not allocate memory",
33             /* AMQP_STATUS_BAD_AQMP_DATA -0x0002 */
34             "invalid AMQP data",
35             /* AMQP_STATUS_UNKNOWN_CLASS -0x0003 */
36             "unknown AMQP class id",
37             /* AMQP_STATUS_UNKNOWN_METHOD -0x0004 */
38             "unknown AMQP method id",
39             /* AMQP_STATUS_HOSTNAME_RESOLUTION_FAILED -0x0005 */
40             "hostname lookup failed",
41             /* AMQP_STATUS_INCOMPATIBLE_AMQP_VERSION -0x0006 */
42             "incompatible AMQP version",
43             /* AMQP_STATUS_CONNECTION_CLOSED -0x0007 */
44             "connection closed unexpectedly",
45             /* AMQP_STATUS_BAD_AMQP_URL -0x0008 */
46             "could not parse AMQP URL",
47             /* AMQP_STATUS_SOCKET_ERROR -0x0009 */
48             "a socket error occurred",
49             /* AMQP_STATUS_INVALID_PARAMETER -0x000A */
50             "invalid parameter",
51             /* AMQP_STATUS_TABLE_TOO_BIG -0x000B */
52             "table too large for buffer",
53             /* AMQP_STATUS_WRONG_METHOD -0x000C */
54             "unexpected method received",
55             /* AMQP_STATUS_TIMEOUT -0x000D */
56             "request timed out",
57             /* AMQP_STATUS_TIMER_FAILED -0x000E */
58             "system timer has failed",
59             /* AMQP_STATUS_HEARTBEAT_TIMEOUT -0x000F */
60             "heartbeat timeout, connection closed",
61             /* AMQP_STATUS_UNEXPECTED STATE -0x0010 */
62             "unexpected protocol state",
63             /* AMQP_STATUS_SOCKET_CLOSED -0x0011 */
64             "socket is closed",
65             /* AMQP_STATUS_SOCKET_INUSE -0x0012 */
66             "socket already open",
67             /* AMQP_STATUS_BROKER_UNSUPPORTED_SASL_METHOD -0x00013 */
68             "unsupported sasl method requested",
69             /* AMQP_STATUS_UNSUPPORTED -0x0014 */
70             "parameter value is unsupported"};
71              
72             static const char *tcp_error_strings[] = {
73             /* AMQP_STATUS_TCP_ERROR -0x0100 */
74             "a socket error occurred",
75             /* AMQP_STATUS_TCP_SOCKETLIB_INIT_ERROR -0x0101 */
76             "socket library initialization failed"};
77              
78             static const char *ssl_error_strings[] = {
79             /* AMQP_STATUS_SSL_ERROR -0x0200 */
80             "a SSL error occurred",
81             /* AMQP_STATUS_SSL_HOSTNAME_VERIFY_FAILED -0x0201 */
82             "SSL hostname verification failed",
83             /* AMQP_STATUS_SSL_PEER_VERIFY_FAILED -0x0202 */
84             "SSL peer cert verification failed",
85             /* AMQP_STATUS_SSL_CONNECTION_FAILED -0x0203 */
86             "SSL handshake failed",
87             /* AMQP_STATUS_SSL_SET_ENGINE_FAILED -0x0204 */
88             "SSL setting engine failed",
89             /* AMQP_STATUS_SSL_UNIMPLEMENTED -0x0204 */
90             "SSL API is not implemented"};
91              
92             static const char *unknown_error_string = "(unknown error)";
93              
94 1           const char *amqp_error_string2(int code) {
95             const char *error_string;
96 1           size_t category = (((-code) & ERROR_CATEGORY_MASK) >> 8);
97 1           size_t error = (-code) & ERROR_MASK;
98              
99 1           switch (category) {
100 1           case EC_base:
101 1 50         if (error < (sizeof(base_error_strings) / sizeof(char *))) {
102 1           error_string = base_error_strings[error];
103             } else {
104 0           error_string = unknown_error_string;
105             }
106 1           break;
107              
108 0           case EC_tcp:
109 0 0         if (error < (sizeof(tcp_error_strings) / sizeof(char *))) {
110 0           error_string = tcp_error_strings[error];
111             } else {
112 0           error_string = unknown_error_string;
113             }
114 0           break;
115              
116 0           case EC_ssl:
117 0 0         if (error < (sizeof(ssl_error_strings) / sizeof(char *))) {
118 0           error_string = ssl_error_strings[error];
119             } else {
120 0           error_string = unknown_error_string;
121             }
122              
123 0           break;
124              
125 0           default:
126 0           error_string = unknown_error_string;
127 0           break;
128             }
129              
130 1           return error_string;
131             }
132              
133 0           char *amqp_error_string(int code) {
134             /* Previously sometimes clients had to flip the sign on a return value from a
135             * function to get the correct error code. Now, all error codes are negative.
136             * To keep people's legacy code running correctly, we map all error codes to
137             * negative values.
138             *
139             * This is only done with this deprecated function.
140             */
141 0 0         if (code > 0) {
142 0           code = -code;
143             }
144 0           return strdup(amqp_error_string2(code));
145             }
146              
147 0           void amqp_abort(const char *fmt, ...) {
148             va_list ap;
149 0           va_start(ap, fmt);
150 0           vfprintf(stderr, fmt, ap);
151 0           va_end(ap);
152 0           fputc('\n', stderr);
153 0           abort();
154             }
155              
156             const amqp_bytes_t amqp_empty_bytes = {0, NULL};
157             const amqp_table_t amqp_empty_table = {0, NULL};
158             const amqp_array_t amqp_empty_array = {0, NULL};
159              
160 0           int amqp_basic_publish(amqp_connection_state_t state, amqp_channel_t channel,
161             amqp_bytes_t exchange, amqp_bytes_t routing_key,
162             amqp_boolean_t mandatory, amqp_boolean_t immediate,
163             amqp_basic_properties_t const *properties,
164             amqp_bytes_t body) {
165             amqp_frame_t f;
166             size_t body_offset;
167 0           size_t usable_body_payload_size =
168 0           state->frame_max - (HEADER_SIZE + FOOTER_SIZE);
169             int res;
170             int flagz;
171              
172             amqp_basic_publish_t m;
173             amqp_basic_properties_t default_properties;
174              
175 0           m.exchange = exchange;
176 0           m.routing_key = routing_key;
177 0           m.mandatory = mandatory;
178 0           m.immediate = immediate;
179 0           m.ticket = 0;
180              
181             /* TODO(alanxz): this heartbeat check is happening in the wrong place, it
182             * should really be done in amqp_try_send/writev */
183 0           res = amqp_time_has_past(state->next_recv_heartbeat);
184 0 0         if (AMQP_STATUS_TIMER_FAILURE == res) {
185 0           return res;
186 0 0         } else if (AMQP_STATUS_TIMEOUT == res) {
187 0           res = amqp_try_recv(state);
188 0 0         if (AMQP_STATUS_TIMEOUT == res) {
189 0           return AMQP_STATUS_HEARTBEAT_TIMEOUT;
190 0 0         } else if (AMQP_STATUS_OK != res) {
191 0           return res;
192             }
193             }
194              
195 0           res = amqp_send_method_inner(state, channel, AMQP_BASIC_PUBLISH_METHOD, &m,
196             AMQP_SF_MORE, amqp_time_infinite());
197 0 0         if (res < 0) {
198 0           return res;
199             }
200              
201 0 0         if (properties == NULL) {
202 0           memset(&default_properties, 0, sizeof(default_properties));
203 0           properties = &default_properties;
204             }
205              
206 0           f.frame_type = AMQP_FRAME_HEADER;
207 0           f.channel = channel;
208 0           f.payload.properties.class_id = AMQP_BASIC_CLASS;
209 0           f.payload.properties.body_size = body.len;
210 0           f.payload.properties.decoded = (void *)properties;
211              
212 0 0         if (body.len > 0) {
213 0           flagz = AMQP_SF_MORE;
214             } else {
215 0           flagz = AMQP_SF_NONE;
216             }
217 0           res = amqp_send_frame_inner(state, &f, flagz, amqp_time_infinite());
218 0 0         if (res < 0) {
219 0           return res;
220             }
221              
222 0           body_offset = 0;
223 0 0         while (body_offset < body.len) {
224 0           size_t remaining = body.len - body_offset;
225              
226 0 0         if (remaining == 0) {
227 0           break;
228             }
229              
230 0           f.frame_type = AMQP_FRAME_BODY;
231 0           f.channel = channel;
232 0           f.payload.body_fragment.bytes = amqp_offset(body.bytes, body_offset);
233 0 0         if (remaining >= usable_body_payload_size) {
234 0           f.payload.body_fragment.len = usable_body_payload_size;
235 0           flagz = AMQP_SF_MORE;
236             } else {
237 0           f.payload.body_fragment.len = remaining;
238 0           flagz = AMQP_SF_NONE;
239             }
240              
241 0           body_offset += f.payload.body_fragment.len;
242 0           res = amqp_send_frame_inner(state, &f, flagz, amqp_time_infinite());
243 0 0         if (res < 0) {
244 0           return res;
245             }
246             }
247              
248 0           return AMQP_STATUS_OK;
249             }
250              
251 0           amqp_rpc_reply_t amqp_channel_close(amqp_connection_state_t state,
252             amqp_channel_t channel, int code) {
253             char codestr[13];
254 0           amqp_method_number_t replies[2] = {AMQP_CHANNEL_CLOSE_OK_METHOD, 0};
255             amqp_channel_close_t req;
256              
257 0 0         if (code < 0 || code > UINT16_MAX) {
    0          
258 0           return amqp_rpc_reply_error(AMQP_STATUS_INVALID_PARAMETER);
259             }
260              
261 0           req.reply_code = (uint16_t)code;
262 0           req.reply_text.bytes = codestr;
263 0           req.reply_text.len = sprintf(codestr, "%d", code);
264 0           req.class_id = 0;
265 0           req.method_id = 0;
266              
267 0           return amqp_simple_rpc(state, channel, AMQP_CHANNEL_CLOSE_METHOD, replies,
268             &req);
269             }
270              
271 1           amqp_rpc_reply_t amqp_connection_close(amqp_connection_state_t state,
272             int code) {
273             char codestr[13];
274 1           amqp_method_number_t replies[2] = {AMQP_CONNECTION_CLOSE_OK_METHOD, 0};
275             amqp_channel_close_t req;
276              
277 1 50         if (code < 0 || code > UINT16_MAX) {
    50          
278 0           return amqp_rpc_reply_error(AMQP_STATUS_INVALID_PARAMETER);
279             }
280              
281 1           req.reply_code = (uint16_t)code;
282 1           req.reply_text.bytes = codestr;
283 1           req.reply_text.len = sprintf(codestr, "%d", code);
284 1           req.class_id = 0;
285 1           req.method_id = 0;
286              
287 1           return amqp_simple_rpc(state, 0, AMQP_CONNECTION_CLOSE_METHOD, replies, &req);
288             }
289              
290 0           int amqp_basic_ack(amqp_connection_state_t state, amqp_channel_t channel,
291             uint64_t delivery_tag, amqp_boolean_t multiple) {
292             amqp_basic_ack_t m;
293 0           m.delivery_tag = delivery_tag;
294 0           m.multiple = multiple;
295 0           return amqp_send_method(state, channel, AMQP_BASIC_ACK_METHOD, &m);
296             }
297              
298 0           amqp_rpc_reply_t amqp_basic_get(amqp_connection_state_t state,
299             amqp_channel_t channel, amqp_bytes_t queue,
300             amqp_boolean_t no_ack) {
301 0           amqp_method_number_t replies[] = {AMQP_BASIC_GET_OK_METHOD,
302             AMQP_BASIC_GET_EMPTY_METHOD, 0};
303             amqp_basic_get_t req;
304 0           req.ticket = 0;
305 0           req.queue = queue;
306 0           req.no_ack = no_ack;
307              
308             state->most_recent_api_result =
309 0           amqp_simple_rpc(state, channel, AMQP_BASIC_GET_METHOD, replies, &req);
310 0           return state->most_recent_api_result;
311             }
312              
313 0           int amqp_basic_reject(amqp_connection_state_t state, amqp_channel_t channel,
314             uint64_t delivery_tag, amqp_boolean_t requeue) {
315             amqp_basic_reject_t req;
316 0           req.delivery_tag = delivery_tag;
317 0           req.requeue = requeue;
318 0           return amqp_send_method(state, channel, AMQP_BASIC_REJECT_METHOD, &req);
319             }
320              
321 0           int amqp_basic_nack(amqp_connection_state_t state, amqp_channel_t channel,
322             uint64_t delivery_tag, amqp_boolean_t multiple,
323             amqp_boolean_t requeue) {
324             amqp_basic_nack_t req;
325 0           req.delivery_tag = delivery_tag;
326 0           req.multiple = multiple;
327 0           req.requeue = requeue;
328 0           return amqp_send_method(state, channel, AMQP_BASIC_NACK_METHOD, &req);
329             }
330              
331 0           struct timeval *amqp_get_handshake_timeout(amqp_connection_state_t state) {
332 0           return state->handshake_timeout;
333             }
334              
335 0           int amqp_set_handshake_timeout(amqp_connection_state_t state,
336             const struct timeval *timeout) {
337 0 0         if (timeout) {
338 0 0         if (timeout->tv_sec < 0 || timeout->tv_usec < 0) {
    0          
339 0           return AMQP_STATUS_INVALID_PARAMETER;
340             }
341 0           state->internal_handshake_timeout = *timeout;
342 0           state->handshake_timeout = &state->internal_handshake_timeout;
343             } else {
344 0           state->handshake_timeout = NULL;
345             }
346              
347 0           return AMQP_STATUS_OK;
348             }
349              
350 0           struct timeval *amqp_get_rpc_timeout(amqp_connection_state_t state) {
351 0           return state->rpc_timeout;
352             }
353              
354 0           int amqp_set_rpc_timeout(amqp_connection_state_t state,
355             const struct timeval *timeout) {
356 0 0         if (timeout) {
357 0 0         if (timeout->tv_sec < 0 || timeout->tv_usec < 0) {
    0          
358 0           return AMQP_STATUS_INVALID_PARAMETER;
359             }
360 0           state->rpc_timeout = &state->internal_rpc_timeout;
361 0           *state->rpc_timeout = *timeout;
362             } else {
363 0           state->rpc_timeout = NULL;
364             }
365 0           return AMQP_STATUS_OK;
366             }
367              
368 0           amqp_rpc_reply_t amqp_publisher_confirm_wait(amqp_connection_state_t state,
369             const struct timeval *timeout,
370             amqp_publisher_confirm_t *result) {
371             int res;
372             amqp_frame_t frame;
373             amqp_rpc_reply_t ret;
374              
375 0           memset(&ret, 0x0, sizeof(ret));
376 0           memset(result, 0x0, sizeof(amqp_publisher_confirm_t));
377              
378 0           res = amqp_simple_wait_frame_noblock(state, &frame, timeout);
379              
380 0 0         if (AMQP_STATUS_OK != res) {
381 0           ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
382 0           ret.library_error = res;
383 0           return ret;
384 0 0         } else if (AMQP_FRAME_METHOD != frame.frame_type ||
385 0 0         (AMQP_BASIC_ACK_METHOD != frame.payload.method.id &&
386 0 0         AMQP_BASIC_NACK_METHOD != frame.payload.method.id &&
387 0 0         AMQP_BASIC_REJECT_METHOD != frame.payload.method.id)) {
388 0           amqp_put_back_frame(state, &frame);
389 0           ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
390 0           ret.library_error = AMQP_STATUS_UNEXPECTED_STATE;
391 0           return ret;
392             }
393              
394 0           switch (frame.payload.method.id) {
395 0           case AMQP_BASIC_ACK_METHOD:
396 0           memcpy(&(result->payload.ack), frame.payload.method.decoded,
397             sizeof(amqp_basic_ack_t));
398 0           break;
399              
400 0           case AMQP_BASIC_NACK_METHOD:
401 0           memcpy(&(result->payload.nack), frame.payload.method.decoded,
402             sizeof(amqp_basic_nack_t));
403 0           break;
404              
405 0           case AMQP_BASIC_REJECT_METHOD:
406 0           memcpy(&(result->payload.reject), frame.payload.method.decoded,
407             sizeof(amqp_basic_reject_t));
408 0           break;
409              
410 0           default:
411 0           amqp_put_back_frame(state, &frame);
412 0           ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
413 0           ret.library_error = AMQP_STATUS_UNSUPPORTED;
414 0           return ret;
415             }
416 0           result->method = frame.payload.method.id;
417 0           result->channel = frame.channel;
418 0           ret.reply_type = AMQP_RESPONSE_NORMAL;
419              
420 0           return ret;
421             }