File Coverage

rabbitmq-include/amqp_private.h
Criterion Covered Total %
statement 0 84 0.0
branch 0 34 0.0
condition n/a
subroutine n/a
pod n/a
total 0 118 0.0


line stmt bran cond sub pod time code
1             // Copyright 2007 - 2021, Alan Antonuk and the rabbitmq-c contributors.
2             // SPDX-License-Identifier: mit
3              
4             #ifndef librabbitmq_amqp_private_h
5             #define librabbitmq_amqp_private_h
6              
7             #ifdef HAVE_CONFIG_H
8             #include "config.h"
9             #endif
10              
11             #define AMQ_COPYRIGHT \
12             "Copyright (c) 2007-2014 VMWare Inc, Tony Garnock-Jones," \
13             " and Alan Antonuk."
14              
15             #include "rabbitmq-c/amqp.h"
16             #include "rabbitmq-c/framing.h"
17             #include
18              
19             #if ((defined(_WIN32)) || (defined(__MINGW32__)) || (defined(__MINGW64__)))
20             #ifndef WINVER
21             /* WINVER 0x0502 is WinXP SP2+, Windows Server 2003 SP1+
22             * See:
23             * http://msdn.microsoft.com/en-us/library/windows/desktop/aa383745(v=vs.85).aspx#macros_for_conditional_declarations
24             */
25             #define WINVER 0x0502
26             #endif
27             #ifndef WIN32_LEAN_AND_MEAN
28             #define WIN32_LEAN_AND_MEAN
29             #endif
30             #include
31             #else
32             #include
33             #include
34             #endif
35              
36             /* GCC attributes */
37             #if __GNUC__ > 2 || (__GNUC__ == 2 && __GNUC_MINOR__ > 4)
38             #define AMQP_NORETURN __attribute__((__noreturn__))
39             #define AMQP_UNUSED __attribute__((__unused__))
40             #elif defined(_MSC_VER)
41             #define AMQP_NORETURN __declspec(noreturn)
42             #define AMQP_UNUSED __pragma(warning(suppress : 4100))
43             #else
44             #define AMQP_NORETURN
45             #define AMQP_UNUSED
46             #endif
47              
48             #if (defined(_MSC_VER) && (_MSC_VER <= 1800)) || \
49             (defined(__BORLANDC__) && (__BORLANDC__ <= 0x0564))
50             #define inline __inline
51             #endif
52              
53             char *amqp_os_error_string(int err);
54              
55             #include "amqp_socket.h"
56             #include "amqp_time.h"
57              
58             /*
59             * Connection states: XXX FIX THIS
60             *
61             * - CONNECTION_STATE_INITIAL: The initial state, when we cannot be
62             * sure if the next thing we will get is the first AMQP frame, or a
63             * protocol header from the server.
64             *
65             * - CONNECTION_STATE_IDLE: The normal state between
66             * frames. Connections may only be reconfigured, and the
67             * connection's pools recycled, when in this state. Whenever we're
68             * in this state, the inbound_buffer's bytes pointer must be NULL;
69             * any other state, and it must point to a block of memory allocated
70             * from the frame_pool.
71             *
72             * - CONNECTION_STATE_HEADER: Some bytes of an incoming frame have
73             * been seen, but not a complete frame header's worth.
74             *
75             * - CONNECTION_STATE_BODY: A complete frame header has been seen, but
76             * the frame is not yet complete. When it is completed, it will be
77             * returned, and the connection will return to IDLE state.
78             *
79             */
80             typedef enum amqp_connection_state_enum_ {
81             CONNECTION_STATE_IDLE = 0,
82             CONNECTION_STATE_INITIAL,
83             CONNECTION_STATE_HEADER,
84             CONNECTION_STATE_BODY
85             } amqp_connection_state_enum;
86              
87             typedef enum amqp_status_private_enum_ {
88             /* 0x00xx -> AMQP_STATUS_*/
89             /* 0x01xx -> AMQP_STATUS_TCP_* */
90             /* 0x02xx -> AMQP_STATUS_SSL_* */
91             AMQP_PRIVATE_STATUS_SOCKET_NEEDREAD = -0x1301,
92             AMQP_PRIVATE_STATUS_SOCKET_NEEDWRITE = -0x1302
93             } amqp_status_private_enum;
94              
95             /* 7 bytes up front, then payload, then 1 byte footer */
96             #define HEADER_SIZE 7
97             #define FOOTER_SIZE 1
98              
99             #define AMQP_PSEUDOFRAME_PROTOCOL_HEADER 'A'
100              
101             typedef struct amqp_link_t_ {
102             struct amqp_link_t_ *next;
103             void *data;
104             } amqp_link_t;
105              
106             #define POOL_TABLE_SIZE 16
107              
108             typedef struct amqp_pool_table_entry_t_ {
109             struct amqp_pool_table_entry_t_ *next;
110             amqp_pool_t pool;
111             amqp_channel_t channel;
112             } amqp_pool_table_entry_t;
113              
114             struct amqp_connection_state_t_ {
115             amqp_pool_table_entry_t *pool_table[POOL_TABLE_SIZE];
116              
117             amqp_connection_state_enum state;
118              
119             int channel_max;
120             int frame_max;
121              
122             /* Heartbeat interval in seconds. If this is <= 0, then heartbeats are not
123             * enabled, and next_recv_heartbeat and next_send_heartbeat are set to
124             * infinite */
125             int heartbeat;
126             amqp_time_t next_recv_heartbeat;
127             amqp_time_t next_send_heartbeat;
128              
129             /* buffer for holding frame headers. Allows us to delay allocating
130             * the raw frame buffer until the type, channel, and size are all known
131             */
132             char header_buffer[HEADER_SIZE + 1];
133             amqp_bytes_t inbound_buffer;
134              
135             size_t inbound_offset;
136             size_t target_size;
137              
138             amqp_bytes_t outbound_buffer;
139              
140             amqp_socket_t *socket;
141              
142             amqp_bytes_t sock_inbound_buffer;
143             size_t sock_inbound_offset;
144             size_t sock_inbound_limit;
145              
146             amqp_link_t *first_queued_frame;
147             amqp_link_t *last_queued_frame;
148              
149             amqp_rpc_reply_t most_recent_api_result;
150              
151             amqp_table_t server_properties;
152             amqp_table_t client_properties;
153             amqp_pool_t properties_pool;
154              
155             struct timeval *handshake_timeout;
156             struct timeval internal_handshake_timeout;
157             struct timeval *rpc_timeout;
158             struct timeval internal_rpc_timeout;
159             };
160              
161             amqp_pool_t *amqp_get_or_create_channel_pool(amqp_connection_state_t connection,
162             amqp_channel_t channel);
163             amqp_pool_t *amqp_get_channel_pool(amqp_connection_state_t state,
164             amqp_channel_t channel);
165              
166             static inline int amqp_heartbeat_send(amqp_connection_state_t state) {
167             return state->heartbeat;
168             }
169              
170             static inline int amqp_heartbeat_recv(amqp_connection_state_t state) {
171             return 2 * state->heartbeat;
172             }
173              
174             int amqp_try_recv(amqp_connection_state_t state);
175              
176 0           static inline void *amqp_offset(void *data, size_t offset) {
177 0           return (char *)data + offset;
178             }
179              
180             /* This macro defines the encoding and decoding functions associated with a
181             simple type. */
182              
183             #define DECLARE_CODEC_BASE_TYPE(bits) \
184             \
185             static inline int amqp_encode_##bits(amqp_bytes_t encoded, size_t *offset, \
186             uint##bits##_t input) { \
187             size_t o = *offset; \
188             if ((*offset = o + bits / 8) <= encoded.len) { \
189             amqp_e##bits(input, amqp_offset(encoded.bytes, o)); \
190             return 1; \
191             } \
192             return 0; \
193             } \
194             \
195             static inline int amqp_decode_##bits(amqp_bytes_t encoded, size_t *offset, \
196             uint##bits##_t *output) { \
197             size_t o = *offset; \
198             if ((*offset = o + bits / 8) <= encoded.len) { \
199             *output = amqp_d##bits(amqp_offset(encoded.bytes, o)); \
200             return 1; \
201             } \
202             return 0; \
203             }
204              
205 0           static inline int is_bigendian(void) {
206             union {
207             uint32_t i;
208             char c[4];
209 0           } bint = {0x01020304};
210 0           return bint.c[0] == 1;
211             }
212              
213 0           static inline void amqp_e8(uint8_t val, void *data) {
214 0           memcpy(data, &val, sizeof(val));
215 0           }
216              
217 0           static inline uint8_t amqp_d8(void *data) {
218             uint8_t val;
219 0           memcpy(&val, data, sizeof(val));
220 0           return val;
221             }
222              
223 0           static inline void amqp_e16(uint16_t val, void *data) {
224 0 0         if (!is_bigendian()) {
225 0           val = ((val & 0xFF00u) >> 8u) | ((val & 0x00FFu) << 8u);
226             }
227 0           memcpy(data, &val, sizeof(val));
228 0           }
229              
230 0           static inline uint16_t amqp_d16(void *data) {
231             uint16_t val;
232 0           memcpy(&val, data, sizeof(val));
233 0 0         if (!is_bigendian()) {
234 0           val = ((val & 0xFF00u) >> 8u) | ((val & 0x00FFu) << 8u);
235             }
236 0           return val;
237             }
238              
239 0           static inline void amqp_e32(uint32_t val, void *data) {
240 0 0         if (!is_bigendian()) {
241 0           val = ((val & 0xFF000000u) >> 24u) | ((val & 0x00FF0000u) >> 8u) |
242 0           ((val & 0x0000FF00u) << 8u) | ((val & 0x000000FFu) << 24u);
243             }
244 0           memcpy(data, &val, sizeof(val));
245 0           }
246              
247 0           static inline uint32_t amqp_d32(void *data) {
248             uint32_t val;
249 0           memcpy(&val, data, sizeof(val));
250 0 0         if (!is_bigendian()) {
251 0           val = ((val & 0xFF000000u) >> 24u) | ((val & 0x00FF0000u) >> 8u) |
252 0           ((val & 0x0000FF00u) << 8u) | ((val & 0x000000FFu) << 24u);
253             }
254 0           return val;
255             }
256              
257 0           static inline void amqp_e64(uint64_t val, void *data) {
258 0 0         if (!is_bigendian()) {
259 0           val = ((val & 0xFF00000000000000u) >> 56u) |
260 0           ((val & 0x00FF000000000000u) >> 40u) |
261 0           ((val & 0x0000FF0000000000u) >> 24u) |
262 0           ((val & 0x000000FF00000000u) >> 8u) |
263 0           ((val & 0x00000000FF000000u) << 8u) |
264 0           ((val & 0x0000000000FF0000u) << 24u) |
265 0           ((val & 0x000000000000FF00u) << 40u) |
266 0           ((val & 0x00000000000000FFu) << 56u);
267             }
268 0           memcpy(data, &val, sizeof(val));
269 0           }
270              
271 0           static inline uint64_t amqp_d64(void *data) {
272             uint64_t val;
273 0           memcpy(&val, data, sizeof(val));
274 0 0         if (!is_bigendian()) {
275 0           val = ((val & 0xFF00000000000000u) >> 56u) |
276 0           ((val & 0x00FF000000000000u) >> 40u) |
277 0           ((val & 0x0000FF0000000000u) >> 24u) |
278 0           ((val & 0x000000FF00000000u) >> 8u) |
279 0           ((val & 0x00000000FF000000u) << 8u) |
280 0           ((val & 0x0000000000FF0000u) << 24u) |
281 0           ((val & 0x000000000000FF00u) << 40u) |
282 0           ((val & 0x00000000000000FFu) << 56u);
283             }
284 0           return val;
285             }
286              
287 0 0         DECLARE_CODEC_BASE_TYPE(8)
  0 0          
  0            
