File Coverage

RabbitMQ.xs
Criterion Covered Total %
statement 53 888 5.9
branch 29 558 5.2
condition n/a
subroutine n/a
pod n/a
total 82 1446 5.6


line stmt bran cond sub pod time code
1             #include "EXTERN.h"
2             #include "perl.h"
3             #include "XSUB.h"
4             #include
5             #include
6              
7             /* perl -MDevel::PPPort -e'Devel::PPPort::WriteFile();' */
8             /* perl ppport.h --compat-version=5.8.0 --cplusplus RabbitMQ.xs */
9             #define NEED_newSVpvn_flags
10             #include "ppport.h"
11              
12             /* ppport.h knows about MUTABLE_PTR and MUTABLE_SV, but not these?! */
13             #ifndef MUTABLE_AV
14             # define MUTABLE_AV(p) ((AV*)MUTABLE_PTR(p))
15             #endif
16             #ifndef MUTABLE_HV
17             # define MUTABLE_HV(p) ((HV*)MUTABLE_PTR(p))
18             #endif
19              
20             #include "rabbitmq-c/amqp.h"
21             #include "amqp_socket.h"
22             #include "rabbitmq-c/tcp_socket.h"
23             #include "rabbitmq-c/ssl_socket.h"
24             /* For struct timeval */
25             #include "amqp_time.h"
26              
27             /* This is for the Math::UInt64 integration */
28             #include "perl_math_int64.h"
29              
30             extern void dump_table(amqp_table_t table);
31              
32             /* perl Makefile.PL; make CCFLAGS=-DDEBUG */
33             #if DEBUG
34             #define __DEBUG__(X) X
35             extern void dump_table(amqp_table_t table);
36             #else
37             #define __DEBUG__(X) /* NOOP */
38             #endif
39              
40             typedef amqp_connection_state_t Net__AMQP__RabbitMQ;
41              
42             #define AMQP_STATUS_UNKNOWN_TYPE 0x500
43              
44             #ifdef USE_LONG_DOUBLE
45             /* stolen from Cpanel::JSON::XS
46             * so we don't mess up double => long double for perls with -Duselongdouble */
47             #if defined(_AIX) && (!defined(HAS_LONG_DOUBLE) || AIX_WORKAROUND)
48             #define HAVE_NO_POWL
49             #endif
50              
51             #ifdef HAVE_NO_POWL
52             /* Ulisse Monari: this is a patch for AIX 5.3, perl 5.8.8 without HAS_LONG_DOUBLE
53             There Perl_pow maps to pow(...) - NOT TO powl(...), core dumps at Perl_pow(...)
54             Base code is from http://bytes.com/topic/c/answers/748317-replacement-pow-function
55             This is my change to fs_pow that goes into libc/libm for calling fmod/exp/log.
56             NEED TO MODIFY Makefile, after perl Makefile.PL by adding "-lm" onto the LDDLFLAGS line */
57             static double fs_powEx(double x, double y)
58             {
59             double p = 0;
60              
61             if (0 > x && fmod(y, 1) == 0) {
62             if (fmod(y, 2) == 0) {
63             p = exp(log(-x) * y);
64             } else {
65             p = -exp(log(-x) * y);
66             }
67             } else {
68             if (x != 0 || 0 >= y) {
69             p = exp(log( x) * y);
70             }
71             }
72             return p;
73             }
74              
75             /* powf() unfortunately is not accurate enough */
76             const NV DOUBLE_POW = fs_powEx(10., DBL_DIG );
77             #else
78             const NV DOUBLE_POW = Perl_pow(10., DBL_DIG );
79             #endif
80             #endif
81              
82              
83             /* This is a place to put some stuff that we convert from perl,
84             it's transient and we recycle it as soon as it's finished being used
85             which means we keep memory we've used with the aim of reusing it */
86             /* temp_memory_pool is ugly and suffers from code smell */
87             static amqp_pool_t temp_memory_pool;
88              
89             /* Parallels amqp_maybe_release_buffers */
90 0           static void maybe_release_buffers(amqp_connection_state_t state) {
91 0 0         if (amqp_release_buffers_ok(state)) {
92 0           amqp_release_buffers(state);
93 0           recycle_amqp_pool(&temp_memory_pool);
94             }
95 0           }
96              
97             #define int_from_hv(hv,name) \
98             do { SV **v; if(NULL != (v = hv_fetchs(hv, #name, 0))) name = SvIV(*v); } while(0)
99             #define double_from_hv(hv,name) \
100             do { SV **v; if(NULL != (v = hv_fetchs(hv, #name, 0))) name = SvNV(*v); } while(0)
101             #define str_from_hv(hv,name) \
102             do { SV **v; if(NULL != (v = hv_fetchs(hv, #name, 0))) name = SvPV_nolen(*v); } while(0)
103             #define has_valid_connection(conn) \
104             ( amqp_get_socket( conn ) != NULL && amqp_get_sockfd( conn ) > -1 )
105             #define assert_amqp_connected(conn) \
106             do { \
107             if ( ! has_valid_connection(conn) ) { \
108             Perl_croak(aTHX_ "AMQP socket not connected"); \
109             } \
110             } while(0)
111              
112             void hash_to_amqp_table(HV *hash, amqp_table_t *table, short force_utf8);
113             void array_to_amqp_array(AV *perl_array, amqp_array_t *mq_array, short force_utf8);
114             SV* mq_array_to_arrayref(amqp_array_t *array);
115             SV* mq_table_to_hashref(amqp_table_t *table);
116              
117 1           void die_on_error(pTHX_ int x, amqp_connection_state_t conn, char const *context) {
118             /* Handle socket errors */
119 1 50         if ( x == AMQP_STATUS_CONNECTION_CLOSED || x == AMQP_STATUS_SOCKET_ERROR ) {
    50          
120 0           amqp_socket_close( amqp_get_socket( conn ), AMQP_SC_FORCE );
121 0           Perl_croak(aTHX_ "%s failed because AMQP socket connection was closed.", context);
122             }
123             /* Handle everything else */
124 1 50         else if (x < 0) {
125 1           Perl_croak(aTHX_ "%s: %s\n", context, amqp_error_string2(x));
126             }
127 0           }
128              
129 0           void die_on_amqp_error(pTHX_ amqp_rpc_reply_t x, amqp_connection_state_t conn, char const *context) {
130 0           switch (x.reply_type) {
131 0           case AMQP_RESPONSE_NORMAL:
132 0           return;
133              
134 0           case AMQP_RESPONSE_NONE:
135 0           Perl_croak(aTHX_ "%s: missing RPC reply type!", context);
136             break;
137              
138 0           case AMQP_RESPONSE_LIBRARY_EXCEPTION:
139             /* If we got a library error saying that there's a socket problem,
140             kill the connection and croak. */
141 0           if (
142 0 0         x.library_error == AMQP_STATUS_CONNECTION_CLOSED
143 0           ||
144 0 0         x.library_error == AMQP_STATUS_SOCKET_ERROR
145             ) {
146 0           amqp_socket_close( amqp_get_socket( conn ), AMQP_SC_FORCE );
147 0           Perl_croak(aTHX_ "%s: failed since AMQP socket connection closed.\n", context);
148             }
149             /* Otherwise, give a more generic croak. */
150             else {
151 0           Perl_croak(aTHX_ "%s: %s\n", context,
152 0 0         (!x.library_error) ? "(end-of-stream)" :
153 0 0         (x.library_error == AMQP_STATUS_UNKNOWN_TYPE) ? "unknown AMQP type id" :
154 0           amqp_error_string2(x.library_error));
155             }
156             break;
157              
158 0           case AMQP_RESPONSE_SERVER_EXCEPTION:
159 0           switch (x.reply.id) {
160 0           case AMQP_CONNECTION_CLOSE_METHOD:
161             {
162             amqp_connection_close_ok_t req;
163 0           req.dummy = '\0';
164 0           /* res = */ amqp_send_method(conn, 0, AMQP_CONNECTION_CLOSE_OK_METHOD, &req);
165             }
166 0           amqp_set_socket(conn, NULL);
167             {
168 0           amqp_connection_close_t *m = (amqp_connection_close_t *) x.reply.decoded;
169 0           Perl_croak(aTHX_ "%s: server connection error %d, message: %.*s",
170             context,
171 0           m->reply_code,
172 0           (int) m->reply_text.len, (char *) m->reply_text.bytes);
173             }
174             break;
175              
176 0           case AMQP_CHANNEL_CLOSE_METHOD:
177             /* We don't know what channel provoked this error!
178             This information should be in amqp_rpc_reply_t, but it isn't.
179             {
180             amqp_channel_close_ok_t req;
181             req.dummy = '\0';
182             / * res = * / amqp_send_method(conn, channel, AMQP_CHANNEL_CLOSE_OK_METHOD, &req);
183             }
184             */
185             /* Only the channel should be invalidated, but we have no means of doing so! */
186             /* Even if we knew which channel we needed to invalidate! */
187 0           amqp_set_socket(conn, NULL);
188             {
189 0           amqp_channel_close_t *m = (amqp_channel_close_t *) x.reply.decoded;
190 0           Perl_croak(aTHX_ "%s: server channel error %d, message: %.*s",
191             context,
192 0           m->reply_code,
193 0           (int) m->reply_text.len, (char *) m->reply_text.bytes);
194             }
195             break;
196              
197 0           default:
198 0           Perl_croak(aTHX_ "%s: unknown server error, method id 0x%08X", context, x.reply.id);
199             break;
200             }
201             break;
202             }
203             }
204              
205             /*
206             * amqp_kind_for_sv(SV**)
207             * Note: We could handle more types here... but we're trying to take Perl and go to
208             * C. We don't really need to handle much more than this from what I can tell.
209             */
210 0           amqp_field_value_kind_t amqp_kind_for_sv(SV** perl_value, short force_utf8) {
211              
212 0           switch (SvTYPE( *perl_value ))
213             {
214             /* Integer types (and references beyond 5.10) */
215 0           case SVt_IV:
216             /* References */
217 0 0         if ( SvROK( *perl_value ) ) {
218             /* Array Reference */
219 0 0         if ( SvTYPE( SvRV( *perl_value ) ) == SVt_PVAV ) {
220 0           return AMQP_FIELD_KIND_ARRAY;
221             }
222              
223             /* Hash Reference */
224 0 0         if ( SvTYPE( SvRV( *perl_value ) ) == SVt_PVHV ) {
225 0           return AMQP_FIELD_KIND_TABLE;
226             }
227 0           Perl_croak(
228             aTHX_ "Unsupported Perl Reference Type: %d",
229 0           SvTYPE( SvRV( *perl_value ) )
230             );
231             }
232              
233             /* Regular integers */
234             /* In the event that it could be unsigned */
235 0 0         if ( SvUOK( *perl_value ) ) {
236 0           return AMQP_FIELD_KIND_U64;
237             }
238 0           return AMQP_FIELD_KIND_I64;
239              
240             /* Numeric type */
241 0           case SVt_NV:
242 0           return AMQP_FIELD_KIND_F64;
243              
244             /* String (handle types which are upgraded to handle IV/UV/NV as well as PV) */
245 0           case SVt_PVIV:
246 0 0         if ( SvI64OK( *perl_value ) ) {
247 0           return AMQP_FIELD_KIND_I64;
248             }
249 0 0         if ( SvU64OK( *perl_value ) ) {
250 0           return AMQP_FIELD_KIND_U64;
251             }
252             /* It could be a PV or an IV/UV! */
253 0 0         if ( SvIOK( *perl_value ) ) {
254 0 0         if ( SvUOK( *perl_value ) ) {
255 0           return AMQP_FIELD_KIND_U64;
256             }
257 0           return AMQP_FIELD_KIND_I64;
258             }
259              
260             case SVt_PVNV:
261             /* It could be a PV or an NV */
262 0 0         if ( SvNOK( *perl_value ) ) {
263 0           return AMQP_FIELD_KIND_F64;
264             }
265              
266             case SVt_PV:
267             /* UTF-8? */
268 0 0         if ( force_utf8 || SvUTF8( *perl_value ) ) {
    0          
269 0           return AMQP_FIELD_KIND_UTF8;
270             }
271 0           return AMQP_FIELD_KIND_BYTES;
272              
273 0           case SVt_PVMG:
274 0 0         if ( SvPOK( *perl_value ) || SvPOKp( *perl_value ) ) {
    0          
275 0 0         if ( force_utf8 || SvUTF8( *perl_value ) ) {
    0          
276 0           return AMQP_FIELD_KIND_UTF8;
277             }
278 0           return AMQP_FIELD_KIND_BYTES;
279             }
280 0 0         if ( SvIOK( *perl_value ) || SvIOKp( *perl_value ) ) {
    0          
281 0 0         if ( SvUOK( *perl_value ) ) {
282 0           return AMQP_FIELD_KIND_U64;
283             }
284 0           return AMQP_FIELD_KIND_I64;
285             }
286 0 0         if ( SvNOK( *perl_value ) || SvNOKp( *perl_value ) ) {
    0          
287 0           return AMQP_FIELD_KIND_F64;
288             }
289              
290             default:
291 0 0         if ( SvROK( *perl_value ) ) {
292             /* Array Reference */
293 0 0         if ( SvTYPE( SvRV( *perl_value ) ) == SVt_PVAV ) {
294 0           return AMQP_FIELD_KIND_ARRAY;
295             }
296              
297             /* Hash Reference */
298 0 0         if ( SvTYPE( SvRV( *perl_value ) ) == SVt_PVHV ) {
299 0           return AMQP_FIELD_KIND_TABLE;
300             }
301 0           Perl_croak(
302             aTHX_ "Unsupported Perl Reference Type: %d",
303 0           SvTYPE( SvRV( *perl_value ) )
304             );
305             }
306              
307 0           Perl_croak(
308             aTHX_ "Unsupported scalar type detected >%s<(%d)",
309             SvPV_nolen(*perl_value),
310 0           SvTYPE( *perl_value )
311             );
312             }
313              
314             /* If we're still here... wtf */
315             Perl_croak( aTHX_ "The wheels have fallen off. Please call for help." );
316             }
317              
318             /* Parallels amqp_read_message */
319 0           static amqp_rpc_reply_t read_message(amqp_connection_state_t state, amqp_channel_t channel, SV **props_sv_ptr, SV **body_sv_ptr) {
320             HV *props_hv;
321             SV *body_sv;
322             amqp_rpc_reply_t ret;
323             int res;
324             amqp_frame_t frame;
325 0           int is_utf8_body = 1; /* The body is UTF-8 by default */
326              
327 0           memset(&ret, 0, sizeof(amqp_rpc_reply_t));
328              
329 0           res = amqp_simple_wait_frame_on_channel(state, channel, &frame);
330 0 0         if (AMQP_STATUS_OK != res) {
331 0           ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
332 0           ret.library_error = res;
333 0           goto error_out1;
334             }
335              
336 0 0         if (AMQP_FRAME_HEADER != frame.frame_type) {
337 0 0         if (AMQP_FRAME_METHOD == frame.frame_type &&
338 0 0         (AMQP_CHANNEL_CLOSE_METHOD == frame.payload.method.id ||
339 0 0         AMQP_CONNECTION_CLOSE_METHOD == frame.payload.method.id)) {
340              
341 0           ret.reply_type = AMQP_RESPONSE_SERVER_EXCEPTION;
342 0           ret.reply = frame.payload.method;
343              
344             } else {
345 0           ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
346 0           ret.library_error = AMQP_STATUS_UNEXPECTED_STATE;
347              
348 0           amqp_put_back_frame(state, &frame);
349             }
350              
351 0           goto error_out1;
352             }
353              
354             {
355             amqp_basic_properties_t *p;
356              
357 0           props_hv = newHV();
358              
359 0           p = (amqp_basic_properties_t *) frame.payload.properties.decoded;
360 0 0         if (p->_flags & AMQP_BASIC_CONTENT_TYPE_FLAG) {
361 0           hv_stores(props_hv, "content_type", newSVpvn(p->content_type.bytes, p->content_type.len));
362             }
363 0 0         if (p->_flags & AMQP_BASIC_CONTENT_ENCODING_FLAG) {
364 0           hv_stores(props_hv, "content_encoding", newSVpvn(p->content_encoding.bytes, p->content_encoding.len));
365              
366             /*
367             * Since we could have UTF-8 in our content-encoding, and most people seem like they
368             * treat this like the default, we're looking for the presence of content-encoding but
369             * the absence of a case-insensitive "UTF-8".
370             */
371 0           if (
372 0 0         strnlen(p->content_encoding.bytes, p->content_encoding.len) > 0
373 0           &&
374 0 0         (strncasecmp(p->content_encoding.bytes, "UTF-8", p->content_encoding.len) != 0)
375             ) {
376 0           is_utf8_body = 0;
377             }
378             }
379 0 0         if (p->_flags & AMQP_BASIC_CORRELATION_ID_FLAG) {
380 0           hv_stores(props_hv, "correlation_id", newSVpvn(p->correlation_id.bytes, p->correlation_id.len));
381             }
382 0 0         if (p->_flags & AMQP_BASIC_REPLY_TO_FLAG) {
383 0           hv_stores(props_hv, "reply_to", newSVpvn(p->reply_to.bytes, p->reply_to.len));
384             }
385 0 0         if (p->_flags & AMQP_BASIC_EXPIRATION_FLAG) {
386 0           hv_stores(props_hv, "expiration", newSVpvn(p->expiration.bytes, p->expiration.len));
387             }
388 0 0         if (p->_flags & AMQP_BASIC_MESSAGE_ID_FLAG) {
389 0           hv_stores(props_hv, "message_id", newSVpvn(p->message_id.bytes, p->message_id.len));
390             }
391 0 0         if (p->_flags & AMQP_BASIC_TYPE_FLAG) {
392 0           hv_stores(props_hv, "type", newSVpvn(p->type.bytes, p->type.len));
393             }
394 0 0         if (p->_flags & AMQP_BASIC_USER_ID_FLAG) {
395 0           hv_stores(props_hv, "user_id", newSVpvn(p->user_id.bytes, p->user_id.len));
396             }
397 0 0         if (p->_flags & AMQP_BASIC_APP_ID_FLAG) {
398 0           hv_stores(props_hv, "app_id", newSVpvn(p->app_id.bytes, p->app_id.len));
399             }
400 0 0         if (p->_flags & AMQP_BASIC_DELIVERY_MODE_FLAG) {
401 0           hv_stores(props_hv, "delivery_mode", newSViv(p->delivery_mode));
402             }
403 0 0         if (p->_flags & AMQP_BASIC_PRIORITY_FLAG) {
404 0           hv_stores(props_hv, "priority", newSViv(p->priority));
405             }
406 0 0         if (p->_flags & AMQP_BASIC_TIMESTAMP_FLAG) {
407 0           hv_stores(props_hv, "timestamp", newSViv(p->timestamp));
408             }
409 0 0         if (p->_flags & AMQP_BASIC_HEADERS_FLAG) {
410             int i;
411 0           HV *headers = newHV();
412 0           hv_stores(props_hv, "headers", newRV_noinc(MUTABLE_SV(headers)));
413              
414             __DEBUG__( dump_table( p->headers ) );
415              
416 0 0         for( i=0; i < p->headers.num_entries; ++i ) {
417 0           amqp_table_entry_t *header_entry = &(p->headers.entries[i]);
418              
419             __DEBUG__(
420             fprintf(stderr,
421             "~~~ Length: %ld/%d, Key: %.*s, Kind: %c\n",
422             header_entry->key.len,
423             (int)header_entry->key.len,
424             (int)header_entry->key.len,
425             (char*)header_entry->key.bytes,
426             header_entry->value.kind
427             )
428             );
429              
430 0           switch (header_entry->value.kind) {
431 0           case AMQP_FIELD_KIND_BOOLEAN:
432 0           hv_store( headers,
433             header_entry->key.bytes, header_entry->key.len,
434             newSViv(header_entry->value.value.boolean),
435             0
436             );
437 0           break;
438              
439             /* Integer types */
440 0           case AMQP_FIELD_KIND_I8:
441 0           hv_store( headers,
442             header_entry->key.bytes, header_entry->key.len,
443             newSViv(header_entry->value.value.i8),
444             0
445             );
446 0           break;
447              
448 0           case AMQP_FIELD_KIND_I16:
449 0           hv_store( headers,
450             header_entry->key.bytes, header_entry->key.len,
451             newSViv(header_entry->value.value.i16),
452             0
453             );
454 0           break;
455              
456 0           case AMQP_FIELD_KIND_I32:
457 0           hv_store( headers,
458             header_entry->key.bytes, header_entry->key.len,
459             newSViv(header_entry->value.value.i32),
460             0
461             );
462 0           break;
463              
464 0           case AMQP_FIELD_KIND_I64:
465 0           hv_store( headers,
466             header_entry->key.bytes, header_entry->key.len,
467             newSVi64(header_entry->value.value.i64),
468             0
469             );
470 0           break;
471              
472 0           case AMQP_FIELD_KIND_U8:
473 0           hv_store( headers,
474             header_entry->key.bytes, header_entry->key.len,
475             newSVuv(header_entry->value.value.u8),
476             0
477             );
478 0           break;
479              
480 0           case AMQP_FIELD_KIND_U16:
481 0           hv_store( headers,
482             header_entry->key.bytes, header_entry->key.len,
483             newSVuv(header_entry->value.value.u16),
484             0
485             );
486 0           break;
487              
488 0           case AMQP_FIELD_KIND_U32:
489 0           hv_store( headers,
490             header_entry->key.bytes, header_entry->key.len,
491             newSVuv(header_entry->value.value.u32),
492             0
493             );
494 0           break;
495              
496 0           case AMQP_FIELD_KIND_U64:
497 0           hv_store( headers,
498             header_entry->key.bytes, header_entry->key.len,
499             newSVu64(header_entry->value.value.u64),
500             0
501             );
502 0           break;
503              
504             /* Floating point precision */
505 0           case AMQP_FIELD_KIND_F32:
506 0           hv_store( headers,
507             header_entry->key.bytes, header_entry->key.len,
508             newSVnv(header_entry->value.value.f32),
509             0
510             );
511 0           break;
512              
513 0           case AMQP_FIELD_KIND_F64:
514             /* TODO: I don't think this is a natively supported type on all Perls. */
515              
516 0           hv_store( headers,
517             header_entry->key.bytes, header_entry->key.len,
518             #ifdef USE_LONG_DOUBLE
519             /* amqp uses doubles, if perl is -Duselongdouble it messes up the precision
520             * so we always want take the max precision from a double and discard the rest
521             * because it can't be any more precise than a double */
522             newSVnv( ( rint( header_entry->value.value.f64 * DOUBLE_POW ) / DOUBLE_POW ) ),
523             #else
524             /* both of these are doubles so it's ok */
525             newSVnv( header_entry->value.value.f64 ),
526             #endif
527             0
528             );
529 0           break;
530              
531             /* Handle kind UTF8 and kind BYTES */
532 0           case AMQP_FIELD_KIND_UTF8:
533             case AMQP_FIELD_KIND_BYTES:
534 0 0         hv_store( headers,
535             header_entry->key.bytes, header_entry->key.len,
536             newSVpvn_utf8(
537             header_entry->value.value.bytes.bytes,
538             header_entry->value.value.bytes.len,
539             AMQP_FIELD_KIND_UTF8 == header_entry->value.kind
540             ),
541             0
542             );
543 0           break;
544              
545             /* Handle arrays */
546 0           case AMQP_FIELD_KIND_ARRAY:
547             __DEBUG__(
548             fprintf(stderr, "ARRAY KIND FOR KEY:>%.*s< KIND:>%c< AMQP_FIELD_KIND_ARRAY:[%c].\n",
549             (int)header_entry->key.len,
550             (char*)header_entry->key.bytes,
551             header_entry->value.kind,
552             AMQP_FIELD_KIND_ARRAY
553             )
554             );
555 0           hv_store( headers,
556             header_entry->key.bytes, header_entry->key.len,
557             mq_array_to_arrayref( &header_entry->value.value.array ),
558             0
559             );
560 0           break;
561              
562             /* Handle tables (hashes when translated to Perl) */
563 0           case AMQP_FIELD_KIND_TABLE:
564 0           hv_store( headers,
565             header_entry->key.bytes, header_entry->key.len,
566             mq_table_to_hashref( &header_entry->value.value.table ),
567             0
568             );
569 0           break;
570              
571 0           default:
572 0           ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
573 0           ret.library_error = AMQP_STATUS_UNKNOWN_TYPE;
574 0           goto error_out2;
575             }
576             }
577             }
578             }
579              
580             {
581             char *body;
582 0           size_t body_target = frame.payload.properties.body_size;
583 0           size_t body_remaining = body_target;
584              
585 0           body_sv = newSV(0);
586 0           sv_grow(body_sv, body_target + 1);
587 0           SvCUR_set(body_sv, body_target);
588 0           SvPOK_on(body_sv);
589 0 0         if (is_utf8_body)
590 0           SvUTF8_on(body_sv);
591              
592 0           body = SvPVX(body_sv);
593              
594 0 0         while (body_remaining > 0) {
595             size_t fragment_len;
596              
597 0           res = amqp_simple_wait_frame_on_channel(state, channel, &frame);
598 0 0         if (AMQP_STATUS_OK != res) {
599 0           ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
600 0           ret.library_error = res;
601 0           goto error_out3;
602             }
603              
604 0 0         if (AMQP_FRAME_BODY != frame.frame_type) {
605 0 0         if (AMQP_FRAME_METHOD == frame.frame_type &&
606 0 0         (AMQP_CHANNEL_CLOSE_METHOD == frame.payload.method.id ||
607 0 0         AMQP_CONNECTION_CLOSE_METHOD == frame.payload.method.id)) {
608              
609 0           ret.reply_type = AMQP_RESPONSE_SERVER_EXCEPTION;
610 0           ret.reply = frame.payload.method;
611             } else {
612 0           ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
613 0           ret.library_error = AMQP_STATUS_BAD_AMQP_DATA;
614             }
615 0           goto error_out3;
616             }
617              
618 0           fragment_len = frame.payload.body_fragment.len;
619 0 0         if (fragment_len > body_remaining) {
620 0           ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
621 0           ret.library_error = AMQP_STATUS_BAD_AMQP_DATA;
622 0           goto error_out3;
623             }
624              
625 0           memcpy(body, frame.payload.body_fragment.bytes, fragment_len);
626 0           body += fragment_len;
627 0           body_remaining -= fragment_len;
628             }
629              
630 0           *body = '\0';
631             }
632              
633 0           *props_sv_ptr = newRV_noinc(MUTABLE_SV(props_hv));
634 0           *body_sv_ptr = body_sv;
635 0           ret.reply_type = AMQP_RESPONSE_NORMAL;
636 0           return ret;
637              
638 0           error_out3:
639 0           SvREFCNT_dec(props_hv);
640 0           error_out2:
641 0           SvREFCNT_dec(body_sv);
642 0           error_out1:
643 0           *props_sv_ptr = &PL_sv_undef;
644 0           *body_sv_ptr = &PL_sv_undef;
645 0           return ret;
646             }
647              
648             /* Parallels amqp_consume_message */
649 0           static amqp_rpc_reply_t consume_message(amqp_connection_state_t state, SV **envelope_sv_ptr, struct timeval *timeout) {
650             amqp_rpc_reply_t ret;
651             HV *envelope_hv;
652             int res;
653             amqp_frame_t frame;
654             amqp_channel_t channel;
655             SV *props;
656             SV *body;
657              
658 0           memset(&ret, 0, sizeof(amqp_rpc_reply_t));
659 0           *envelope_sv_ptr = &PL_sv_undef;
660              
661 0           res = amqp_simple_wait_frame_noblock(state, &frame, timeout);
662 0 0         if (AMQP_STATUS_OK != res) {
663 0           ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
664 0           ret.library_error = res;
665 0           goto error_out1;
666             }
667              
668 0 0         if (AMQP_FRAME_METHOD != frame.frame_type ||
669 0 0         AMQP_BASIC_DELIVER_METHOD != frame.payload.method.id) {
670              
671 0 0         if (AMQP_FRAME_METHOD == frame.frame_type &&
672 0 0         (AMQP_CHANNEL_CLOSE_METHOD == frame.payload.method.id ||
673 0 0         AMQP_CONNECTION_CLOSE_METHOD == frame.payload.method.id)) {
674              
675 0           ret.reply_type = AMQP_RESPONSE_SERVER_EXCEPTION;
676 0           ret.reply = frame.payload.method;
677             } else {
678 0           amqp_put_back_frame(state, &frame);
679 0           ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
680 0           ret.library_error = AMQP_STATUS_UNEXPECTED_STATE;
681             }
682              
683 0           goto error_out1;
684             }
685              
686 0           channel = frame.channel;
687              
688 0           envelope_hv = newHV();
689              
690             {
691 0           amqp_basic_deliver_t *d = (amqp_basic_deliver_t *) frame.payload.method.decoded;
692 0           hv_stores(envelope_hv, "channel", newSViv(channel));
693 0           hv_stores(envelope_hv, "delivery_tag", newSVu64(d->delivery_tag));
694 0           hv_stores(envelope_hv, "redelivered", newSViv(d->redelivered));
695 0           hv_stores(envelope_hv, "exchange", newSVpvn(d->exchange.bytes, d->exchange.len));
696 0           hv_stores(envelope_hv, "consumer_tag", newSVpvn(d->consumer_tag.bytes, d->consumer_tag.len));
697 0           hv_stores(envelope_hv, "routing_key", newSVpvn(d->routing_key.bytes, d->routing_key.len));
698             }
699              
700 0           ret = read_message(state, channel, &props, &body );
701 0 0         if (AMQP_RESPONSE_NORMAL != ret.reply_type)
702 0           goto error_out2;
703              
704 0           hv_stores(envelope_hv, "props", props);
705 0           hv_stores(envelope_hv, "body", body);
706              
707 0           *envelope_sv_ptr = newRV_noinc(MUTABLE_SV(envelope_hv));
708 0           ret.reply_type = AMQP_RESPONSE_NORMAL;
709 0           return ret;
710              
711 0           error_out2:
712 0           SvREFCNT_dec(envelope_hv);
713 0           error_out1:
714 0           *envelope_sv_ptr = &PL_sv_undef;
715 0           return ret;
716             }
717              
718 0           void array_to_amqp_array(AV *perl_array, amqp_array_t *mq_array, short force_utf8) {
719 0           int idx = 0;
720             SV **value;
721              
722 0           amqp_field_value_t *new_elements = amqp_pool_alloc(
723             &temp_memory_pool,
724 0           ((av_len(perl_array)+1) * sizeof(amqp_field_value_t))
725             );
726             amqp_field_value_t *element;
727              
728 0           mq_array->entries = new_elements;
729 0           mq_array->num_entries = 0;
730              
731 0 0         for ( idx = 0; idx <= av_len(perl_array); idx += 1) {
732 0           value = av_fetch( perl_array, idx, 0 );
733              
734             /* We really should never see NULL here. */
735             assert(value != NULL);
736              
737             /* Let's start getting the type... */
738 0           element = &mq_array->entries[mq_array->num_entries];
739 0           mq_array->num_entries += 1;
740 0           element->kind = amqp_kind_for_sv(value, force_utf8);
741              
742             __DEBUG__( warn("%d KIND >%c<", __LINE__, (unsigned char)element->kind) );
743              
744 0           switch (element->kind) {
745              
746 0           case AMQP_FIELD_KIND_I64:
747 0           element->value.i64 = (int64_t) SvI64(*value);
748 0           break;
749              
750 0           case AMQP_FIELD_KIND_U64:
751 0           element->value.u64 = (uint64_t) SvU64(*value);
752 0           break;
753              
754 0           case AMQP_FIELD_KIND_F64:
755             /* TODO: I don't think this is a native type on all Perls */
756 0           element->value.f64 = (double) SvNV(*value);
757 0           break;
758              
759 0           case AMQP_FIELD_KIND_UTF8:
760             case AMQP_FIELD_KIND_BYTES:
761 0           element->value.bytes = amqp_cstring_bytes(SvPV_nolen(*value));
762 0           break;
763              
764 0           case AMQP_FIELD_KIND_ARRAY:
765 0           array_to_amqp_array(MUTABLE_AV(SvRV(*value)), &(element->value.array), force_utf8);
766 0           break;
767              
768 0           case AMQP_FIELD_KIND_TABLE:
769 0           hash_to_amqp_table(MUTABLE_HV(SvRV(*value)), &(element->value.table), force_utf8);
770 0           break;
771              
772 0           default:
773 0           Perl_croak( aTHX_ "Unsupported SvType for array index %d", idx );
774             }
775             }
776 0           }
777              
778             /* Iterate over the array entries and decode them to Perl... */
779 0           SV* mq_array_to_arrayref(amqp_array_t *mq_array) {
780 0           AV* perl_array = newAV();
781              
782 0           SV* perl_element = &PL_sv_undef;
783             amqp_field_value_t* mq_element;
784              
785 0           int current_entry = 0;
786              
787 0 0         for (; current_entry < mq_array->num_entries; current_entry += 1) {
788 0           mq_element = &mq_array->entries[current_entry];
789              
790             __DEBUG__( warn("%d KIND >%c<", __LINE__, mq_element->kind) );
791              
792 0           switch (mq_element->kind) {
793             /* Boolean */
794 0           case AMQP_FIELD_KIND_BOOLEAN:
795 0           perl_element = newSViv(mq_element->value.boolean);
796 0           break;
797              
798             /* Signed values */
799 0           case AMQP_FIELD_KIND_I8:
800 0           perl_element = newSViv(mq_element->value.i8);
801 0           break;
802 0           case AMQP_FIELD_KIND_I16:
803 0           perl_element = newSViv(mq_element->value.i16);
804 0           break;
805 0           case AMQP_FIELD_KIND_I32:
806 0           perl_element = newSViv(mq_element->value.i32);
807 0           break;
808 0           case AMQP_FIELD_KIND_I64:
809 0           perl_element = newSVi64(mq_element->value.i64);
810 0           break;
811              
812             /* Unsigned values */
813 0           case AMQP_FIELD_KIND_U8:
814 0           perl_element = newSViv(mq_element->value.u8);
815 0           break;
816 0           case AMQP_FIELD_KIND_U16:
817 0           perl_element = newSViv(mq_element->value.u16);
818 0           break;
819 0           case AMQP_FIELD_KIND_U32:
820 0           perl_element = newSVuv(mq_element->value.u32);
821 0           break;
822 0           case AMQP_FIELD_KIND_TIMESTAMP: /* Timestamps */
823             case AMQP_FIELD_KIND_U64:
824 0           perl_element = newSVu64(mq_element->value.u64);
825 0           break;
826              
827             /* Floats */
828 0           case AMQP_FIELD_KIND_F32:
829 0           perl_element = newSVnv(mq_element->value.f32);
830 0           break;
831 0           case AMQP_FIELD_KIND_F64:
832             /* TODO: I don't think this is a native type on all Perls */
833 0           perl_element = newSVnv(mq_element->value.f64);
834 0           break;
835              
836             /* Strings and bytes */
837 0           case AMQP_FIELD_KIND_BYTES:
838 0           perl_element = newSVpvn(
839             mq_element->value.bytes.bytes,
840             mq_element->value.bytes.len
841             );
842 0           break;
843              
844             /* UTF-8 strings */
845 0           case AMQP_FIELD_KIND_UTF8:
846 0           perl_element = newSVpvn(
847             mq_element->value.bytes.bytes,
848             mq_element->value.bytes.len
849             );
850 0           SvUTF8_on(perl_element); /* It's UTF-8! */
851 0           break;
852              
853             /* Arrays */
854 0           case AMQP_FIELD_KIND_ARRAY:
855 0           perl_element = mq_array_to_arrayref(&(mq_element->value.array));
856 0           break;
857              
858             /* Tables */
859 0           case AMQP_FIELD_KIND_TABLE:
860 0           perl_element = mq_table_to_hashref(&(mq_element->value.table));
861 0           break;
862              
863             /* WTF */
864 0           default:
865             /* ACK! */
866 0           Perl_croak(
867             aTHX_ "Unsupported Perl type >%c< at index %d",
868 0           (unsigned char)mq_element->kind,
869             current_entry
870             );
871             }
872              
873 0           av_push(perl_array, perl_element);
874             }
875              
876 0           return newRV_noinc(MUTABLE_SV(perl_array));
877             }
878              
879 0           SV* mq_table_to_hashref( amqp_table_t *mq_table ) {
880             /* Iterate over the table keys and decode them to Perl... */
881             int i;
882             SV *perl_element;
883 0           HV *perl_hash = newHV();
884 0           amqp_table_entry_t *hash_entry = (amqp_table_entry_t*)NULL;
885              
886 0 0         for( i=0; i < mq_table->num_entries; i += 1 ) {
887 0           hash_entry = &(mq_table->entries[i]);
888             __DEBUG__(
889             fprintf(
890             stderr,
891             "!!! Key: >%.*s< Kind: >%c<\n",
892             (int)hash_entry->key.len,
893             (char*)hash_entry->key.bytes,
894             hash_entry->value.kind
895             );
896             );
897              
898 0           switch (hash_entry->value.kind) {
899             /* Boolean */
900 0           case AMQP_FIELD_KIND_BOOLEAN:
901 0           perl_element = newSViv(hash_entry->value.value.boolean);
902 0           break;
903              
904             /* Integers */
905 0           case AMQP_FIELD_KIND_I8:
906 0           perl_element = newSViv(hash_entry->value.value.i8);
907 0           break;
908 0           case AMQP_FIELD_KIND_I16:
909 0           perl_element = newSViv(hash_entry->value.value.i16);
910 0           break;
911 0           case AMQP_FIELD_KIND_I32:
912 0           perl_element = newSViv(hash_entry->value.value.i32);
913 0           break;
914 0           case AMQP_FIELD_KIND_I64:
915 0           perl_element = newSVi64(hash_entry->value.value.i64);
916 0           break;
917 0           case AMQP_FIELD_KIND_U8:
918 0           perl_element = newSViv(hash_entry->value.value.u8);
919 0           break;
920 0           case AMQP_FIELD_KIND_U16:
921 0           perl_element = newSViv(hash_entry->value.value.u16);
922 0           break;
923 0           case AMQP_FIELD_KIND_U32:
924 0           perl_element = newSVuv(hash_entry->value.value.u32);
925 0           break;
926 0           case AMQP_FIELD_KIND_TIMESTAMP: /* Timestamps */
927             case AMQP_FIELD_KIND_U64:
928 0           perl_element = newSVu64(hash_entry->value.value.u64);
929 0           break;
930              
931             /* Foats */
932 0           case AMQP_FIELD_KIND_F32:
933 0           perl_element = newSVnv(hash_entry->value.value.f32);
934 0           break;
935 0           case AMQP_FIELD_KIND_F64:
936             /* TODO: I don't think this is a native type on all Perls. */
937 0           perl_element = newSVnv(hash_entry->value.value.f64);
938 0           break;
939              
940 0           case AMQP_FIELD_KIND_BYTES:
941 0           perl_element = newSVpvn(
942             hash_entry->value.value.bytes.bytes,
943             hash_entry->value.value.bytes.len
944             );
945 0           break;
946              
947 0           case AMQP_FIELD_KIND_UTF8:
948 0           perl_element = newSVpvn(
949             hash_entry->value.value.bytes.bytes,
950             hash_entry->value.value.bytes.len
951             );
952 0           SvUTF8_on(perl_element); /* It's UTF-8! */
953 0           break;
954              
955 0           case AMQP_FIELD_KIND_ARRAY:
956 0           perl_element = mq_array_to_arrayref(&(hash_entry->value.value.array));
957 0           break;
958              
959 0           case AMQP_FIELD_KIND_TABLE:
960 0           perl_element = mq_table_to_hashref(&(hash_entry->value.value.table));
961 0           break;
962              
963 0           default:
964             /* ACK! */
965 0           Perl_croak(
966             aTHX_ "Unsupported Perl type >%c< at index %d",
967 0           (unsigned char)hash_entry->value.kind,
968             i
969             );
970             }
971              
972             /* Stash this in our hash. */
973 0           hv_store(
974             perl_hash,
975             hash_entry->key.bytes, hash_entry->key.len,
976             perl_element,
977             0
978             );
979              
980             }
981              
982 0           return newRV_noinc(MUTABLE_SV(perl_hash));
983             }
984              
985 0           void hash_to_amqp_table(HV *hash, amqp_table_t *table, short force_utf8) {
986             HE *he;
987             char *key;
988             SV *value;
989             I32 retlen;
990             amqp_table_entry_t *entry;
991              
992 0 0         amqp_table_entry_t *new_entries = amqp_pool_alloc( &temp_memory_pool, HvKEYS(hash) * sizeof(amqp_table_entry_t) );
993 0           table->entries = new_entries;
994              
995 0           hv_iterinit(hash);
996 0 0         while (NULL != (he = hv_iternext(hash))) {
997 0           key = hv_iterkey(he, &retlen);
998             __DEBUG__( warn("Key: %s\n", key) );
999 0           value = hv_iterval(hash, he);
1000              
1001 0 0         if (SvGMAGICAL(value)) {
1002 0           mg_get(value);
1003             }
1004              
1005 0           entry = &table->entries[table->num_entries];
1006 0           entry->key = amqp_cstring_bytes( key );
1007              
1008             /* Reserved headers, per spec must force UTF-8 for strings. */
1009             /* Other headers aren't necessarily required to do so. */
1010 0           if (
1011             /* "x-*" exchanges */
1012             (
1013 0 0         strlen(key) > 2
1014 0           &&
1015 0 0         key[0] == 'x'
1016 0           &&
1017 0 0         key[1] == '-'
1018             )
1019             ) {
1020 0           entry->value.kind = amqp_kind_for_sv( &value, 1 );
1021             }
1022             else {
1023 0           entry->value.kind = amqp_kind_for_sv( &value, force_utf8 );
1024             }
1025              
1026              
1027             __DEBUG__(
1028             warn("hash_to_amqp_table()");
1029             warn("%s", SvPV_nolen(value) );
1030             fprintf(
1031             stderr,
1032             "Key: >%.*s< Kind: >%c<\n",
1033             (int)entry->key.len,
1034             (char*)entry->key.bytes,
1035             entry->value.kind
1036             );
1037             );
1038              
1039 0           switch ( entry->value.kind ) {
1040 0           case AMQP_FIELD_KIND_I64:
1041 0           entry->value.value.i64 = (int64_t) SvI64( value );
1042 0           break;
1043              
1044 0           case AMQP_FIELD_KIND_U64:
1045 0           entry->value.value.u64 = (uint64_t) SvU64( value );
1046 0           break;
1047              
1048 0           case AMQP_FIELD_KIND_F64:
1049             /* TODO: I don't think this is a native type on all Perls. */
1050 0           entry->value.value.f64 = (double) SvNV( value );
1051 0           break;
1052              
1053 0           case AMQP_FIELD_KIND_BYTES:
1054             case AMQP_FIELD_KIND_UTF8:
1055 0           entry->value.value.bytes = amqp_cstring_bytes( SvPV_nolen( value )
1056             );
1057 0           break;
1058              
1059 0           case AMQP_FIELD_KIND_ARRAY:
1060 0           array_to_amqp_array(
1061 0           MUTABLE_AV(SvRV(value)),
1062             &(entry->value.value.array),
1063             force_utf8
1064             );
1065 0           break;
1066              
1067 0           case AMQP_FIELD_KIND_TABLE:
1068 0           hash_to_amqp_table(
1069 0           MUTABLE_HV(SvRV(value)),
1070             &(entry->value.value.table),
1071             force_utf8
1072             );
1073 0           break;
1074              
1075 0           default:
1076 0           Perl_croak( aTHX_ "amqp_kind_for_sv() returned a type I don't understand." );
1077             }
1078              
1079             /* Successfully (we think) added an entry to the table. */
1080 0           table->num_entries++;
1081             }
1082              
1083 0           return;
1084             }
1085              
1086 0           static amqp_rpc_reply_t basic_get(amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t queue, SV **envelope_sv_ptr, amqp_boolean_t no_ack) {
1087             amqp_rpc_reply_t ret;
1088 0           HV *envelope_hv = NULL;
1089             SV *props;
1090             SV *body;
1091              
1092 0           ret = amqp_basic_get(state, channel, queue, no_ack);
1093 0 0         if (AMQP_RESPONSE_NORMAL != ret.reply_type)
1094 0           goto error_out1;
1095              
1096 0 0         if (AMQP_BASIC_GET_OK_METHOD != ret.reply.id)
1097 0           goto success_out;
1098              
1099 0           envelope_hv = newHV();
1100              
1101             {
1102 0           amqp_basic_get_ok_t *ok = (amqp_basic_get_ok_t *) ret.reply.decoded;
1103 0           hv_stores(envelope_hv, "delivery_tag", newSVu64(ok->delivery_tag));
1104 0           hv_stores(envelope_hv, "redelivered", newSViv(ok->redelivered));
1105 0           hv_stores(envelope_hv, "exchange", newSVpvn(ok->exchange.bytes, ok->exchange.len));
1106 0           hv_stores(envelope_hv, "routing_key", newSVpvn(ok->routing_key.bytes, ok->routing_key.len));
1107 0           hv_stores(envelope_hv, "message_count", newSViv(ok->message_count));
1108             }
1109              
1110 0           ret = read_message(state, channel, &props, &body );
1111 0 0         if (AMQP_RESPONSE_NORMAL != ret.reply_type)
1112 0           goto error_out2;
1113              
1114 0           hv_stores(envelope_hv, "props", props);
1115 0           hv_stores(envelope_hv, "body", body);
1116            
1117 0           success_out:
1118 0 0         *envelope_sv_ptr = envelope_hv ? newRV_noinc(MUTABLE_SV(envelope_hv)) : &PL_sv_undef;
1119 0           ret.reply_type = AMQP_RESPONSE_NORMAL;
1120 0           return ret;
1121              
1122 0           error_out2:
1123 0           SvREFCNT_dec(envelope_hv);
1124 0           error_out1:
1125 0           *envelope_sv_ptr = &PL_sv_undef;
1126 0           return ret;
1127             }
1128              
1129              
1130              
1131             MODULE = Net::AMQP::RabbitMQ PACKAGE = Net::AMQP::RabbitMQ PREFIX = net_amqp_rabbitmq_
1132              
1133             REQUIRE: 1.9505
1134             PROTOTYPES: DISABLE
1135              
1136             BOOT:
1137 35 50         PERL_MATH_INT64_LOAD_OR_CROAK;
1138              
1139             int
1140             net_amqp_rabbitmq_connect(conn, hostname, options, client_properties = NULL)
1141             Net::AMQP::RabbitMQ conn
1142             char *hostname
1143             HV *options
1144             HV *client_properties
1145             PREINIT:
1146             amqp_socket_t *sock;
1147 1           char *user = "guest";
1148 1           char *password = "guest";
1149 1           char *vhost = "/";
1150 1           int port = 5672;
1151 1           int channel_max = 0;
1152 1           int frame_max = 131072;
1153 1           int heartbeat = 0;
1154 1           double timeout = -1;
1155             struct timeval to;
1156              
1157 1           int ssl = 0;
1158 1           char *ssl_cacert = NULL;
1159 1           char *ssl_cert = NULL;
1160 1           char *ssl_key = NULL;
1161 1           int ssl_verify_host = 1;
1162 1           char *sasl_method = "plain";
1163 1           amqp_sasl_method_enum sasl_type = AMQP_SASL_METHOD_PLAIN;
1164 1 50         amqp_table_t client_properties_tbl = amqp_empty_table;
1165             CODE:
1166 1 50         str_from_hv(options, user);
1167 1 50         str_from_hv(options, password);
1168 1 50         str_from_hv(options, vhost);
1169 1 50         int_from_hv(options, channel_max);
1170 1 50         int_from_hv(options, frame_max);
1171 1 50         int_from_hv(options, heartbeat);
1172 1 50         int_from_hv(options, port);
1173 1 50         double_from_hv(options, timeout);
1174              
1175 1 50         int_from_hv(options, ssl);
1176 1 50         str_from_hv(options, ssl_cacert);
1177 1 50         str_from_hv(options, ssl_cert);
1178 1 50         str_from_hv(options, ssl_key);
1179 1 50         int_from_hv(options, ssl_verify_host);
1180 1 50         str_from_hv(options, sasl_method);
1181              
1182 1 50         if(client_properties)
1183             {
1184 0           hash_to_amqp_table(client_properties, &client_properties_tbl, 1);
1185             }
1186              
1187 1 50         if(timeout >= 0) {
1188 1           to.tv_sec = floor(timeout);
1189 1           to.tv_usec = 1000000.0 * (timeout - floor(timeout));
1190             }
1191              
1192 1 50         if ( ssl ) {
1193             __DEBUG__(warn("!!! SSL ENABLED !!!\n"));
1194             #ifndef NAR_HAVE_OPENSSL
1195             Perl_croak(aTHX_ "no ssl support, please install openssl and reinstall");
1196             #endif
1197 0 0         if (!hv_exists(options, "port", 4)) {
1198 0           port = 5671;
1199             }
1200 0           sock = amqp_ssl_socket_new(conn);
1201 0 0         if ( !sock ) {
1202 0           Perl_croak(aTHX_ "error creating SSL socket");
1203             }
1204              
1205 0           amqp_ssl_socket_set_verify_hostname( sock, (amqp_boolean_t)ssl_verify_host );
1206              
1207 0 0         if ( ( ssl_cacert != NULL ) && strlen(ssl_cacert) ) {
    0          
1208 0 0         if ( amqp_ssl_socket_set_cacert(sock, ssl_cacert) ) {
1209 0           Perl_croak(aTHX_ "error setting CA certificate");
1210             }
1211             }
1212              
1213 0 0         if ( ( ssl_key != NULL ) && strlen(ssl_key) && ( ssl_cert != NULL ) && strlen(ssl_cert) ) {
    0          
    0          
    0          
1214 0 0         if ( amqp_ssl_socket_set_key( sock, ssl_cert, ssl_key ) ) {
1215 0           Perl_croak(aTHX_ "error setting client cert");
1216             }
1217             }
1218             }
1219             else {
1220             __DEBUG__(
1221             warn("!!! SSL DISABLED !!!\n"));
1222 1           sock = amqp_tcp_socket_new(conn);
1223 1 50         if (!sock) {
1224 0           Perl_croak(aTHX_ "error creating TCP socket");
1225             }
1226             }
1227              
1228             /*if there's data in the buffer, clear it */
1229 1 50         while ( amqp_data_in_buffer(conn) ) {
1230             amqp_frame_t frame;
1231 0           amqp_simple_wait_frame( conn, &frame );
1232             }
1233              
1234             /* should probably be amqp_raw_equal, but this is a minimal hack */
1235 1 50         if (strcasecmp(sasl_method, "external") == 0) {
1236 0           sasl_type = AMQP_SASL_METHOD_EXTERNAL;
1237             }
1238              
1239 1 50         die_on_error(aTHX_ amqp_socket_open_noblock(sock, hostname, port, (timeout<0)?NULL:&to), conn, "opening socket");
1240 0           die_on_amqp_error(aTHX_ amqp_login_with_properties(conn, vhost, channel_max, frame_max, heartbeat, &client_properties_tbl, sasl_type, user, password), conn, "Logging in");
1241              
1242 0           maybe_release_buffers(conn);
1243              
1244 0 0         RETVAL = 1;
1245             OUTPUT:
1246             RETVAL
1247              
1248             void
1249             net_amqp_rabbitmq_channel_open(conn, channel)
1250             Net::AMQP::RabbitMQ conn
1251             int channel
1252             CODE:
1253 0 0         assert_amqp_connected(conn);
    0          
1254              
1255 0           amqp_channel_open(conn, channel);
1256 0           die_on_amqp_error(aTHX_ amqp_get_rpc_reply(conn), conn, "Opening channel");
1257              
1258             void
1259             net_amqp_rabbitmq_channel_close(conn, channel)
1260             Net::AMQP::RabbitMQ conn
1261             int channel
1262             CODE:
1263             /* If we don't have a socket, just return. */
1264 1 50         if ( ! has_valid_connection( conn ) ) {
    0          
1265 1           return;
1266             }
1267 0           die_on_amqp_error(aTHX_ amqp_channel_close(conn, channel, AMQP_REPLY_SUCCESS), conn, "Closing channel");
1268              
1269             void
1270             net_amqp_rabbitmq_exchange_declare(conn, channel, exchange, options = NULL, args = NULL)
1271             Net::AMQP::RabbitMQ conn
1272             int channel
1273             char *exchange
1274             HV *options
1275             HV *args
1276             PREINIT:
1277 0           char *exchange_type = "direct";
1278 0           int passive = 0;
1279 0           int durable = 0;
1280 0           int auto_delete = 0;
1281 0           int internal = 0;
1282 0 0         amqp_table_t arguments = amqp_empty_table;
1283             CODE:
1284 0 0         assert_amqp_connected(conn);
    0          
1285              
1286 0 0         if(options) {
1287 0 0         str_from_hv(options, exchange_type);
1288 0 0         int_from_hv(options, passive);
1289 0 0         int_from_hv(options, durable);
1290 0 0         int_from_hv(options, auto_delete);
1291 0 0         int_from_hv(options, internal);
1292             }
1293 0 0         if(args)
1294             {
1295 0           hash_to_amqp_table(args, &arguments, 1);
1296             }
1297             __DEBUG__(
1298             warn("%d: amqp_declare_exchange with channel:%d, exchange:%s, and exchange_type:%s\n",
1299             __LINE__,
1300             channel,
1301             exchange,
1302             exchange_type
1303             );
1304             dump_table(arguments);
1305             );
1306 0           amqp_exchange_declare(
1307             conn,
1308             channel,
1309             amqp_cstring_bytes(exchange),
1310             amqp_cstring_bytes(exchange_type),
1311             passive,
1312             (amqp_boolean_t)durable,
1313             (amqp_boolean_t)auto_delete,
1314             (amqp_boolean_t)internal,
1315             arguments);
1316 0           maybe_release_buffers(conn);
1317 0           die_on_amqp_error(aTHX_ amqp_get_rpc_reply(conn), conn, "Declaring exchange");
1318              
1319             void
1320             net_amqp_rabbitmq_exchange_delete(conn, channel, exchange, options = NULL)
1321             Net::AMQP::RabbitMQ conn
1322             int channel
1323             char *exchange
1324             HV *options
1325             PREINIT:
1326 0 0         int if_unused = 1;
1327             CODE:
1328 0 0         assert_amqp_connected(conn);
    0          
1329              
1330 0 0         if(options) {
1331 0 0         int_from_hv(options, if_unused);
1332             }
1333 0           amqp_exchange_delete(conn, channel, amqp_cstring_bytes(exchange), if_unused);
1334 0           die_on_amqp_error(aTHX_ amqp_get_rpc_reply(conn), conn, "Deleting exchange");
1335              
1336             void net_amqp_rabbitmq_exchange_bind(conn, channel, destination, source, routing_key, args = NULL)
1337             Net::AMQP::RabbitMQ conn
1338             int channel
1339             char *destination
1340             char *source
1341             char *routing_key
1342             HV *args
1343             PREINIT:
1344 0           amqp_exchange_bind_ok_t *reply = (amqp_exchange_bind_ok_t*)NULL;
1345 0 0         amqp_table_t arguments = amqp_empty_table;
1346             CODE:
1347             /* We must be connected */
1348 0 0         assert_amqp_connected(conn);
    0          
1349              
1350             /* Parameter validation */
1351 0 0         if( ( source == NULL || 0 == strlen(source) )
    0          
1352 0 0         ||
1353 0 0         ( destination == NULL || 0 == strlen(destination) )
1354             )
1355             {
1356 0           Perl_croak(aTHX_ "source and destination must both be specified");
1357             }
1358              
1359             /* Pull in arguments if we have any */
1360 0 0         if(args)
1361             {
1362 0           hash_to_amqp_table(args, &arguments, 1);
1363             }
1364              
1365 0           reply = amqp_exchange_bind(
1366             conn,
1367             channel,
1368             amqp_cstring_bytes(destination),
1369             amqp_cstring_bytes(source),
1370             amqp_cstring_bytes(routing_key),
1371             arguments
1372             );
1373 0           die_on_amqp_error(aTHX_ amqp_get_rpc_reply(conn), conn, "Binding Exchange");
1374              
1375             void net_amqp_rabbitmq_exchange_unbind(conn, channel, destination, source, routing_key, args = NULL)
1376             Net::AMQP::RabbitMQ conn
1377             int channel
1378             char *destination
1379             char *source
1380             char *routing_key
1381             HV *args
1382             PREINIT:
1383 0           amqp_exchange_unbind_ok_t *reply = (amqp_exchange_unbind_ok_t*)NULL;
1384 0 0         amqp_table_t arguments = amqp_empty_table;
1385             CODE:
1386             /* We must be connected */
1387 0 0         assert_amqp_connected(conn);
    0          
1388              
1389             /* Parameter validation */
1390 0 0         if( ( source == NULL || 0 == strlen(source) )
    0          
1391 0 0         ||
1392 0 0         ( destination == NULL || 0 == strlen(destination) )
1393             )
1394             {
1395 0           Perl_croak(aTHX_ "source and destination must both be specified");
1396             }
1397              
1398             /* Pull in arguments if we have any */
1399 0 0         if(args)
1400             {
1401 0           hash_to_amqp_table(args, &arguments, 1);
1402             }
1403              
1404 0           reply = amqp_exchange_unbind(
1405             conn,
1406             channel,
1407             amqp_cstring_bytes(destination),
1408             amqp_cstring_bytes(source),
1409             amqp_cstring_bytes(routing_key),
1410             arguments
1411             );
1412 0           die_on_amqp_error(aTHX_ amqp_get_rpc_reply(conn), conn, "Unbinding Exchange");
1413              
1414             void net_amqp_rabbitmq_queue_delete(conn, channel, queuename, options = NULL)
1415             Net::AMQP::RabbitMQ conn
1416             int channel
1417             char *queuename
1418             HV *options
1419             PREINIT:
1420 0           int if_unused = 1;
1421 0           int if_empty = 1;
1422 0 0         amqp_queue_delete_ok_t *reply = (amqp_queue_delete_ok_t*)NULL;
1423             CODE:
1424 0 0         assert_amqp_connected(conn);
    0          
1425              
1426 0 0         if(options) {
1427 0 0         int_from_hv(options, if_unused);
1428 0 0         int_from_hv(options, if_empty);
1429             }
1430 0           reply = amqp_queue_delete(
1431             conn,
1432             channel,
1433             amqp_cstring_bytes(queuename),
1434             if_unused,
1435             if_empty
1436             );
1437 0 0         if (reply == NULL) {
1438 0           die_on_amqp_error(aTHX_ amqp_get_rpc_reply(conn), conn, "Deleting queue");
1439             }
1440 0 0         XPUSHs(sv_2mortal(newSVuv(reply->message_count)));
1441              
1442             void
1443             net_amqp_rabbitmq_queue_declare(conn, channel, queuename, options = NULL, args = NULL)
1444             Net::AMQP::RabbitMQ conn
1445             int channel
1446             char *queuename
1447             HV *options
1448             HV *args
1449             PREINIT:
1450 0           int passive = 0;
1451 0           int durable = 0;
1452 0           int exclusive = 0;
1453 0           int auto_delete = 1;
1454 0           amqp_table_t arguments = amqp_empty_table;
1455 0           amqp_bytes_t queuename_b = amqp_empty_bytes;
1456 0 0         amqp_queue_declare_ok_t *r = (amqp_queue_declare_ok_t*)NULL;
1457             PPCODE:
1458 0 0         assert_amqp_connected(conn);
    0          
1459              
1460 0 0         if(queuename && strcmp(queuename, "")) queuename_b = amqp_cstring_bytes(queuename);
    0          
1461 0 0         if(options) {
1462 0 0         int_from_hv(options, passive);
1463 0 0         int_from_hv(options, durable);
1464 0 0         int_from_hv(options, exclusive);
1465 0 0         int_from_hv(options, auto_delete);
1466             }
1467 0 0         if(args)
1468             {
1469 0           hash_to_amqp_table(args, &arguments, 1);
1470             }
1471 0           r = amqp_queue_declare(conn, channel, queuename_b, passive,
1472             durable, exclusive, auto_delete,
1473             arguments);
1474 0           die_on_amqp_error(aTHX_ amqp_get_rpc_reply(conn), conn, "Declaring queue");
1475 0 0         XPUSHs(sv_2mortal(newSVpvn(r->queue.bytes, r->queue.len)));
1476 0 0         if(GIMME_V == G_LIST) {
1477 0 0         XPUSHs(sv_2mortal(newSVuv(r->message_count)));
1478 0 0         XPUSHs(sv_2mortal(newSVuv(r->consumer_count)));
1479             }
1480              
1481             void
1482             net_amqp_rabbitmq_queue_bind(conn, channel, queuename, exchange, bindingkey, args = NULL)
1483             Net::AMQP::RabbitMQ conn
1484             int channel
1485             char *queuename
1486             char *exchange
1487             char *bindingkey
1488             HV *args
1489             PREINIT:
1490 0 0         amqp_table_t arguments = amqp_empty_table;
1491             CODE:
1492 0 0         assert_amqp_connected(conn);
    0          
1493              
1494 0 0         if(queuename == NULL
1495 0 0         ||
1496             exchange == NULL
1497 0           ||
1498 0 0         0 == strlen(queuename)
1499 0           ||
1500 0 0         0 == strlen(exchange)
1501             )
1502             {
1503 0           Perl_croak(aTHX_ "queuename and exchange must both be specified");
1504             }
1505              
1506 0 0         if(args)
1507 0           hash_to_amqp_table(args, &arguments, 0);
1508 0           amqp_queue_bind(conn, channel, amqp_cstring_bytes(queuename),
1509             amqp_cstring_bytes(exchange),
1510             amqp_cstring_bytes(bindingkey),
1511             arguments);
1512 0           maybe_release_buffers(conn);
1513 0           die_on_amqp_error(aTHX_ amqp_get_rpc_reply(conn), conn, "Binding queue");
1514              
1515             void
1516             net_amqp_rabbitmq_queue_unbind(conn, channel, queuename, exchange, bindingkey, args = NULL)
1517             Net::AMQP::RabbitMQ conn
1518             int channel
1519             char *queuename
1520             char *exchange
1521             char *bindingkey
1522             HV *args
1523             PREINIT:
1524 0 0         amqp_table_t arguments = amqp_empty_table;
1525             CODE:
1526 0 0         assert_amqp_connected(conn);
    0          
1527              
1528 0 0         if(queuename == NULL || exchange == NULL)
    0          
1529             {
1530 0           Perl_croak(aTHX_ "queuename and exchange must both be specified");
1531             }
1532              
1533 0 0         if(args)
1534             {
1535 0           hash_to_amqp_table(args, &arguments, 0);
1536             }
1537 0           amqp_queue_unbind(conn, channel, amqp_cstring_bytes(queuename),
1538             amqp_cstring_bytes(exchange),
1539             amqp_cstring_bytes(bindingkey),
1540             arguments);
1541 0           maybe_release_buffers(conn);
1542 0           die_on_amqp_error(aTHX_ amqp_get_rpc_reply(conn), conn, "Unbinding queue");
1543              
1544             SV *
1545             net_amqp_rabbitmq_consume(conn, channel, queuename, options = NULL)
1546             Net::AMQP::RabbitMQ conn
1547             int channel
1548             char *queuename
1549             HV *options
1550             PREINIT:
1551             amqp_basic_consume_ok_t *r;
1552 0           char *consumer_tag = NULL;
1553 0           int no_local = 0;
1554 0           int no_ack = 1;
1555 0 0         int exclusive = 0;
1556             CODE:
1557 0 0         assert_amqp_connected(conn);
    0          
1558              
1559 0 0         if(options) {
1560 0 0         str_from_hv(options, consumer_tag);
1561 0 0         int_from_hv(options, no_local);
1562 0 0         int_from_hv(options, no_ack);
1563 0 0         int_from_hv(options, exclusive);
1564             }
1565 0 0         r = amqp_basic_consume(conn, channel, amqp_cstring_bytes(queuename),
1566 0           consumer_tag ? amqp_cstring_bytes(consumer_tag) : amqp_empty_bytes,
1567             no_local, no_ack, exclusive, amqp_empty_table);
1568 0           die_on_amqp_error(aTHX_ amqp_get_rpc_reply(conn), conn, "Consume queue");
1569 0           RETVAL = newSVpvn(r->consumer_tag.bytes, r->consumer_tag.len);
1570             OUTPUT:
1571             RETVAL
1572              
1573             int
1574             net_amqp_rabbitmq_cancel(conn, channel, consumer_tag)
1575             Net::AMQP::RabbitMQ conn
1576             int channel
1577             char *consumer_tag
1578             PREINIT:
1579             amqp_basic_cancel_ok_t *r;
1580             CODE:
1581 0 0         assert_amqp_connected(conn);
    0          
1582              
1583 0           r = amqp_basic_cancel(conn, channel, amqp_cstring_bytes(consumer_tag));
1584 0           die_on_amqp_error(aTHX_ amqp_get_rpc_reply(conn), conn, "cancel");
1585              
1586 0 0         if ( r == NULL ) {
1587 0           RETVAL = 0;
1588             }
1589             else {
1590 0 0         if(strlen(consumer_tag) == r->consumer_tag.len && 0 == strcmp(consumer_tag, (char *)r->consumer_tag.bytes)) {
    0          
1591 0           RETVAL = 1;
1592             } else {
1593 0           RETVAL = 0;
1594             }
1595             }
1596             OUTPUT:
1597             RETVAL
1598              
1599             SV *
1600             net_amqp_rabbitmq_recv(conn, timeout = 0)
1601             Net::AMQP::RabbitMQ conn
1602             int timeout
1603             PREINIT:
1604             SV *envelope;
1605             amqp_rpc_reply_t ret;
1606             struct timeval timeout_tv;
1607             CODE:
1608 0 0         assert_amqp_connected(conn);
    0          
1609             __DEBUG__(fprintf(stderr, "RECV()!\n"););
1610              
1611 0 0         if (timeout > 0) {
1612 0           timeout_tv.tv_sec = timeout / 1000;
1613 0           timeout_tv.tv_usec = (timeout % 1000) * 1000;
1614             }
1615              
1616             /* Set the waiting time to 0 */
1617 0 0         if (timeout == -1) {
1618 0           timeout_tv.tv_sec = 0;
1619 0           timeout_tv.tv_usec = 0;
1620             }
1621              
1622 0           maybe_release_buffers(conn);
1623 0 0         ret = consume_message(conn, &RETVAL, timeout ? &timeout_tv : NULL);
1624 0 0         if (AMQP_RESPONSE_LIBRARY_EXCEPTION != ret.reply_type || AMQP_STATUS_TIMEOUT != ret.library_error)
    0          
1625 0           die_on_amqp_error(aTHX_ ret, conn, "recv");
1626              
1627             OUTPUT:
1628             RETVAL
1629              
1630             void
1631             net_amqp_rabbitmq_ack(conn, channel, delivery_tag, multiple = 0)
1632             Net::AMQP::RabbitMQ conn
1633             int channel
1634             uint64_t delivery_tag
1635             int multiple
1636             CODE:
1637 0 0         assert_amqp_connected(conn);
    0          
1638              
1639 0           die_on_error(aTHX_ amqp_basic_ack(conn, channel, delivery_tag, multiple), conn,
1640             "ack");
1641              
1642             void
1643             net_amqp_rabbitmq_nack(conn, channel, delivery_tag, multiple = 0, requeue = 0)
1644             Net::AMQP::RabbitMQ conn
1645             int channel
1646             uint64_t delivery_tag
1647             int multiple
1648             int requeue
1649             CODE:
1650 0 0         assert_amqp_connected(conn);
    0          
1651              
1652 0           die_on_error(
1653             aTHX_ amqp_basic_nack(
1654             conn,
1655             channel,
1656             delivery_tag,
1657             (amqp_boolean_t)multiple,
1658             (amqp_boolean_t)requeue
1659             ),
1660             conn,
1661             "nack"
1662             );
1663              
1664             void
1665             net_amqp_rabbitmq_reject(conn, channel, delivery_tag, requeue = 0)
1666             Net::AMQP::RabbitMQ conn
1667             int channel
1668             uint64_t delivery_tag
1669             int requeue
1670             CODE:
1671 0 0         assert_amqp_connected(conn);
    0          
1672              
1673 0           die_on_error(
1674             aTHX_ amqp_basic_reject(
1675             conn,
1676             channel,
1677             delivery_tag,
1678             requeue
1679             ),
1680             conn,
1681             "reject"
1682             );
1683              
1684              
1685             void
1686             net_amqp_rabbitmq_purge(conn, channel, queuename)
1687             Net::AMQP::RabbitMQ conn
1688             int channel
1689             char *queuename
1690             CODE:
1691 0 0         assert_amqp_connected(conn);
    0          
1692              
1693 0           amqp_queue_purge(conn, channel, amqp_cstring_bytes(queuename));
1694 0           die_on_amqp_error(aTHX_ amqp_get_rpc_reply(conn), conn, "Purging queue");
1695              
1696             void
1697             net_amqp_rabbitmq__publish(conn, channel, routing_key, body, options = NULL, props = NULL)
1698             Net::AMQP::RabbitMQ conn
1699             int channel
1700             HV *options;
1701             char *routing_key
1702             SV *body
1703             HV *props
1704             PREINIT:
1705             SV **v;
1706 0           char *exchange = "amq.direct";
1707 0           amqp_boolean_t mandatory = 0;
1708 0           amqp_boolean_t immediate = 0;
1709             int rv;
1710 0           amqp_bytes_t exchange_b = { 0 };
1711             amqp_bytes_t routing_key_b;
1712             amqp_bytes_t body_b;
1713             struct amqp_basic_properties_t_ properties;
1714             STRLEN len;
1715 0 0         int force_utf8_in_header_strings = 0;
1716             CODE:
1717 0 0         assert_amqp_connected(conn);
    0          
1718              
1719 0           routing_key_b = amqp_cstring_bytes(routing_key);
1720 0           body_b.bytes = SvPV(body, len);
1721 0           body_b.len = len;
1722 0 0         if(options) {
1723 0 0         if(NULL != (v = hv_fetchs(options, "mandatory", 0))) {
1724 0           mandatory = SvIV(*v) ? 1 : 0;
1725             }
1726 0 0         if(NULL != (v = hv_fetchs(options, "immediate", 0))) {
1727 0           immediate = SvIV(*v) ? 1 : 0;
1728             }
1729 0 0         if(NULL != (v = hv_fetchs(options, "exchange", 0))) {
1730 0           exchange_b = amqp_cstring_bytes(SvPV_nolen(*v));
1731             }
1732              
1733             /* This is an internal option, only for determining if we want to force utf8 */
1734 0 0         int_from_hv(options, force_utf8_in_header_strings);
1735             }
1736 0           properties.headers = amqp_empty_table;
1737 0           properties._flags = 0;
1738 0 0         if (props) {
1739 0 0         if (NULL != (v = hv_fetchs(props, "content_type", 0))) {
1740 0           properties.content_type = amqp_cstring_bytes(SvPV_nolen(*v));
1741 0           properties._flags |= AMQP_BASIC_CONTENT_TYPE_FLAG;
1742             }
1743 0 0         if (NULL != (v = hv_fetchs(props, "content_encoding", 0))) {
1744 0           properties.content_encoding = amqp_cstring_bytes(SvPV_nolen(*v));
1745 0           properties._flags |= AMQP_BASIC_CONTENT_ENCODING_FLAG;
1746             }
1747 0 0         if (NULL != (v = hv_fetchs(props, "correlation_id", 0))) {
1748 0           properties.correlation_id = amqp_cstring_bytes(SvPV_nolen(*v));
1749 0           properties._flags |= AMQP_BASIC_CORRELATION_ID_FLAG;
1750             }
1751 0 0         if (NULL != (v = hv_fetchs(props, "reply_to", 0))) {
1752 0           properties.reply_to = amqp_cstring_bytes(SvPV_nolen(*v));
1753 0           properties._flags |= AMQP_BASIC_REPLY_TO_FLAG;
1754             }
1755 0 0         if (NULL != (v = hv_fetchs(props, "expiration", 0))) {
1756 0           properties.expiration = amqp_cstring_bytes(SvPV_nolen(*v));
1757 0           properties._flags |= AMQP_BASIC_EXPIRATION_FLAG;
1758             }
1759 0 0         if (NULL != (v = hv_fetchs(props, "message_id", 0))) {
1760 0           properties.message_id = amqp_cstring_bytes(SvPV_nolen(*v));
1761 0           properties._flags |= AMQP_BASIC_MESSAGE_ID_FLAG;
1762             }
1763 0 0         if (NULL != (v = hv_fetchs(props, "type", 0))) {
1764 0           properties.type = amqp_cstring_bytes(SvPV_nolen(*v));
1765 0           properties._flags |= AMQP_BASIC_TYPE_FLAG;
1766             }
1767 0 0         if (NULL != (v = hv_fetchs(props, "user_id", 0))) {
1768 0           properties.user_id = amqp_cstring_bytes(SvPV_nolen(*v));
1769 0           properties._flags |= AMQP_BASIC_USER_ID_FLAG;
1770             }
1771 0 0         if (NULL != (v = hv_fetchs(props, "app_id", 0))) {
1772 0           properties.app_id = amqp_cstring_bytes(SvPV_nolen(*v));
1773 0           properties._flags |= AMQP_BASIC_APP_ID_FLAG;
1774             }
1775 0 0         if (NULL != (v = hv_fetchs(props, "delivery_mode", 0))) {
1776 0           properties.delivery_mode = (uint8_t) SvIV(*v);
1777 0           properties._flags |= AMQP_BASIC_DELIVERY_MODE_FLAG;
1778             }
1779 0 0         if (NULL != (v = hv_fetchs(props, "priority", 0))) {
1780 0           properties.priority = (uint8_t) SvIV(*v);
1781 0           properties._flags |= AMQP_BASIC_PRIORITY_FLAG;
1782             }
1783 0 0         if (NULL != (v = hv_fetchs(props, "timestamp", 0))) {
1784 0           properties.timestamp = (uint64_t) SvI64(*v);
1785 0           properties._flags |= AMQP_BASIC_TIMESTAMP_FLAG;
1786             }
1787 0 0         if (NULL != (v = hv_fetchs(props, "headers", 0)) && SvOK(*v)) {
    0          
1788 0           hash_to_amqp_table(MUTABLE_HV(SvRV(*v)), &properties.headers, force_utf8_in_header_strings);
1789 0           properties._flags |= AMQP_BASIC_HEADERS_FLAG;
1790             }
1791             }
1792             __DEBUG__( warn("PUBLISHING HEADERS..."); dump_table( properties.headers ) );
1793 0           rv = amqp_basic_publish(conn, channel, exchange_b, routing_key_b, mandatory, immediate, &properties, body_b);
1794 0           maybe_release_buffers(conn);
1795              
1796             /* If the connection failed, blast the file descriptor! */
1797 0 0         if ( rv == AMQP_STATUS_CONNECTION_CLOSED || rv == AMQP_STATUS_SOCKET_ERROR ) {
    0          
1798 0           amqp_socket_close( amqp_get_socket( conn ), AMQP_SC_FORCE );
1799 0           Perl_croak(aTHX_ "Publish failed because AMQP socket connection was closed.");
1800             }
1801              
1802             /* Otherwise, just croak */
1803 0 0         if ( rv != AMQP_STATUS_OK ) {
1804 0           Perl_croak( aTHX_ "Publish failed, %s\n", amqp_error_string2(rv));
1805             }
1806              
1807             SV *
1808             net_amqp_rabbitmq_get(conn, channel, queuename, options = NULL)
1809             Net::AMQP::RabbitMQ conn
1810             int channel
1811             char *queuename
1812             HV *options
1813             PREINIT:
1814 0 0         int no_ack = 1;
1815             CODE:
1816 0 0         assert_amqp_connected(conn);
    0          
1817              
1818 0 0         if (options)
1819 0 0         int_from_hv(options, no_ack);
1820              
1821 0           maybe_release_buffers(conn);
1822 0 0         die_on_amqp_error(aTHX_ basic_get(conn, channel, queuename ? amqp_cstring_bytes(queuename) : amqp_empty_bytes, &RETVAL, no_ack), conn, "basic_get");
1823              
1824             OUTPUT:
1825             RETVAL
1826              
1827             int
1828             net_amqp_rabbitmq_get_channel_max(conn)
1829             Net::AMQP::RabbitMQ conn
1830             CODE:
1831 0           RETVAL = amqp_get_channel_max(conn);
1832             OUTPUT:
1833             RETVAL
1834              
1835             SV*
1836             net_amqp_rabbitmq_get_sockfd(conn)
1837             Net::AMQP::RabbitMQ conn
1838             CODE:
1839             /**
1840             * this is the warning from librabbitmq-c. you have been warned.
1841             *
1842             * \warning Use the socket returned from this function carefully, incorrect use
1843             * of the socket outside of the library will lead to undefined behavior.
1844             * Additionally rabbitmq-c may use the socket differently version-to-version,
1845             * what may work in one version, may break in the next version. Be sure to
1846             * throughly test any applications that use the socket returned by this
1847             * function especially when using a newer version of rabbitmq-c
1848             *
1849             */
1850 0 0         if ( has_valid_connection( conn ) ) {
    0          
1851 0           RETVAL = newSViv( amqp_get_sockfd(conn) );
1852             }
1853             else {
1854             /* We don't have a connection, we're still here. */
1855 0           RETVAL = &PL_sv_undef;
1856             }
1857             OUTPUT:
1858             RETVAL
1859              
1860             SV*
1861             net_amqp_rabbitmq_is_connected(conn)
1862             Net::AMQP::RabbitMQ conn
1863             CODE:
1864 0 0         if ( has_valid_connection( conn ) ) {
    0          
1865 0           RETVAL = newSViv(1);
1866             }
1867             else {
1868             /* We don't have a connection, we're still here. */
1869 0           RETVAL = &PL_sv_undef;
1870             }
1871             OUTPUT:
1872             RETVAL
1873              
1874             void
1875             net_amqp_rabbitmq_disconnect(conn)
1876             Net::AMQP::RabbitMQ conn
1877             PREINIT:
1878             int sockfd;
1879             CODE:
1880 0 0         if ( amqp_get_socket(conn) != NULL ) {
1881 0           amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
1882 0           amqp_socket_close( amqp_get_socket( conn ), AMQP_SC_NONE );
1883             }
1884              
1885             Net::AMQP::RabbitMQ
1886             net_amqp_rabbitmq__new(clazz)
1887             char *clazz
1888             CODE:
1889 33           RETVAL = amqp_new_connection();
1890             OUTPUT:
1891             RETVAL
1892              
1893             void
1894             net_amqp_rabbitmq__destroy_connection_close(conn)
1895             Net::AMQP::RabbitMQ conn
1896             CODE:
1897 33 100         if ( amqp_get_socket(conn) != NULL ) {
1898 1           amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
1899             }
1900              
1901             void
1902             net_amqp_rabbitmq__destroy_cleanup(conn)
1903             Net::AMQP::RabbitMQ conn
1904             CODE:
1905 33           empty_amqp_pool( &temp_memory_pool );
1906 33           amqp_destroy_connection(conn);
1907              
1908             void
1909             net_amqp_rabbitmq_heartbeat(conn)
1910             Net::AMQP::RabbitMQ conn
1911             PREINIT:
1912             amqp_frame_t f;
1913             CODE:
1914 0           f.frame_type = AMQP_FRAME_HEARTBEAT;
1915 0           f.channel = 0;
1916             __DEBUG__(fprintf(stderr, "HEARTBEAT!\n"););
1917 0           amqp_send_frame(conn, &f);
1918              
1919             void
1920             net_amqp_rabbitmq_tx_select(conn, channel, args = NULL)
1921             Net::AMQP::RabbitMQ conn
1922             int channel
1923             HV *args
1924             CODE:
1925 0           amqp_tx_select(conn, channel);
1926 0           die_on_amqp_error(aTHX_ amqp_get_rpc_reply(conn), conn, "Selecting transaction");
1927              
1928             void
1929             net_amqp_rabbitmq_tx_commit(conn, channel, args = NULL)
1930             Net::AMQP::RabbitMQ conn
1931             int channel
1932             HV *args
1933             CODE:
1934 0           amqp_tx_commit(conn, channel);
1935 0           maybe_release_buffers(conn);
1936 0           die_on_amqp_error(aTHX_ amqp_get_rpc_reply(conn), conn, "Commiting transaction");
1937              
1938             void
1939             net_amqp_rabbitmq_tx_rollback(conn, channel, args = NULL)
1940             Net::AMQP::RabbitMQ conn
1941             int channel
1942             HV *args
1943             CODE:
1944 0           amqp_tx_rollback(conn, channel);
1945 0           die_on_amqp_error(aTHX_ amqp_get_rpc_reply(conn), conn, "Rolling Back transaction");
1946              
1947             SV* net_amqp_rabbitmq_get_rpc_timeout(conn)
1948             Net::AMQP::RabbitMQ conn
1949             PREINIT:
1950             struct timeval *timeout_tv;
1951             HV *output;
1952             CODE:
1953 0           timeout_tv = amqp_get_rpc_timeout(conn);
1954 0 0         if (timeout_tv == NULL) {
1955             __DEBUG__( warn("%d get_rpc_timeout: Timeout is NULL, returning undef.", __LINE__) );
1956 0           RETVAL = &PL_sv_undef;
1957             } else {
1958             __DEBUG__( warn("%d get_rpc_timeout: Timeout is non-NULL, returning hashref.", __LINE__) );
1959 0           output = newHV();
1960 0           hv_stores(output, "tv_sec", newSVi64( timeout_tv->tv_sec ));
1961 0           hv_stores(output, "tv_usec", newSVi64( timeout_tv->tv_usec ));
1962 0           RETVAL = newRV_noinc((SV*) output );
1963             }
1964             OUTPUT:
1965             RETVAL
1966              
1967             void net_amqp_rabbitmq__set_rpc_timeout(conn, args = NULL)
1968             Net::AMQP::RabbitMQ conn
1969             SV* args
1970             PREINIT:
1971 0           struct timeval timeout = {0,0};
1972 0           struct timeval *old_timeout = NULL;
1973 0           int tv_sec = 0;
1974 0           int tv_usec = 0;
1975 0 0         int res = 0;
1976             CODE:
1977 0           old_timeout = amqp_get_rpc_timeout(conn);
1978              
1979             /* If we are setting the RPC timeout to NULL... */
1980 0 0         if (args == NULL || !SvOK(args) || args == &PL_sv_undef) {
    0          
    0          
1981             __DEBUG__( warn("%d set_rpc_timeout: No args. Setting to unlimited RPC timeout.", __LINE__) );
1982              
1983 0 0         if (old_timeout != NULL) {
1984             __DEBUG__( warn("%d set_rpc_timeout: Changing to unlimited RPC timeout.", __LINE__) );
1985 0           amqp_set_rpc_timeout( conn, NULL );
1986             }
1987             }
1988              
1989             /* If we are setting the RPC timeout to something other than NULL... */
1990             else {
1991 0 0         int_from_hv((HV*)SvRV(args), tv_sec);
1992 0 0         int_from_hv((HV*)SvRV(args), tv_usec);
1993             __DEBUG__( warn("%d set_rpc_timeout: Setting to tv_sec:%d and tv_usec:%d.", __LINE__, tv_sec, tv_usec) );
1994             /* If we need to allocate the timeout... */
1995              
1996 0           timeout.tv_sec = tv_sec;
1997 0           timeout.tv_usec = tv_usec;
1998 0           die_on_error(aTHX_ amqp_set_rpc_timeout(conn, &timeout), conn, "Set RPC Timeout");
1999             }
2000              
2001             void
2002             net_amqp_rabbitmq_basic_qos(conn, channel, args = NULL)
2003             Net::AMQP::RabbitMQ conn
2004             int channel
2005             HV *args
2006             PREINIT:
2007             SV **v;
2008 0           uint32_t prefetch_size = 0;
2009 0           uint16_t prefetch_count = 0;
2010 0 0         amqp_boolean_t global = 0;
2011             CODE:
2012 0 0         if(args) {
2013 0 0         if(NULL != (v = hv_fetchs(args, "prefetch_size", 0))) prefetch_size = SvIV(*v);
2014 0 0         if(NULL != (v = hv_fetchs(args, "prefetch_count", 0))) prefetch_count = SvIV(*v);
2015 0 0         if(NULL != (v = hv_fetchs(args, "global", 0))) global = SvIV(*v) ? 1 : 0;
2016             }
2017 0           amqp_basic_qos(conn, channel,
2018             prefetch_size, prefetch_count, global);
2019 0           die_on_amqp_error(aTHX_ amqp_get_rpc_reply(conn), conn, "Basic QoS");
2020              
2021             void
2022             net_amqp_rabbitmq_confirm_select(conn, channel)
2023             Net::AMQP::RabbitMQ conn
2024             int channel
2025             CODE:
2026 0           amqp_confirm_select(conn, channel);
2027 0           die_on_amqp_error(aTHX_ amqp_get_rpc_reply(conn),
2028             conn,
2029             "Confirm Select");
2030              
2031             SV* net_amqp_rabbitmq_publisher_confirm_wait(conn, timeout)
2032             Net::AMQP::RabbitMQ conn
2033             int timeout
2034              
2035             PREINIT:
2036              
2037 0           HV *output = (HV*)NULL;
2038 0 0         struct timeval timeout_tv = {0,0};
2039             amqp_publisher_confirm_t result;
2040             amqp_rpc_reply_t ret;
2041              
2042             CODE:
2043              
2044 0 0         if (timeout > 0) {
2045 0           timeout_tv.tv_sec = timeout / 1000;
2046 0           timeout_tv.tv_usec = (timeout % 1000) * 1000;
2047 0 0         } else if (timeout < 0) {
2048 0           timeout_tv.tv_sec = 0;
2049 0           timeout_tv.tv_usec = 0;
2050             }
2051 0 0         ret = amqp_publisher_confirm_wait(
2052             conn,
2053             timeout ? &timeout_tv : (struct timeval*)NULL,
2054             &result
2055             );
2056              
2057             // This condition is for when there's no method received in
2058             // the allotted time.
2059 0 0         if (AMQP_RESPONSE_LIBRARY_EXCEPTION == ret.reply_type &&
2060 0 0         AMQP_STATUS_TIMEOUT == ret.library_error) {
2061 0           XSRETURN_UNDEF;
2062             } else {
2063 0           die_on_amqp_error(aTHX_ ret, conn, "Publisher Confirm Wait");
2064             }
2065              
2066             // Allocate our output hash
2067 0           output = (HV*)newHV();
2068 0           hv_stores(output, "channel", newSViv(result.channel));
2069 0           switch (result.method) {
2070 0           case AMQP_BASIC_ACK_METHOD:
2071             __DEBUG__(warn("AMQP_BASIC_ACK_METHOD received (%ld).", result.payload.ack.delivery_tag));
2072 0           hv_stores(output,
2073             "method",
2074             newSVpvs("basic.ack"));
2075 0           hv_stores(output,
2076             "delivery_tag",
2077             newSVu64(result.payload.ack.delivery_tag));
2078 0           hv_stores(output,
2079             "multiple",
2080             newSViv(result.payload.ack.multiple));
2081 0           break;
2082              
2083 0           case AMQP_BASIC_NACK_METHOD:
2084             __DEBUG__(warn("AMQP_BASIC_NACK_METHOD received (%ld).", result.payload.nack.delivery_tag));
2085 0           hv_stores(output,
2086             "method",
2087             newSVpvs("basic.nack"));
2088 0           hv_stores(output,
2089             "delivery_tag",
2090             newSVu64(result.payload.nack.delivery_tag));
2091 0           hv_stores(output,
2092             "multiple",
2093             newSViv(result.payload.nack.multiple));
2094 0           hv_stores(output,
2095             "requeue",
2096             newSViv(result.payload.nack.requeue));
2097 0           break;
2098              
2099 0           case AMQP_BASIC_REJECT_METHOD:
2100             __DEBUG__(warn("AMQP_BASIC_REJECT_METHOD received (%ld).", result.payload.reject.delivery_tag));
2101 0           hv_stores(output,
2102             "method",
2103             newSVpvs("basic.reject"));
2104 0           hv_stores(output,
2105             "delivery_tag",
2106             newSViv(result.payload.reject.delivery_tag));
2107 0           hv_stores(output,
2108             "requeue",
2109             newSViv(result.payload.reject.requeue));
2110 0           break;
2111 0           default:
2112 0           Perl_croak(aTHX_ "Unexpected method received waiting for publisher confirm: %s", amqp_method_name(result.method));
2113             };
2114              
2115             // Make our hashref
2116 0           RETVAL = newRV_noinc(MUTABLE_SV(output));
2117             OUTPUT:
2118             RETVAL
2119              
2120             SV* net_amqp_rabbitmq_get_server_properties(conn)
2121             Net::AMQP::RabbitMQ conn
2122             PREINIT:
2123             amqp_table_t* server_properties;
2124             CODE:
2125 0 0         assert_amqp_connected(conn);
    0          
2126 0           server_properties = amqp_get_server_properties(conn);
2127 0 0         if ( server_properties )
2128             {
2129 0           RETVAL = mq_table_to_hashref(server_properties);
2130             }
2131             else
2132             {
2133 0           RETVAL = &PL_sv_undef;
2134             }
2135             OUTPUT:
2136             RETVAL
2137              
2138             SV* net_amqp_rabbitmq_get_client_properties(conn)
2139             Net::AMQP::RabbitMQ conn
2140             PREINIT:
2141             amqp_table_t* client_properties;
2142             CODE:
2143 0 0         assert_amqp_connected(conn);
    0          
2144 0           client_properties = amqp_get_client_properties(conn);
2145 0 0         if ( client_properties )
2146             {
2147 0           RETVAL = mq_table_to_hashref(client_properties);
2148             }
2149             else
2150             {
2151 0           RETVAL = &PL_sv_undef;
2152             }
2153             OUTPUT:
2154             RETVAL
2155              
2156             SV* net_amqp_rabbitmq_has_ssl()
2157             CODE:
2158             #ifdef NAR_HAVE_OPENSSL
2159 1           RETVAL = &PL_sv_yes;
2160             #else
2161             RETVAL = &PL_sv_no;
2162             #endif
2163             OUTPUT:
2164             RETVAL