File Coverage

blib/lib/Net/AMQP/RabbitMQ.pm
Criterion Covered Total %
statement 20 35 57.1
branch 2 12 16.6
condition 2 13 15.3
subroutine 6 8 75.0
pod 3 3 100.0
total 33 71 46.4


line stmt bran cond sub pod time code
1             package Net::AMQP::RabbitMQ;
2 35     35   4424600 use strict;
  35         84  
  35         1472  
3 35     35   393 use warnings;
  35         103  
  35         2649  
4              
5             our $VERSION = '2.40014';
6              
7 35     35   261 use XSLoader;
  35         77  
  35         1769  
8             XSLoader::load "Net::AMQP::RabbitMQ", $VERSION;
9 35     35   197 use Scalar::Util qw(blessed);
  35         137  
  35         27358  
10              
11             # Hash::FieldHash and/or Hash::Util's fieldhash may or may not be available
12             #
13             # On versions earlier than 5.9.4, Hash::Util::FieldHash didn't exist and
14             # hence Hash::Util::fieldhash didn't either.
15             #
16             # We need one of them to fix #151: REQUEST: optionally leave connection alive
17             # in net_amqp_rabbitmq_DESTROY to allow forking
18             #
19             # If neither is found #151 will remain unfixed
20             my $have_fieldhash = eval {
21             require Hash::FieldHash;
22             Hash::FieldHash->import('all');
23             1;
24             } || eval {
25             require Hash::Util;
26             Hash::Util->import('fieldhash');
27             1;
28             };
29              
30             =encoding UTF-8
31              
32             =head1 NAME
33              
34             Net::AMQP::RabbitMQ - interact with RabbitMQ over AMQP using librabbitmq
35              
36             =head1 SYNOPSIS
37              
38             use Net::AMQP::RabbitMQ;
39             my $mq = Net::AMQP::RabbitMQ->new();
40             $mq->connect("localhost", { user => "guest", password => "guest" });
41             $mq->channel_open(1);
42             $mq->queue_declare(1, "queuename");
43             $mq->publish(1, "queuename", "Hi there!");
44             my $gotten = $mq->get(1, "queuename");
45             print $gotten->{body} . "\n";
46             $mq->disconnect();
47              
48             =head1 DESCRIPTION
49              
50             C<Net::AMQP::RabbitMQ> provides a simple wrapper around the librabbitmq library
51             that allows connecting, declaring exchanges and queues, binding and unbinding
52             queues, publishing, consuming and receiving events.
53              
54             Error handling in this module is primarily achieved by C<croak> (die). You
55             should be making good use of C<eval> around these methods to ensure that you
56             appropriately catch the errors.
57              
58             =head1 INSTALLATION
59              
60             C or C
61              
62             Note that the C<Net::AMQP::RabbitMQ> module includes the associated librabbitmq
63             C library. Thus there is no need to install this separately beforehand.
64              
65             =head1 METHODS
66              
67             All methods, unless specifically stated, return nothing on success
68             and die on failure.
69              
70             Failure to be connected is a fatal failure for most methods.
71              
72             =head2 new()
73              
74             Creates a new Net::AMQP::RabbitMQ object.
75              
76             =head2 connect( $hostname, $options )
77              
78             Connect to RabbitMQ server.
79              
80             C<$hostname> is the host to which a connection will be attempted.
81              
82             C<$options> is an optional hash respecting the following keys:
83              
84             {
85             user => $user, #default 'guest'
86             password => $password, #default 'guest'
87             port => $port, #default 5672
88             vhost => $vhost, #default '/'
89             channel_max => $cmax, #default 0
90             frame_max => $fmax, #default 131072
91             heartbeat => $heartbeat, #default 0
92             timeout => $seconds, #default undef (no timeout)
93              
94             ssl => 1 | 0, #default 0
95             ssl_verify_host => 1 | 0, #default 1
96             ssl_cacert => $cacert_path, #needed for ssl
97            
98             ssl_cert => $cert_path, #client cert.pem and key.pem when using ssl certificate chains
99             ssl_key => $key_path #(with RabbitMQ's fail_if_no_peer_cert = true)
100             }
101              
102             For now there is no option to disable ssl peer checking, meaning to use C<ssl>, C<ssl_cacert> is required.
103              
104             B
105              
106             If the connection is cut when using ssl, openssl will throw a C<SIGPIPE>; you should catch this or perl
107             will exit with error code 141
108              
109             $SIG{PIPE} = 'IGNORE';
110              
111             =head2 disconnect()
112              
113             Disconnect from the RabbitMQ server.
114              
115             =head2 get_server_properties()
116              
117             Get a reference to hash (hashref) of server properties. These may vary, you should use C to inspect. Properties will be provided for the RabbitMQ server to which you are connected.
118              
119             =head2 get_client_properties()
120              
121             Get a reference to hash (hashref) of client properties. These may vary, you should use C to inspect.
122              
123             =head2 is_connected()
124              
125             Returns true if a valid socket connection appears to exist, false otherwise.
126              
127             =head2 channel_open($channel)
128              
129             Open an AMQP channel on the connection.
130              
131             C<$channel> is a positive integer describing the channel you wish to open.
132              
133             =head2 channel_close($channel)
134              
135             Close the specified channel.
136              
137             C<$channel> is a positive integer describing the channel you wish to close.
138              
139             =head2 get_channel_max()
140              
141             Returns the maximum allowed channel number.
142              
143             =head2 exchange_declare($channel, $exchange, $options, $arguments)
144              
145             Declare an AMQP exchange on the RabbitMQ server unless it already exists. Bad
146             things will happen if the exchange name already exists and different parameters
147             are provided.
148              
149             C<$channel> is a channel that has been opened with C<channel_open>.
150              
151             C<$exchange> is the name of the exchange to be instantiated.
152              
153             C<$options> is an optional hash respecting the following keys:
154              
155             {
156             exchange_type => $type, #default 'direct'
157             passive => $boolean, #default 0
158             durable => $boolean, #default 0
159             auto_delete => $boolean, #default 0
160             }
161              
162             Note that the default for the C<auto_delete> option is different for C<exchange_declare> and for C<queue_declare>.
163              
164             C<$arguments> is an optional hash of additional arguments to the RabbitMQ server, such as:
165              
166             {
167             # exchange to try if no routes apply on this exchange
168             alternate_exchange => 'alternate_exchange_name',
169             }
170              
171             =head2 exchange_delete($channel, $exchange, $options)
172              
173             Delete an AMQP exchange on the RabbitMQ server.
174              
175             C<$channel> is a channel that has been opened with C<channel_open>.
176              
177             C<$exchange> is the name of the exchange to be deleted.
178              
179             C<$options> is an optional hash respecting the following keys:
180              
181             {
182             if_unused => $boolean, #default 1
183             }
184              
185             =head2 exchange_bind($channel, $destination, $source, $routing_key, $arguments)
186              
187             Bind a source exchange to a destination exchange with a given routing key and/or parameters.
188              
189             C<$channel> is a channel that has been opened with C<channel_open>.
190              
191             C<$destination> is a previously declared exchange, C<$source> is
192             yet another previously declared exchange, and C<$routing_key> is
193             the routing key that will bind the specified source exchange to
194             the specified destination exchange.
195              
196             C<$arguments> is an optional hash which will be passed to the server. When
197             binding to an exchange of type C<headers>, this can be used to only receive
198             messages with the supplied header values.
199              
200             =head2 exchange_unbind($channel, $destination, $source, $routing_key, $arguments)
201              
202             Remove a binding between source and destination exchanges.
203              
204             C<$channel> is a channel that has been opened with C<channel_open>.
205              
206             C<$destination> is a previously declared exchange, C<$source> is
207             yet another previously declared exchange, and C<$routing_key> is
208             the routing key that will B<unbind> the specified source exchange from
209             the specified destination exchange.
210              
211             C<$arguments> is an optional hash which will be passed to the server. When
212             binding to an exchange of type C<headers>, this can be used to only receive
213             messages with the supplied header values.
214              
215             =head2 queue_declare($channel, $queuename, $options, $arguments)
216              
217             Declare an AMQP queue on the RabbitMQ server.
218              
219             In scalar context, this method returns the queuename declared
220             (important for retrieving the auto-generated queuename in the
221             event that one was requested).
222              
223             In array context, this method returns three items: queuename,
224             the number of messages waiting on the queue, and the number
225             of consumers bound to the queue.
226              
227             C<$channel> is a channel that has been opened with C<channel_open>.
228              
229             C<$queuename> is the name of the queue to be instantiated. If
230             C<$queuename> is undef or an empty string, then an auto generated
231             queuename will be used.
232              
233             C<$options> is an optional hash respecting the following keys:
234              
235             {
236             passive => $boolean, #default 0
237             durable => $boolean, #default 0
238             exclusive => $boolean, #default 0
239             auto_delete => $boolean, #default 1
240             }
241              
242             Note that the default for the C<auto_delete> option is different for C<exchange_declare> and for C<queue_declare>.
243              
244             C<$arguments> is an optional hash which will be passed to the server
245             when the queue is created. This can be used for creating mirrored
246             queues by using the x-ha-policy header.
247              
248             =head2 queue_bind($channel, $queuename, $exchange, $routing_key, $arguments)
249              
250             Bind the specified queue to the specified exchange with a routing key.
251              
252             C<$channel> is a channel that has been opened with C<channel_open>.
253              
254             C<$queuename> is a previously declared queue, C<$exchange> is a
255             previously declared exchange, and C<$routing_key> is the routing
256             key that will bind the specified queue to the specified exchange.
257              
258             C<$arguments> is an optional hash which will be passed to the server. When
259             binding to an exchange of type C<headers>, this can be used to only receive
260             messages with the supplied header values.
261              
262             =head2 queue_unbind($channel, $queuename, $exchange, $routing_key, $arguments)
263              
264             Remove a binding between a queue and an exchange. If this fails, you must reopen the channel.
265              
266             This is like the C<queue_bind> with respect to arguments. This command unbinds
267             the queue from the exchange. The C<$routing_key> and C<$arguments> must match
268             the values supplied when the binding was created.
269              
270             =head2 queue_delete($channel, $queuename, $options)
271              
272             Delete a specified queue. If this fails, you must reopen the channel.
273              
274             C<$options> is an optional hash respecting the following keys:
275              
276             {
277             if_unused => $boolean, #default 1
278             if_empty => $boolean, #default 1
279             }
280              
281             =head2 publish($channel, $routing_key, $body, $options, $props)
282              
283             Publish a message to an exchange.
284              
285             C<$channel> is a channel that has been opened with C<channel_open>.
286              
287             C<$routing_key> is the name of the routing key for this message.
288              
289             C<$body> is the payload to enqueue.
290              
291             C<$options> is an optional hash respecting the following keys:
292              
293             {
294             exchange => $exchange, #default 'amq.direct'
295             mandatory => $boolean, #default 0
296             immediate => $boolean, #default 0
297             force_utf8_in_header_strings => $boolean, #default 0
298             }
299              
300             The C<force_utf8_in_header_strings> option causes all headers which look like
301             strings to be treated as UTF-8. In an attempt to make this a non-breaking change,
302             this option is disabled by default. However, for all headers beginning with
303             C, those are treated as UTF-8 regardless of this option (per spec).
304              
305             C<$props> is an optional hash (the AMQP 'props') respecting the following keys:
306              
307             {
308             content_type => $string,
309             content_encoding => $string,
310             correlation_id => $string,
311             reply_to => $string,
312             expiration => $string,
313             message_id => $string,
314             type => $string,
315             user_id => $string,
316             app_id => $string,
317             delivery_mode => $integer,
318             priority => $integer,
319             timestamp => $integer,
320             headers => $headers # This should be a hashref of keys and values.
321             }
322              
323             =head2 consume($channel, $queuename, $options)
324              
325             Put the channel into consume mode.
326              
327             The C<consumer_tag> is returned. This command does B<not> return AMQP
328             messages, for that the C<recv> method should be used.
329              
330             C<$channel> is a channel that has been opened with C<channel_open>.
331              
332             C<$queuename> is the name of the queue from which we'd like to consume.
333              
334             C<$options> is an optional hash respecting the following keys:
335              
336             {
337             consumer_tag => $tag, #absent by default
338             no_local => $boolean, #default 0
339             no_ack => $boolean, #default 1
340             exclusive => $boolean, #default 0
341             }
342              
343             =head2 recv($timeout)
344              
345             Receive AMQP messages.
346              
347             This method returns a reference to a hash (hashref) containing the following information:
348              
349             {
350             body => 'Magic Transient Payload', # the reconstructed body
351             routing_key => 'nr_test_q', # route the message took
352             exchange => 'nr_test_x', # exchange used
353             delivery_tag => 1, # (used for acks)
354             redelivered => $boolean, # if message is redelivered
355             consumer_tag => 'c_tag', # tag from consume()
356             props => $props, # hashref sent in
357             }
358              
359             C<$props> is the hash sent by publish() respecting the following keys:
360              
361             {
362             content_type => $string,
363             content_encoding => $string,
364             correlation_id => $string,
365             reply_to => $string,
366             expiration => $string,
367             message_id => $string,
368             type => $string,
369             user_id => $string,
370             app_id => $string,
371             delivery_mode => $integer,
372             priority => $integer,
373             timestamp => $integer,
374             }
375              
376             C<$timeout> is a positive integer, specifying the number of milliseconds to
377             wait for a message. If you do not provide a timeout (or set it to 0), then
378             this call will block until it receives a message. If you set it to -1 it will
379             return immediately (waiting 0 ms).
380              
381             If you provide a timeout, then the C<recv> method returns C<undef> if the
382             timeout expires before a message is received from the server.
383              
384             =head2 cancel($channel, $consumer_tag)
385              
386             Take the channel out of consume mode previously enabled with C<consume>.
387              
388             This method returns true or false indicating whether we got the expected
389             "cancel-ok" response from the server.
390              
391             C<$channel> is a channel that has been opened with C<channel_open>.
392              
393             C<$consumer_tag> is a tag previously passed to C<consume> or one that was
394             generated automatically as a result of calling C<consume> without an
395             explicit tag.
396              
397             =head2 get($channel, $queuename, $options)
398              
399             Get a message from the specified queue (via C).
400              
401             The method returns C<undef> immediately if no messages are available
402             on the queue. If a message is available a reference to a hash (hashref) is
403             returned with the following contents:
404              
405             {
406             body => 'Magic Transient Payload', # the reconstructed body
407             routing_key => 'nr_test_q', # route the message took
408             exchange => 'nr_test_x', # exchange used
409             content_type => 'foo', # (only if specified)
410             delivery_tag => 1, # (used for acks)
411             redelivered => 0, # if message is redelivered
412             message_count => 0, # message count
413              
414             # Not all of these will be present. Consult the RabbitMQ reference for more details.
415             props => {
416             content_type => 'text/plain',
417             content_encoding => 'none',
418             correlation_id => '123',
419             reply_to => 'somequeue',
420             expiration => 1000,
421             message_id => 'ABC',
422             type => 'notmytype',
423             user_id => 'guest',
424             app_id => 'idd',
425             delivery_mode => 1,
426             priority => 2,
427             timestamp => 1271857990,
428             headers => {
429             unsigned_integer => 12345,
430             signed_integer => -12345,
431             double => 3.141,
432             string => "string here",
433              
434             # The x-death header is a special header for dead-lettered messages (rejected or timed out).
435             'x-death' => [
436             {
437             time => 1271857954,
438             exchange => $exchange,
439             queue => $exchange,
440             reason => 'expired',
441             'routing-keys' => [q{}],
442             },
443             ],
444             },
445             },
446             }
447              
448             C<$channel> is a channel that has been opened with C<channel_open>.
449              
450             C<$queuename> is the name of the queue from which we'd like to consume.
451              
452             C<$options> is an optional hash respecting the following keys:
453              
454             {
455             no_ack => $boolean, #default 1
456             }
457              
458             =head2 ack($channel, $delivery_tag, $multiple = 0)
459              
460             Acknowledge a message.
461              
462             C<$channel> is a channel that has been opened with C<channel_open>.
463              
464             C<$delivery_tag> the delivery tag seen from a returned message from the
465             C method.
466              
467             C<$multiple> specifies if multiple are to be acknowledged at once. If C<$multiple> is non-zero, the broker will operate on all messages delivered with a delivery tag less than or equal to C<$delivery_tag>.
468              
469             =head2 nack($channel, $delivery_tag, $multiple = 0)
470              
471             Negatively acknowledge a message.
472              
473             C<$channel> is a channel that has been opened with C<channel_open>.
474              
475             C<$delivery_tag> the delivery tag seen from a returned message from the
476             C method.
477              
478             C<$multiple> specifies if multiple are to be acknowledged at once. If C<$multiple> is non-zero, the broker will operate on all messages delivered with a delivery tag less than or equal to C<$delivery_tag>.
479              
480             =head2 reject($channel, $delivery_tag, $requeue = 0)
481              
482             Reject a message with the specified delivery tag.
483              
484             C<$channel> is a channel that has been opened with C<channel_open>.
485              
486             C<$delivery_tag> the delivery tag seen from a returned message from the
487             C method.
488              
489             C<$requeue> specifies if the message should be requeued.
490              
491             =head2 purge($channel, $queuename)
492              
493             Purge all messages from the specified queue.
494              
495             C<$channel> is a channel that has been opened with C<channel_open>.
496              
497             C<$queuename> is the queue to be purged.
498              
499             =head2 tx_select($channel)
500              
501             Start a server-side (tx) transaction over $channel.
502              
503             C<$channel> is a channel that has been opened with C<channel_open>.
504              
505             =head2 tx_commit($channel)
506              
507             Commit a server-side (tx) transaction over $channel.
508              
509             C<$channel> is a channel that has been opened with C<channel_open>.
510              
511             =head2 tx_rollback($channel)
512              
513             Rollback a server-side (tx) transaction over $channel.
514              
515             C<$channel> is a channel that has been opened with C<channel_open>.
516              
517             =head2 get_rpc_timeout()
518              
519             Return the RPC timeout on the current connection.
520              
521             The value returned will be either C<undef>, if the RPC timeout is
522             unlimited, or a hashref with C<tv_sec> for the number of seconds and
523             C<tv_usec> for the number of microseconds.
524              
525             =head2 set_rpc_timeout({ tv_sec => SECONDS, tv_usec => MICROSECONDS })
526              
527             Set the RPC timeout for the current connection, using the seconds
528             (C<tv_sec>) and microseconds (C<tv_usec>) provided. The arguments
529             supplied can be either in the form of a hash or a hashref, so all of
530             the following are valid:
531              
532             $mq->set_rpc_timeout(tv_sec => 10, tv_usec => 500000)
533             $mq->set_rpc_timeout( { tv_sec => 10, tv_usec => 500000 } )
534             $mq->set_rpc_timeout(tv_sec => 10)
535             $mq->set_rpc_timeout(tv_usec => 500000)
536              
537             In order to remove the time limit for RPC calls, simply pass C<undef>.
538              
539             $mq->set_rpc_timeout( undef )
540              
541             =head2 basic_qos($channel, $options)
542              
543             Set quality of service flags on the current $channel.
544              
545             C<$channel> is a channel that has been opened with C<channel_open>.
546              
547             C<$options> is an optional hash respecting the following keys:
548              
549             {
550             prefetch_count => $cnt, #default 0
551             prefetch_size => $size, #default 0
552             global => $bool, #default 0
553             }
554              
555             =head2 heartbeat()
556              
557             Send a heartbeat. If you've connected with a heartbeat parameter,
558             you must send a heartbeat periodically, matching the connection parameter, or
559             the server may snip the connection.
560              
561             Note that since C<recv> blocks for up to C<$timeout> milliseconds,
562             it automatically handles sending heartbeats for you while active.
563              
564             =head2 has_ssl
565              
566             Returns true if the module was compiled with SSL support, false otherwise
567              
568             =head2 confirm_select($channel)
569              
570             Put the C<$channel> into select mode so that publisher confirmations will be sent by the broker.
571              
572             C<$channel> is the channel number you wish to put into select mode.
573              
574             Note that there is presently no way to disable select mode on a channel, so in order to cancel select mode you will need to close the channel and open another one.
575              
576             =head2 publisher_confirm_wait($timeout)
577              
578             Wait for a publisher confirm from the broker. If no publisher confirm has appeared before the timeout expires, C<undef> is returned.
579              
580             C<$timeout> is an E representing the amount of time, in seconds, to wait for a confirmation. If a positive timeout is not specified or is specified as zero, this call will block until a response is received. If you specify a negative value for the timeout, it will time out immediately.
581              
582             When a response is received, a hashref will be returned in the appropriate format for the method returned.
583              
584             For a `basic.ack` response:
585              
586             {
587             channel => 2,
588             method => 'basic.ack',
589             delivery_tag => 12,
590             multiple => 0,
591             }
592              
593             For a `basic.nack` response:
594              
595             {
596             channel => 2,
597             method => 'basic.nack',
598             delivery_tag => 12,
599             multiple => 0,
600             requeue => 1,
601             }
602              
603             For a `basic.reject` response:
604              
605             {
606             channel => 2,
607             method => 'basic.reject',
608             delivery_tag => 12,
609             requeue => 1,
610             }
611              
612             =over 4
613              
614             =item C<channel>
615              
616             This is the channel for which the publisher confirmation was received.
617              
618             =item C<method>
619              
620             The method received from the broker, which will always be one of C<basic.ack>, C<basic.nack>, or C<basic.reject>.
621              
622             =item C<delivery_tag>
623              
624             A numeric value identifying a message. This is a sequential integer set by the broker for messages delivered in order.
625              
626             For example, if you publish one message, that message will have C of C. When you publish another message, that message will have a C of C.
627              
628             =item C<multiple>
629              
630             Both C<basic.ack> and C<basic.nack> can be sent once for multiple messages. This boolean field, when true, indicates that B<all> messages up to the current C<delivery_tag> since the last response have been confirmed using the same message.
631              
632             So if you publish three messages, and you get a single C with the C field set to C<1>, then you know that all three of those messages have confirmed with the same method.
633              
634             =item C<requeue>
635              
636             For both C<basic.nack> and C<basic.reject>, a message can be requeued by whichever consumer received the message. If you receive a confirmation with this set to C<1>, then you know that the message(s) have already been requeued.
637              
638             =back
639              
640             =head1 WARNING AND ERROR MESSAGES
641              
642             =head2 Fatal Errors
643              
644             It should be noted that almost all errors in this library are considered fatal,
645             insomuch as they trigger a C<croak>. In these errors, if it appears that somehow
646             the connection has been closed by the remote host, or otherwise invalidated,
647             the socket will also be closed and should be re-opened before any additional
648             calls are made.
649              
650             =head1 EXAMPLES
651              
652             =head2 Simple publisher
653              
654             use Net::AMQP::RabbitMQ;
655              
656             my $channel = 1;
657             my $exchange = "MyExchange.x"; # This exchange must exist already
658             my $routing_key = "foobar";
659              
660             my $mq = Net::AMQP::RabbitMQ->new();
661             $mq->connect("localhost", { user => "guest", password => "guest" });
662             $mq->channel_open(1);
663             $mq->publish($channel, $routing_key, "Message Here", { exchange => $exchange });
664             $mq->disconnect();
665              
666             =head2 Simple consumer
667              
668             use Net::AMQP::RabbitMQ;
669             use Data::Dumper;
670              
671             my $channel = 1;
672             my $exchange = "MyExchange.x"; # This exchange must exist already
673             my $routing_key = "foobar";
674              
675             my $mq = Net::AMQP::RabbitMQ->new();
676             $mq->connect("localhost", { user => "guest", password => "guest" });
677             $mq->channel_open($channel);
678              
679             # Declare queue, letting the server auto-generate one and collect the name
680             my $queuename = $mq->queue_declare($channel, "");
681              
682             # Bind the new queue to the exchange using the routing key
683             $mq->queue_bind($channel, $queuename, $exchange, $routing_key);
684              
685             # Request that messages be sent and receive them until interrupted
686             $mq->consume($channel, $queuename);
687              
688             while ( my $message = $mq->recv(0) )
689             {
690             print "Received message:\n";
691             print Dumper($message);
692             }
693              
694             $mq->disconnect();
695              
696             =head2 Using QOS
697              
698             use Net::AMQP::RabbitMQ;
699             my $channel = 1;
700             my $exchange = "MyExchange.x"; # This exchange must exist already
701             my $routing_key = "foobar";
702              
703             my $mq = Net::AMQP::RabbitMQ->new();
704             $mq->connect("localhost", { user => "guest", password => "guest" });
705             $mq->channel_open($channel);
706              
707             # Prefetch 5 messages per window
708             $mq->basic_qos( $channel, { prefetch_count => 5 });
709              
710             =head1 RUNNING THE TEST SUITE
711              
712             This module is tested with private RabbitMQ services, and for security and
713             compliance reasons it is no longer possible to expose this to the public.
714              
715             You can create your own free instance to use with testing at
716             L.
717              
718             There are separate variables for the SSL and non-SSL host/user/password/port,
719             as well as the admin capabilities. In order to run the full test suite, you
720             must have the management module enabled.
721              
722             B<NOTE:> The full set of tests (especially the C<xt> tests) can take
723             quite some time, and may only work on GNU/Linux environments. By "quite some
724             time," I mean that they may take more than two hours depending on your RMQ
725             server's capacity.
726              
727             These are the environment variables which control test behavior:
728              
729             =over 4
730              
731             =item MQHOST
732              
733             Hostname or IP address of the RabbitMQ server to connect to.
734              
735             =item MQUSERNAME
736              
737             Username for authentication.
738              
739             =item MQPASSWORD
740              
741             Password for authentication.
742              
743             =item MQPORT
744              
745             Port of the RabbitMQ server to connect to (defaults to 5672)
746              
747             =item MQVHOST
748              
749             Vhost to use.
750              
751             =item MQSSL
752              
753             Whether the tests should run with SSL enabled (defaults to false, but
754             see also C<MQSKIPSSL>).
755              
756             =item MQSKIPSSL
757              
758             Whether the SSL tests should be skipped entirely. This option exists
759             because the SSL tests used to ignore C<MQSSL>, and to maintain
760             backwards compatibility, still do.
761              
762             =item MQSSLHOST
763              
764             Hostname or IP address of the RabbitMQ server to connect to.
765              
766             =item MQSSLUSERNAME
767              
768             Username for authentication.
769              
770             =item MQSSLPASSWORD
771              
772             Password for authentication.
773              
774             =item MQSSLPORT
775              
776             Port of the RabbitMQ server to connect to (defaults to 5671)
777              
778             =item MQSSLCACERT
779              
780             Path to the certificate file for SSL-enabled connections.
781              
782             =item MQSSLVERIFYHOST
783              
784             Whether SSL hostname verification should be enabled (defaults to
785             true).
786              
787             =item MQSSLVHOST
788              
789             Vhost to use when in SSL mode.
790              
791             =item MQADMINPROTOCOL
792              
793             Protocol to use for accessing the admin. Defaults to https
794              
795             =item MQADMINPORT
796              
797             Port to use for accessing the admin interface. Defaults to 443
798              
799             =item MQADMINCACERT
800              
801             CA certificate to use for the admin port. There is no default.
802              
803             =back
804              
805             =head1 VERSION COMPATIBILITY
806              
807             This module was forked from L<Net::RabbitMQ> version 0.2.6 which uses an older
808             version of librabbitmq, and doesn't work correctly with newer versions of RabbitMQ.
809             The main change between this module and the original is this library uses
810             a newer, unforked, version of librabbitmq.
811              
812             This means this module only works with the AMQP 0.9.1 protocol, so requires RabbitMQ
813             version 2+. Also, since the version of librabbitmq used is not a custom fork, it
814             means this module doesn't support the basic_return callback method.
815              
816             This module has been tested with OpenSSL up to version 3.3.1.
817              
818             Please note that legacy versions of OpenSSL may or may not work, but are indeed unsupported. Only currently-supported versions of OpenSSL will be supported.
819              
820             =head1 AUTHORS
821              
822             Mike "manchicken" Stemle, Jr. E<lt>hello@mikestemle.comE<gt>
823              
824             Theo Schlossnagle E<lt>jesus@omniti.comE<gt>
825              
826             Mark Ellis E<lt>markellis@cpan.orgE<gt>
827              
828             Dave Rolsky E<lt>autarch@urth.orgE<gt>
829              
830             Slaven Rezić
831              
832             Armand Leclercq
833              
834             Daniel W Burke
835              
836             Dávid Kovács
837              
838             Alexey Sheynuk
839              
840             Karen Etheridge E<lt>ether@cpan.orgE<gt>
841              
842             Eric Brine E<lt>ikegami@cpan.orgE<gt>
843              
844             Peter Valdemar Mørch E<lt>pmorch@cpan.orgE<gt>
845              
846             =head1 LICENSE
847              
848             This software is licensed under the Mozilla Public License. See the LICENSE file in the top distribution directory for the full license text.
849              
850             librabbitmq is licensed under the MIT License. See the LICENSE-MIT file in the top distribution directory for the full license text.
851              
852             =cut
853              
# Since we can't store the PID in $self, which is an amqp_connection_state_t, we
# store the pid for $self in $pids{$self}.
# (See issue #151: optionally leave the connection alive in
# net_amqp_rabbitmq_DESTROY to allow forking.)

