File Coverage

amqp_consumer.c
Criterion Covered Total %
statement 0 149 0.0
branch 0 114 0.0
condition n/a
subroutine n/a
pod n/a
total 0 263 0.0


line stmt bran cond sub pod time code
1             // Copyright 2007 - 2021, Alan Antonuk and the rabbitmq-c contributors.
2             // SPDX-License-Identifier: mit
3              
4             #include "amqp_private.h"
5             #include "amqp_socket.h"
6             #include "rabbitmq-c/amqp.h"
7              
8             #include
9             #include
10              
11 0           static int amqp_basic_properties_clone(amqp_basic_properties_t *original,
12             amqp_basic_properties_t *clone,
13             amqp_pool_t *pool) {
14 0           memset(clone, 0, sizeof(*clone));
15 0           clone->_flags = original->_flags;
16              
17             #define CLONE_BYTES_POOL(original, clone, pool) \
18             if (0 == original.len) { \
19             clone = amqp_empty_bytes; \
20             } else { \
21             amqp_pool_alloc_bytes(pool, original.len, &clone); \
22             if (NULL == clone.bytes) { \
23             return AMQP_STATUS_NO_MEMORY; \
24             } \
25             memcpy(clone.bytes, original.bytes, clone.len); \
26             }
27              
28 0 0         if (clone->_flags & AMQP_BASIC_CONTENT_TYPE_FLAG) {
29 0 0         CLONE_BYTES_POOL(original->content_type, clone->content_type, pool)
    0          
30             }
31              
32 0 0         if (clone->_flags & AMQP_BASIC_CONTENT_ENCODING_FLAG) {
33 0 0         CLONE_BYTES_POOL(original->content_encoding, clone->content_encoding, pool)
    0          
34             }
35              
36 0 0         if (clone->_flags & AMQP_BASIC_HEADERS_FLAG) {
37 0           int res = amqp_table_clone(&original->headers, &clone->headers, pool);
38 0 0         if (AMQP_STATUS_OK != res) {
39 0           return res;
40             }
41             }
42              
43 0 0         if (clone->_flags & AMQP_BASIC_DELIVERY_MODE_FLAG) {
44 0           clone->delivery_mode = original->delivery_mode;
45             }
46              
47 0 0         if (clone->_flags & AMQP_BASIC_PRIORITY_FLAG) {
48 0           clone->priority = original->priority;
49             }
50              
51 0 0         if (clone->_flags & AMQP_BASIC_CORRELATION_ID_FLAG) {
52 0 0         CLONE_BYTES_POOL(original->correlation_id, clone->correlation_id, pool)
    0          
53             }
54              
55 0 0         if (clone->_flags & AMQP_BASIC_REPLY_TO_FLAG) {
56 0 0         CLONE_BYTES_POOL(original->reply_to, clone->reply_to, pool)
    0          
57             }
58              
59 0 0         if (clone->_flags & AMQP_BASIC_EXPIRATION_FLAG) {
60 0 0         CLONE_BYTES_POOL(original->expiration, clone->expiration, pool)
    0          
61             }
62              
63 0 0         if (clone->_flags & AMQP_BASIC_MESSAGE_ID_FLAG) {
64 0 0         CLONE_BYTES_POOL(original->message_id, clone->message_id, pool)
    0          
65             }
66              
67 0 0         if (clone->_flags & AMQP_BASIC_TIMESTAMP_FLAG) {
68 0           clone->timestamp = original->timestamp;
69             }
70              
71 0 0         if (clone->_flags & AMQP_BASIC_TYPE_FLAG) {
72 0 0         CLONE_BYTES_POOL(original->type, clone->type, pool)
    0          
73             }
74              
75 0 0         if (clone->_flags & AMQP_BASIC_USER_ID_FLAG) {
76 0 0         CLONE_BYTES_POOL(original->user_id, clone->user_id, pool)
    0          
77             }
78              
79 0 0         if (clone->_flags & AMQP_BASIC_APP_ID_FLAG) {
80 0 0         CLONE_BYTES_POOL(original->app_id, clone->app_id, pool)
    0          
81             }
82              
83 0 0         if (clone->_flags & AMQP_BASIC_CLUSTER_ID_FLAG) {
84 0 0         CLONE_BYTES_POOL(original->cluster_id, clone->cluster_id, pool)
    0          
85             }
86              
87 0           return AMQP_STATUS_OK;
88             #undef CLONE_BYTES_POOL
89             }
90              
91 0           void amqp_destroy_message(amqp_message_t *message) {
92 0           empty_amqp_pool(&message->pool);
93 0           amqp_bytes_free(message->body);
94 0           }
95              
96 0           void amqp_destroy_envelope(amqp_envelope_t *envelope) {
97 0           amqp_destroy_message(&envelope->message);
98 0           amqp_bytes_free(envelope->routing_key);
99 0           amqp_bytes_free(envelope->exchange);
100 0           amqp_bytes_free(envelope->consumer_tag);
101 0           }
102              
103 0           static int amqp_bytes_malloc_dup_failed(amqp_bytes_t bytes) {
104 0 0         if (bytes.len != 0 && bytes.bytes == NULL) {
    0          
105 0           return 1;
106             }
107 0           return 0;
108             }
109              
110 0           amqp_rpc_reply_t amqp_consume_message(amqp_connection_state_t state,
111             amqp_envelope_t *envelope,
112             const struct timeval *timeout,
113             AMQP_UNUSED int flags) {
114             int res;
115             amqp_frame_t frame;
116             amqp_basic_deliver_t *delivery_method;
117             amqp_rpc_reply_t ret;
118              
119 0           memset(&ret, 0, sizeof(ret));
120 0           memset(envelope, 0, sizeof(*envelope));
121              
122 0           res = amqp_simple_wait_frame_noblock(state, &frame, timeout);
123 0 0         if (AMQP_STATUS_OK != res) {
124 0           ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
125 0           ret.library_error = res;
126 0           goto error_out1;
127             }
128              
129 0 0         if (AMQP_FRAME_METHOD != frame.frame_type ||
130 0 0         AMQP_BASIC_DELIVER_METHOD != frame.payload.method.id) {
131 0           amqp_put_back_frame(state, &frame);
132 0           ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
133 0           ret.library_error = AMQP_STATUS_UNEXPECTED_STATE;
134 0           goto error_out1;
135             }
136              
137 0           delivery_method = frame.payload.method.decoded;
138              
139 0           envelope->channel = frame.channel;
140 0           envelope->consumer_tag = amqp_bytes_malloc_dup(delivery_method->consumer_tag);
141 0           envelope->delivery_tag = delivery_method->delivery_tag;
142 0           envelope->redelivered = delivery_method->redelivered;
143 0           envelope->exchange = amqp_bytes_malloc_dup(delivery_method->exchange);
144 0           envelope->routing_key = amqp_bytes_malloc_dup(delivery_method->routing_key);
145              
146 0           if (amqp_bytes_malloc_dup_failed(envelope->consumer_tag) ||
147 0 0         amqp_bytes_malloc_dup_failed(envelope->exchange) ||
148 0           amqp_bytes_malloc_dup_failed(envelope->routing_key)) {
149 0           ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
150 0           ret.library_error = AMQP_STATUS_NO_MEMORY;
151 0           goto error_out2;
152             }
153              
154 0           ret = amqp_read_message(state, envelope->channel, &envelope->message, 0);
155 0 0         if (AMQP_RESPONSE_NORMAL != ret.reply_type) {
156 0           goto error_out2;
157             }
158              
159 0           ret.reply_type = AMQP_RESPONSE_NORMAL;
160 0           return ret;
161              
162 0           error_out2:
163 0           amqp_bytes_free(envelope->routing_key);
164 0           amqp_bytes_free(envelope->exchange);
165 0           amqp_bytes_free(envelope->consumer_tag);
166 0           error_out1:
167 0           return ret;
168             }
169              
170 0           amqp_rpc_reply_t amqp_read_message(amqp_connection_state_t state,
171             amqp_channel_t channel,
172             amqp_message_t *message,
173             AMQP_UNUSED int flags) {
174             amqp_frame_t frame;
175             amqp_rpc_reply_t ret;
176              
177             size_t body_read;
178             char *body_read_ptr;
179             int res;
180              
181 0           memset(&ret, 0, sizeof(ret));
182 0           memset(message, 0, sizeof(*message));
183              
184 0           res = amqp_simple_wait_frame_on_channel(state, channel, &frame);
185 0 0         if (AMQP_STATUS_OK != res) {
186 0           ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
187 0           ret.library_error = res;
188              
189 0           goto error_out1;
190             }
191              
192 0 0         if (AMQP_FRAME_HEADER != frame.frame_type) {
193 0 0         if (AMQP_FRAME_METHOD == frame.frame_type &&
194 0 0         (AMQP_CHANNEL_CLOSE_METHOD == frame.payload.method.id ||
195 0 0         AMQP_CONNECTION_CLOSE_METHOD == frame.payload.method.id)) {
196              
197 0           ret.reply_type = AMQP_RESPONSE_SERVER_EXCEPTION;
198 0           ret.reply = frame.payload.method;
199              
200             } else {
201 0           ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
202 0           ret.library_error = AMQP_STATUS_UNEXPECTED_STATE;
203              
204 0           amqp_put_back_frame(state, &frame);
205             }
206 0           goto error_out1;
207             }
208              
209 0           init_amqp_pool(&message->pool, 4096);
210 0           res = amqp_basic_properties_clone(frame.payload.properties.decoded,
211             &message->properties, &message->pool);
212              
213 0 0         if (AMQP_STATUS_OK != res) {
214 0           ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
215 0           ret.library_error = res;
216 0           goto error_out3;
217             }
218              
219 0 0         if (0 == frame.payload.properties.body_size) {
220 0           message->body = amqp_empty_bytes;
221             } else {
222             if (SIZE_MAX < frame.payload.properties.body_size) {
223             ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
224             ret.library_error = AMQP_STATUS_NO_MEMORY;
225             goto error_out1;
226             }
227             message->body =
228 0           amqp_bytes_malloc((size_t)frame.payload.properties.body_size);
229 0 0         if (NULL == message->body.bytes) {
230 0           ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
231 0           ret.library_error = AMQP_STATUS_NO_MEMORY;
232 0           goto error_out1;
233             }
234             }
235              
236 0           body_read = 0;
237 0           body_read_ptr = message->body.bytes;
238              
239 0 0         while (body_read < message->body.len) {
240 0           res = amqp_simple_wait_frame_on_channel(state, channel, &frame);
241 0 0         if (AMQP_STATUS_OK != res) {
242 0           ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
243 0           ret.library_error = res;
244 0           goto error_out2;
245             }
246 0 0         if (AMQP_FRAME_BODY != frame.frame_type) {
247 0 0         if (AMQP_FRAME_METHOD == frame.frame_type &&
248 0 0         (AMQP_CHANNEL_CLOSE_METHOD == frame.payload.method.id ||
249 0 0         AMQP_CONNECTION_CLOSE_METHOD == frame.payload.method.id)) {
250              
251 0           ret.reply_type = AMQP_RESPONSE_SERVER_EXCEPTION;
252 0           ret.reply = frame.payload.method;
253             } else {
254 0           ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
255 0           ret.library_error = AMQP_STATUS_BAD_AMQP_DATA;
256             }
257 0           goto error_out2;
258             }
259              
260 0 0         if (body_read + frame.payload.body_fragment.len > message->body.len) {
261 0           ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
262 0           ret.library_error = AMQP_STATUS_BAD_AMQP_DATA;
263 0           goto error_out2;
264             }
265              
266 0           memcpy(body_read_ptr, frame.payload.body_fragment.bytes,
267             frame.payload.body_fragment.len);
268              
269 0           body_read += frame.payload.body_fragment.len;
270 0           body_read_ptr += frame.payload.body_fragment.len;
271             }
272              
273 0           ret.reply_type = AMQP_RESPONSE_NORMAL;
274 0           return ret;
275              
276 0           error_out2:
277 0           amqp_bytes_free(message->body);
278 0           error_out3:
279 0           empty_amqp_pool(&message->pool);
280 0           error_out1:
281 0           return ret;
282             }