File Coverage

blib/lib/Net/AMQP/RabbitMQ.pm
Criterion Covered Total %
statement 35 35 100.0
branch 11 12 91.6
condition 9 13 69.2
subroutine 8 8 100.0
pod 3 3 100.0
total 66 71 92.9


line stmt bran cond sub pod time code
1             package Net::AMQP::RabbitMQ;
2 34     34   1999040 use strict;
  34         311  
  34         762  
3 34     34   165 use warnings;
  34         61  
  34         1135  
4              
5             our $VERSION = '2.40010';
6              
7 34     34   192 use XSLoader;
  34         76  
  34         1108  
8             XSLoader::load "Net::AMQP::RabbitMQ", $VERSION;
9 34     34   187 use Scalar::Util qw(blessed);
  34         71  
  34         17281  
10              
11             # Hash::FieldHash and/or Hash::Util's fieldhash may or may not be available
12             #
13             # On versions earlier than 5.9.4, Hash::Util::FieldHash didn't exist and
14             # hence Hash::Util::fieldhash didn't either.
15             #
16             # We need one of them to fix #151: REQUEST: optionally leave connection alive
17             # in net_amqp_rabbitmq_DESTROY to allow forking
18             #
19             # If neither is found #151 will remain unfixed
20             my $have_fieldhash = eval {
21             require Hash::FieldHash;
22             Hash::FieldHash->import('all');
23             1;
24             } || eval {
25             require Hash::Util;
26             Hash::Util->import('fieldhash');
27             1;
28             };
29              
30             =encoding UTF-8
31              
32             =head1 NAME
33              
34             Net::AMQP::RabbitMQ - interact with RabbitMQ over AMQP using librabbitmq
35              
36             =head1 SYNOPSIS
37              
38             use Net::AMQP::RabbitMQ;
39             my $mq = Net::AMQP::RabbitMQ->new();
40             $mq->connect("localhost", { user => "guest", password => "guest" });
41             $mq->channel_open(1);
42             $mq->queue_declare(1, "queuename");
43             $mq->publish(1, "queuename", "Hi there!");
44             my $gotten = $mq->get(1, "queuename");
45             print $gotten->{body} . "\n";
46             $mq->disconnect();
47              
48             =head1 DESCRIPTION
49              
50             C provides a simple wrapper around the librabbitmq library
51             that allows connecting, declaring exchanges and queues, binding and unbinding
52             queues, publishing, consuming and receiving events.
53              
54             Error handling in this module is primarily achieve by C (die). You
55             should be making good use of C around these methods to ensure that you
56             appropriately catch the errors.
57              
58             =head1 INSTALLATION
59              
60             C or C
61              
62             Note that the C module includes the associated librabbitmq
63             C library. Thus there is no need to install this separately beforehand.
64              
65             =head1 METHODS
66              
67             All methods, unless specifically stated, return nothing on success
68             and die on failure.
69              
70             Failure to be connected is a fatal failure for most methods.
71              
72             =head2 new()
73              
74             Creates a new Net::AMQP::RabbitMQ object.
75              
76             =head2 connect( $hostname, $options )
77              
78             Connect to RabbitMQ server.
79              
80             C<$hostname> is the host to which a connection will be attempted.
81              
82             C<$options> is an optional hash respecting the following keys:
83              
84             {
85             user => $user, #default 'guest'
86             password => $password, #default 'guest'
87             port => $port, #default 5672
88             vhost => $vhost, #default '/'
89             channel_max => $cmax, #default 0
90             frame_max => $fmax, #default 131072
91             heartbeat => $heartbeat, #default 0
92             timeout => $seconds, #default undef (no timeout)
93              
94             ssl => 1 | 0, #default 0
95             ssl_verify_host => 1 | 0, #default 1
96             ssl_cacert => $caert_path, #needed for ssl
97             ssl_init => 1 | 0, #default 1, initialise the openssl library
98            
99             ssl_cert => $cert_path, #client cert.pem and key.pem when using ssl certificate chains
100             ssl_key => $key_path #(with RabbitMQ's fail_if_no_peer_cert = true)
101             }
102              
103             You probably don't want to touch C, unless you know what it does.
104              
105             For now there is no option to disable ssl peer checking, meaning to use C, C is required.
106              
107             B
108              
109             if the connection is cut when using ssl, openssl will throw a C, you should catch this or perl
110             will exit with error code 141
111              
112             $SIG{PIPE} = 'IGNORE';
113              
114             =head2 disconnect()
115              
116             Disconnect from the RabbitMQ server.
117              
118             =head2 get_server_properties()
119              
120             Get a reference to hash (hashref) of server properties. These may vary, you should use C to inspect. Properties will be provided for the RabbitMQ server to which you are connected.
121              
122             =head2 get_client_properties()
123              
124             Get a reference to hash (hashref) of client properties. These may vary, you should use C to inspect.
125              
126             =head2 is_connected()
127              
128             Returns true if a valid socket connection appears to exist, false otherwise.
129              
130             =head2 channel_open($channel)
131              
132             Open an AMQP channel on the connection.
133              
134             C<$channel> is a positive integer describing the channel you which to open.
135              
136             =head2 channel_close($channel)
137              
138             Close the specified channel.
139              
140             C<$channel> is a positive integer describing the channel you which to close.
141              
142             =head2 get_channel_max()
143              
144             Returns the maximum allowed channel number.
145              
146             =head2 exchange_declare($channel, $exchange, $options, $arguments)
147              
148             Declare an AMQP exchange on the RabbitMQ server unless it already exists. Bad
149             things will happen if the exchange name already exists and different parameters
150             are provided.
151              
152             C<$channel> is a channel that has been opened with C.
153              
154             C<$exchange> is the name of the exchange to be instantiated.
155              
156             C<$options> is an optional hash respecting the following keys:
157              
158             {
159             exchange_type => $type, #default 'direct'
160             passive => $boolean, #default 0
161             durable => $boolean, #default 0
162             auto_delete => $boolean, #default 0
163             }
164              
165             Note that the default for the C option is different for C and for C.
166              
167             C<$arguments> is an optional hash of additional arguments to the RabbitMQ server, such as:
168              
169             {
170             # exchange to try if no routes apply on this exchange
171             alternate_exchange => 'alternate_exchange_name',
172             }
173              
174             =head2 exchange_delete($channel, $exchange, $options)
175              
176             Delete a AMQP exchange on the RabbitMQ server.
177              
178             C<$channel> is a channel that has been opened with C.
179              
180             C<$exchange> is the name of the exchange to be deleted.
181              
182             C<$options> is an optional hash respecting the following keys:
183              
184             {
185             if_unused => $boolean, #default 1
186             }
187              
188             =head2 exchange_bind($channel, $destination, $source, $routing_key, $arguments)
189              
190             Bind a source exchange to a destination exchange with a given routing key and/or parameters.
191              
192             C<$channel> is a channel that has been opened with C.
193              
194             C<$destination> is a previously declared exchange, C<$source> is
195             yet another previously declared exchange, and C<$routing_key> is
196             the routing key that will bind the specified source exchange to
197             the specified destination exchange.
198              
199             C<$arguments> is an optional hash which will be passed to the server. When
200             binding to an exchange of type C, this can be used to only receive
201             messages with the supplied header values.
202              
203             =head2 exchange_unbind($channel, $destination, $source, $routing_key, $arguments)
204              
205             Remove a binding between source and destination exchanges.
206              
207             C<$channel> is a channel that has been opened with C.
208              
209             C<$destination> is a previously declared exchange, C<$source> is
210             yet another previously declared exchange, and C<$routing_key> is
211             the routing key that will B the specified source exchange from
212             the specified destination exchange.
213              
214             C<$arguments> is an optional hash which will be passed to the server. When
215             binding to an exchange of type C, this can be used to only receive
216             messages with the supplied header values.
217              
218             =head2 queue_declare($channel, $queuename, $options, $arguments)
219              
220             Declare an AMQP queue on the RabbitMQ server.
221              
222             In scalar context, this method returns the queuename declared
223             (important for retrieving the auto-generated queuename in the
224             event that one was requested).
225              
226             In array context, this method returns three items: queuename,
227             the number of message waiting on the queue, and the number
228             of consumers bound to the queue.
229              
230             C<$channel> is a channel that has been opened with C.
231              
232             C<$queuename> is the name of the queuename to be instantiated. If
233             C<$queuename> is undef or an empty string, then an auto generated
234             queuename will be used.
235              
236             C<$options> is an optional hash respecting the following keys:
237              
238             {
239             passive => $boolean, #default 0
240             durable => $boolean, #default 0
241             exclusive => $boolean, #default 0
242             auto_delete => $boolean, #default 1
243             }
244              
245             Note that the default for the C option is different for C and for C.
246              
247             C<$arguments> is an optional hash which will be passed to the server
248             when the queue is created. This can be used for creating mirrored
249             queues by using the x-ha-policy header.
250              
251             =head2 queue_bind($channel, $queuename, $exchange, $routing_key, $arguments)
252              
253             Bind the specified queue to the specified exchange with a routing key.
254              
255             C<$channel> is a channel that has been opened with C.
256              
257             C<$queuename> is a previously declared queue, C<$exchange> is a
258             previously declared exchange, and C<$routing_key> is the routing
259             key that will bind the specified queue to the specified exchange.
260              
261             C<$arguments> is an optional hash which will be passed to the server. When
262             binding to an exchange of type C, this can be used to only receive
263             messages with the supplied header values.
264              
265             =head2 queue_unbind($channel, $queuename, $exchange, $routing_key, $arguments)
266              
267             Remove a binding between a queue and an exchange. If this fails, you must reopen the channel.
268              
269             This is like the C with respect to arguments. This command unbinds
270             the queue from the exchange. The C<$routing_key> and C<$arguments> must match
271             the values supplied when the binding was created.
272              
273             =head2 queue_delete($channel, $queuename, $options)
274              
275             Delete a specified queue. If this fails, you must reopen the channel.
276              
277             C<$options> is an optional hash respecting the following keys:
278              
279             {
280             if_unused => $boolean, #default 1
281             if_empty => $boolean, #default 1
282             }
283              
284             =head2 publish($channel, $routing_key, $body, $options, $props)
285              
286             Publish a message to an exchange.
287              
288             C<$channel> is a channel that has been opened with C.
289              
290             C<$routing_key> is the name of the routing key for this message.
291              
292             C<$body> is the payload to enqueue.
293              
294             C<$options> is an optional hash respecting the following keys:
295              
296             {
297             exchange => $exchange, #default 'amq.direct'
298             mandatory => $boolean, #default 0
299             immediate => $boolean, #default 0
300             force_utf8_in_header_strings => $boolean, #default 0
301             }
302              
303             The C option causes all headers which look like
304             strings to be treated as UTF-8. In an attempt to make this a non-breaking change,
305             this option is disabled by default. However, for all headers beginning with
306             C, those are treated as UTF-8 regardless of this option (per spec).
307              
308             C<$props> is an optional hash (the AMQP 'props') respecting the following keys:
309              
310             {
311             content_type => $string,
312             content_encoding => $string,
313             correlation_id => $string,
314             reply_to => $string,
315             expiration => $string,
316             message_id => $string,
317             type => $string,
318             user_id => $string,
319             app_id => $string,
320             delivery_mode => $integer,
321             priority => $integer,
322             timestamp => $integer,
323             headers => $headers # This should be a hashref of keys and values.
324             }
325              
326             =head2 consume($channel, $queuename, $options)
327              
328             Put the channel into consume mode.
329              
330             The C is returned. This command does B return AMQP
331             messages, for that the C method should be used.
332              
333             C<$channel> is a channel that has been opened with C.
334              
335             C<$queuename> is the name of the queue from which we'd like to consume.
336              
337             C<$options> is an optional hash respecting the following keys:
338              
339             {
340             consumer_tag => $tag, #absent by default
341             no_local => $boolean, #default 0
342             no_ack => $boolean, #default 1
343             exclusive => $boolean, #default 0
344             }
345              
346             =head2 recv($timeout)
347              
348             Receive AMQP messages.
349              
350             This method returns a reference to a hash (hashref) containing the following information:
351              
352             {
353             body => 'Magic Transient Payload', # the reconstructed body
354             routing_key => 'nr_test_q', # route the message took
355             exchange => 'nr_test_x', # exchange used
356             delivery_tag => 1, # (used for acks)
357             redelivered => $boolean # if message is redelivered
358             consumer_tag => 'c_tag', # tag from consume()
359             props => $props, # hashref sent in
360             }
361              
362             C<$props> is the hash sent by publish() respecting the following keys:
363              
364             {
365             content_type => $string,
366             content_encoding => $string,
367             correlation_id => $string,
368             reply_to => $string,
369             expiration => $string,
370             message_id => $string,
371             type => $string,
372             user_id => $string,
373             app_id => $string,
374             delivery_mode => $integer,
375             priority => $integer,
376             timestamp => $integer,
377             }
378              
379             C<$timeout> is a positive integer, specifying the number of milliseconds to
380             wait for a message. If you do not provide a timeout (or set it to 0), then
381             this call will block until it receives a message. If you set it to -1 it will
382             return immediately (waiting 0 ms).
383              
384             If you provide a timeout, then the C method returns C if the
385             timeout expires before a message is received from the server.
386              
387             =head2 cancel($channel, $consumer_tag)
388              
389             Take the channel out of consume mode previously enabled with C.
390              
391             This method returns true or false indicating whether we got the expected
392             "cancel-ok" response from the server.
393              
394             C<$channel> is a channel that has been opened with C.
395              
396             C<$consumer_tag> is a tag previously passed to C or one that was
397             generated automatically as a result of calling C without an
398             explicit tag.
399              
400             =head2 get($channel, $queuename, $options)
401              
402             Get a message from the specified queue (via C).
403              
404             The method returns C immediately if no messages are available
405             on the queue. If a message is available a reference to a hash (hashref) is
406             returned with the following contents:
407              
408             {
409             body => 'Magic Transient Payload', # the reconstructed body
410             routing_key => 'nr_test_q', # route the message took
411             exchange => 'nr_test_x', # exchange used
412             content_type => 'foo', # (only if specified)
413             delivery_tag => 1, # (used for acks)
414             redelivered => 0, # if message is redelivered
415             message_count => 0, # message count
416              
417             # Not all of these will be present. Consult the RabbitMQ reference for more details.
418             props => {
419             content_type => 'text/plain',
420             content_encoding => 'none',
421             correlation_id => '123',
422             reply_to => 'somequeue',
423             expiration => 1000,
424             message_id => 'ABC',
425             type => 'notmytype',
426             user_id => 'guest',
427             app_id => 'idd',
428             delivery_mode => 1,
429             priority => 2,
430             timestamp => 1271857990,
431             headers => {
432             unsigned_integer => 12345,
433             signed_integer => -12345,
434             double => 3.141,
435             string => "string here",
436              
437             # The x-death header is a special header for dead-lettered messages (rejected or timed out).
438             'x-death' => [
439             {
440             time => 1271857954,
441             exchange => $exchange,
442             queue => $exchange,
443             reason => 'expired',
444             'routing-keys' => [q{}],
445             },
446             ],
447             },
448             },
449             }
450              
451             C<$channel> is a channel that has been opened with C.
452              
453             C<$queuename> is the name of the queue from which we'd like to consume.
454              
455             C<$options> is an optional hash respecting the following keys:
456              
457             {
458             no_ack => $boolean, #default 1
459             }
460              
461             =head2 ack($channel, $delivery_tag, $multiple = 0)
462              
463             Acknowledge a message.
464              
465             C<$channel> is a channel that has been opened with C.
466              
467             C<$delivery_tag> the delivery tag seen from a returned message from the
468             C method.
469              
470             C<$multiple> specifies if multiple are to be acknowledged at once.
471              
472             =head2 purge($channel, $queuename)
473              
474             Purge all messages from the specified queue.
475              
476             C<$channel> is a channel that has been opened with C.
477              
478             C<$queuename> is the queue to be purged.
479              
480             =head2 reject($channel, $delivery_tag, $requeue = 0)
481              
482             Reject a message with the specified delivery tag.
483              
484             C<$channel> is a channel that has been opened with C.
485              
486             C<$delivery_tag> the delivery tag seen from a returned message from the
487             C method.
488              
489             C<$requeue> specifies if the message should be requeued.
490              
491             =head2 tx_select($channel)
492              
493             Start a server-side (tx) transaction over $channel.
494              
495             C<$channel> is a channel that has been opened with C.
496              
497             =head2 tx_commit($channel)
498              
499             Commit a server-side (tx) transaction over $channel.
500              
501             C<$channel> is a channel that has been opened with C.
502              
503             =head2 tx_rollback($channel)
504              
505             Rollback a server-side (tx) transaction over $channel.
506              
507             C<$channel> is a channel that has been opened with C.
508              
509             =head2 get_rpc_timeout()
510              
511             Return the RPC timeout on the current connection.
512              
513             The value returned will be either C, if the RPC timeout is
514             unlimited, or a hashref with C for the number of seconds and
515             C for the number of microseconds.
516              
517             =head2 set_rpc_timeout({ tv_sec => SECONDS, tv_usec => MICROSECONDS })
518              
519             Set the RPC timeout for the current connection, using the seconds
520             (C) and microseconds (C) provided. The arguments
521             supplied can be either in the form of a hash or a hashref, so all of
522             the following are valid:
523              
524             $mq->set_rpc_timeout(tv_sec => 10, tv_usec => 500000)
525             $mq->set_rpc_timeout( { tv_sec => 10, tv_usec => 500000 } )
526             $mq->set_rpc_timeout(tv_sec => 10)
527             $mq->set_rpc_timeout(tv_usec => 500000)
528              
529             In order to remove the time limit for RPC calls, simply pass C.
530              
531             $mq->set_rpc_timeout( undef )
532              
533             =head2 basic_qos($channel, $options)
534              
535             Set quality of service flags on the current $channel.
536              
537             C<$channel> is a channel that has been opened with C.
538              
539             C<$options> is an optional hash respecting the following keys:
540              
541             {
542             prefetch_count => $cnt, #default 0
543             prefetch_size => $size, #default 0
544             global => $bool, #default 0
545             }
546              
547             =head2 heartbeat()
548              
549             Send a heartbeat. If you've connected with a heartbeat parameter,
550             you must send a heartbeat periodically matching connection parameter or
551             the server may snip the connection.
552              
553             Note that since C blocks for up to C<$timeout> milliseconds,
554             it automatically handles sending heartbeats for you while active.
555              
556             =head2 has_ssl
557              
558             Returns true if the module was compiled with SSL support, false otherwise
559              
560             =head1 WARNING AND ERROR MESSAGES
561              
562             =head2 Fatal Errors
563              
564             It should be noted that almost all errors in this library are considered fatal,
565             insomuch as they trigger a C. In these errors, if it appears that somehow
566             the connection has been closed by the remote host, or otherwise invalidated,
567             the socket will also be closed and should be re-opened before any additional
568             calls are made.
569              
570             =head1 EXAMPLES
571              
572             =head2 Simple publisher
573              
574             use Net::AMQP::RabbitMQ;
575              
576             my $channel = 1;
577             my $exchange = "MyExchange.x"; # This exchange must exist already
578             my $routing_key = "foobar";
579              
580             my $mq = Net::AMQP::RabbitMQ->new();
581             $mq->connect("localhost", { user => "guest", password => "guest" });
582             $mq->channel_open(1);
583             $mq->publish($channel, $routing_key, "Message Here", { exchange => $exchange });
584             $mq->disconnect();
585              
586             =head2 Simple consumer
587              
588             use Net::AMQP::RabbitMQ;
589             use Data::Dumper;
590              
591             my $channel = 1;
592             my $exchange = "MyExchange.x"; # This exchange must exist already
593             my $routing_key = "foobar";
594              
595             my $mq = Net::AMQP::RabbitMQ->new();
596             $mq->connect("localhost", { user => "guest", password => "guest" });
597             $mq->channel_open($channel);
598              
599             # Declare queue, letting the server auto-generate one and collect the name
600             my $queuename = $mq->queue_declare($channel, "");
601              
602             # Bind the new queue to the exchange using the routing key
603             $mq->queue_bind($channel, $queuename, $exchange, $routing_key);
604              
605             # Request that messages be sent and receive them until interrupted
606             $mq->consume($channel, $queuename);
607              
608             while ( my $message = $mq->recv(0) )
609             {
610             print "Received message:\n";
611             print Dumper($message);
612             }
613              
614             $mq->disconnect();
615              
616             =head1 RUNNING THE TEST SUITE
617              
618             The test suite runs live tests against a RabbitMQ server at
619             C.
620              
621             There are separte variables for the ssl and none ssl host/user/password/port.
622              
623             If you are in an environment that won't let you connect to this
624             host (or the test server is down), you can use these environment variables:
625              
626             =over 4
627              
628             =item MQHOST
629              
630             Hostname or IP address of the RabbitMQ server to connect to (defaults
631             to C).
632              
633             =item MQUSERNAME
634              
635             Username for authentication (defaults to username for L).
636              
637             =item MQPASSWORD
638              
639             Password for authentication (defaults to password for L).
640              
641             =item MQPORT
642              
643             Port of the RabbitMQ server to connect to (defaults to 5672)
644              
645             =item MQVHOST
646              
647             Vhost to use (defaults to vhost for for L).
648              
649             =item MQSSL
650              
651             Whether the tests should run with SSL enabled (defaults to false, but
652             see also C).
653              
654             =item MQSKIPSSL
655              
656             Whether the SSL tests should be skipped entirely. This option exists
657             because the SSL tests used to ignore C, and to maintain
658             backwards compatibility, still do.
659              
660             =item MQSSLHOST
661              
662             Hostname or IP address of the RabbitMQ server to connect to (defaults
663             to C).
664              
665             =item MQSSLUSERNAME
666              
667             Username for authentication (defaults to username for L).
668              
669             =item MQSSLPASSWORD
670              
671             Password for authentication (defaults to password for L).
672              
673             =item MQSSLPORT
674              
675             Port of the RabbitMQ server to connect to (defaults to 5671)
676              
677             =item MQSSLCACERT
678              
679             Path to the certificate file for SSL-enabled connections, defaults to
680             F.
681              
682             =item MQSSLVERIFYHOST
683              
684             Whether SSL hostname verification should be enabled (defaults to
685             true).
686              
687             =item MQSSLINIT
688              
689             Whether the openssl library should be initialized (defaults to true).
690              
691             =item MQSSLVHOST
692              
693             Vhost to use when in SSL mode (defaults to vhost for for L).
694              
695             =item MQADMINPROTOCOL
696              
697             Protocol to use for accessing the admin. Defaults to https
698              
699             =item MQADMINPORT
700              
701             Port to use for accessing the admin interface. Defaults to 443
702              
703             =back
704              
705             =head1 VERSION COMPATIBILITY
706              
707             This module was forked from L version 0.2.6 which uses an older
708             version of librabbitmq, and doesn't work correctly with newer versions of RabbitMQ.
709             The main change between this module and the original is this library uses
710             a newer, unforked, version of librabbitmq.
711              
712             This means this module only works with the AMQP 0.9.1 protocol, so requires RabbitMQ
713             version 2+. Also, since the version of librabbitmq used is not a custom fork, it
714             means this module doesn't support the basic_return callback method.
715              
716             =head1 AUTHORS
717              
718             Theo Schlossnagle Ejesus@omniti.comE
719              
720             Mark Ellis Emarkellis@cpan.orgE
721              
722             Mike "manchicken" Stemle, Jr. Emstemle@cpan.orgE
723              
724             Dave Rolsky Eautarch@urth.orgE
725              
726             Slaven Rezić
727              
728             Armand Leclercq
729              
730             Daniel W Burke
731              
732             Dávid Kovács
733              
734             Alexey Sheynuk
735              
736             Karen Etheridge Eether@cpan.orgE
737              
738             Eric Brine Eikegami@cpan.orgE
739              
740             Peter Valdemar Mørch Epmorch@cpan.orgE
741              
742             =head1 THANKS
743              
744             Special thanks to L for providing us with the RabbitMQ server the tests run against.
745              
746             =head1 LICENSE
747              
748             This software is licensed under the Mozilla Public License. See the LICENSE file in the top distribution directory for the full license text.
749              
750             librabbitmq is licensed under the MIT License. See the LICENSE-MIT file in the top distribution directory for the full license text.
751              
752             =cut
753              
754             # Since we can't store the PID in $self, which is a amqp_connection_state_t, we
755             # store the pid for $self in $pids{$self}.
756             # (See L).
757              
758             my %pids;
759             if ($have_fieldhash) {
760             fieldhash(%pids);
761             }
762              
763             sub new {
764 32     32 1 5219 my $class = shift;
765 32         2022 my $self = $class->_new(@_);
766 32 50       362 $pids{$self} = $$
767             if $have_fieldhash;
768 32         123 return $self;
769             }
770              
771             sub publish {
772 30     30 1 39825623 my ( $self, $channel, $routing_key, $body, $options, $props ) = @_;
773              
774 30   50     139 $options ||= {};
775 30   100     157 $props ||= {};
776              
777             # Do a shallow clone to avoid modifying variable passed by caller
778 30         124 $props = {%$props};
779              
780             # Convert blessed variables in headers to strings
781 30 100       130 if ( $props->{headers} ) {
782             $props->{headers} =
783 9 100       23 { map { blessed($_) ? "$_" : $_ } %{ $props->{headers} } };
  62         308  
  9         40  
784             }
785              
786 30         100335 $self->_publish( $channel, $routing_key, $body, $options, $props );
787             }
788              
789             sub set_rpc_timeout {
790 7     7 1 514293 my ( $self, @opts ) = @_;
791              
792 7         10 my $args = undef;
793              
794             # Be kind on whether or not we receive a hashref
795             # or an actual hash.
796 7 100 66     33 if ( ( scalar @opts % 2 ) == 0 ) {
    100          
797 3         9 $args = {@opts};
798             }
799             elsif ( scalar @opts == 1 && defined $opts[0] ) {
800 1         2 $args = $opts[0];
801             }
802              
803 7         52 return $self->_set_rpc_timeout($args);
804             }
805              
806             sub DESTROY {
807 32     32   26240929 my ($self) = @_;
808 32 100 66     1471 if ( !$have_fieldhash || $pids{$self} && $pids{$self} == $$ ) {
      66        
809 31         2511669 $self->_destroy_connection_close;
810 31         10641 $self->_destroy_cleanup;
811             }
812             }
813              
814             1;