my %pids;
if ($have_fieldhash) {

    # fieldhash() is only imported at run time (inside the eval that sets
    # $have_fieldhash), so its (\%) prototype is not known when this file is
    # compiled. A plain fieldhash(%pids) call would therefore flatten the
    # still-empty hash into an empty argument list instead of passing a
    # reference to it. Calling through the code ref bypasses prototypes and
    # hands over \%pids explicitly.
    __PACKAGE__->can('fieldhash')->( \%pids );
}
862              
# Constructor. Forwards every argument to the class's _new() and, when a
# fieldhash implementation was loaded, records the PID of the creating process
# in %pids so that DESTROY can compare it with the current PID.
sub new {
    my ( $class, @ctor_args ) = @_;

    my $self = $class->_new(@ctor_args);
    if ($have_fieldhash) {
        $pids{$self} = $$;
    }

    return $self;
}
870              
# Publish $body on $channel using $routing_key.
#
# $options and $props are optional hash references; a false value for either
# is replaced by an empty hash. $props is shallow-copied so the caller's hash
# is never modified, and every blessed value in $props->{headers} is replaced
# by its string form in that copy. Returns whatever _publish() returns.
sub publish {
    my ( $self, $channel, $routing_key, $body, $options, $props ) = @_;

    # Work on a private shallow copy of the caller's properties.
    my %own_props = %{ $props || {} };

    my $headers = $own_props{headers};
    if ($headers) {

        # Rebuild the headers with blessed values converted to strings.
        my %plain_headers;
        for my $name ( keys %{$headers} ) {
            my $value = $headers->{$name};
            $plain_headers{$name} = blessed($value) ? "$value" : $value;
        }
        $own_props{headers} = \%plain_headers;
    }

    return $self->_publish( $channel, $routing_key, $body, $options || {},
        \%own_props );
}
888              
# Normalise the arguments and hand them to _set_rpc_timeout().
#
# Accepts either a flat key/value list or a single hash reference:
#   * an even number of arguments (including none) is packed into a new hash
#     reference;
#   * exactly one defined argument is passed through unchanged;
#   * anything else results in undef being passed.
# Returns whatever _set_rpc_timeout() returns.
sub set_rpc_timeout {
    my ( $self, @opts ) = @_;
    my $count = scalar @opts;

    # Even-sized list: treat it as key/value pairs.
    return $self->_set_rpc_timeout( +{@opts} )
      if $count % 2 == 0;

    # A single defined argument is forwarded as it was given.
    return $self->_set_rpc_timeout( $opts[0] )
      if $count == 1 && defined $opts[0];

    return $self->_set_rpc_timeout(undef);
}
905              
# Destructor. Closes and cleans up the connection unless a fieldhash
# implementation is available and %pids shows that the object was created by a
# different process (i.e. this is a forked child that merely inherited it).
sub DESTROY {
    my ($self) = @_;

    # A destructor can run at any point (scope exit, exception unwinding,
    # global destruction). Localize the global status variables so that the
    # teardown below cannot clobber error state the surrounding code is about
    # to inspect.
    local ( $., $@, $!, $^E, $? );

    # Without fieldhash support no PID is recorded, so always tear down.
    my $owned_by_this_process =
      !$have_fieldhash || ( $pids{$self} && $pids{$self} == $$ );
    return unless $owned_by_this_process;

    $self->_destroy_connection_close;
    $self->_destroy_cleanup;
    return;
}
913              
914             1;