288 0 0         DECLARE_CODEC_BASE_TYPE(16)
  0 0          
  0            
289 0 0         DECLARE_CODEC_BASE_TYPE(32)
  0 0          
  0            
290 0 0         DECLARE_CODEC_BASE_TYPE(64)
  0 0          
  0            
291              
292 0           static inline int amqp_encode_bytes(amqp_bytes_t encoded, size_t *offset,
293             amqp_bytes_t input) {
294 0           size_t o = *offset;
295             /* The memcpy below has undefined behavior if the input is NULL. It is valid
296             * for a 0-length amqp_bytes_t to have .bytes == NULL. Thus we should check
297             * before encoding.
298             */
299 0 0         if (input.len == 0) {
300 0           return 1;
301             }
302 0 0         if ((*offset = o + input.len) <= encoded.len) {
303 0           memcpy(amqp_offset(encoded.bytes, o), input.bytes, input.len);
304 0           return 1;
305             } else {
306 0           return 0;
307             }
308             }
309              
310 0           static inline int amqp_decode_bytes(amqp_bytes_t encoded, size_t *offset,
311             amqp_bytes_t *output, size_t len) {
312 0           size_t o = *offset;
313 0 0         if ((*offset = o + len) <= encoded.len) {
314 0           output->bytes = amqp_offset(encoded.bytes, o);
315 0           output->len = len;
316 0           return 1;
317             } else {
318 0           return 0;
319             }
320             }
321              
322             AMQP_NORETURN
323             void amqp_abort(const char *fmt, ...);
324              
325             int amqp_bytes_equal(amqp_bytes_t r, amqp_bytes_t l);
326              
327             static inline amqp_rpc_reply_t amqp_rpc_reply_error(amqp_status_enum status) {
328             amqp_rpc_reply_t reply;
329             reply.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
330             reply.library_error = status;
331             return reply;
332             }
333              
334             int amqp_send_frame_inner(amqp_connection_state_t state,
335             const amqp_frame_t *frame, int flags,
336             amqp_time_t deadline);
337             #endif