File Coverage

blib/lib/Net/AMQP/RabbitMQ.pm
Criterion Covered Total %
statement 28 28 100.0
branch 7 8 87.5
condition 7 10 70.0
subroutine 7 7 100.0
pod 2 2 100.0
total 51 55 92.7


line stmt bran cond sub pod time code
1             package Net::AMQP::RabbitMQ;
2 32     32   2365508 use strict;
  32         330  
  32         852  
3 32     32   154 use warnings;
  32         98  
  32         1207  
4              
5             our $VERSION = '2.40008';
6              
7 32     32   185 use XSLoader;
  32         75  
  32         1341  
8             XSLoader::load "Net::AMQP::RabbitMQ", $VERSION;
9 32     32   230 use Scalar::Util qw(blessed);
  32         101  
  32         14415  
10              
11             # Hash::FieldHash and/or Hash::Util's fieldhash may or may not be available
12             #
13             # On versions earlier than 5.9.4, Hash::Util::FieldHash didn't exist and
14             # hence Hash::Util::fieldhash didn't either.
15             #
16             # We need one of them to fix #151: REQUEST: optionally leave connection alive
17             # in net_amqp_rabbitmq_DESTROY to allow forking
18             #
19             # If neither is found #151 will remain unfixed
20             my $have_fieldhash = eval {
21             require Hash::FieldHash;
22             Hash::FieldHash->import('all');
23             1;
24             } || eval {
25             require Hash::Util;
26             Hash::Util->import('fieldhash');
27             1;
28             };
29              
30             =encoding UTF-8
31              
32             =head1 NAME
33              
34             Net::AMQP::RabbitMQ - interact with RabbitMQ over AMQP using librabbitmq
35              
36             =head1 SYNOPSIS
37              
38             use Net::AMQP::RabbitMQ;
39             my $mq = Net::AMQP::RabbitMQ->new();
40             $mq->connect("localhost", { user => "guest", password => "guest" });
41             $mq->channel_open(1);
42             $mq->queue_declare(1, "queuename");
43             $mq->publish(1, "queuename", "Hi there!");
44             my $gotten = $mq->get(1, "queuename");
45             print $gotten->{body} . "\n";
46             $mq->disconnect();
47              
48             =head1 DESCRIPTION
49              
50             C provides a simple wrapper around the librabbitmq library
51             that allows connecting, declaring exchanges and queues, binding and unbinding
52             queues, publishing, consuming and receiving events.
53              
54             Error handling in this module is primarily achieve by C (die). You
55             should be making good use of C around these methods to ensure that you
56             appropriately catch the errors.
57              
58             =head1 METHODS
59              
60             All methods, unless specifically stated, return nothing on success
61             and die on failure.
62              
63             Failure to be connected is a fatal failure for most methods.
64              
65             =head2 new()
66              
67             Creates a new Net::AMQP::RabbitMQ object.
68              
69             =head2 connect( $hostname, $options )
70              
71             Connect to RabbitMQ server.
72              
73             C<$hostname> is the host to which a connection will be attempted.
74              
75             C<$options> is an optional hash respecting the following keys:
76              
77             {
78             user => $user, #default 'guest'
79             password => $password, #default 'guest'
80             port => $port, #default 5672
81             vhost => $vhost, #default '/'
82             channel_max => $cmax, #default 0
83             frame_max => $fmax, #default 131072
84             heartbeat => $hearbeat, #default 0
85             timeout => $seconds, #default undef (no timeout)
86              
87             ssl => 1 | 0, #default 0
88             ssl_verify_host => 1 | 0, #default 1
89             ssl_cacert => $caert_path, #needed for ssl
90             ssl_init => 1 | 0, #default 1, initialise the openssl library
91            
92             ssl_cert => $cert_path, #client cert.pem and key.pem when using ssl certificate chains
93             ssl_key => $key_path #(with RabbitMQ's fail_if_no_peer_cert = true)
94             }
95              
96             You probably don't want to touch C, unless you know what it does.
97              
98             For now there is no option to disable ssl peer checking, meaning to use C, C is required.
99              
100             B
101              
102             if the connection is cut when using ssl, openssl will throw a C, you should catch this or perl
103             will exit with error code 141
104              
105             $SIG{PIPE} = 'IGNORE';
106              
107             =head2 disconnect()
108              
109             Disconnect from the RabbitMQ server.
110              
111             =head2 get_server_properties()
112              
113             Get a reference to hash (hashref) of server properties. These may vary, you should use C to inspect. Properties will be provided for the RabbitMQ server to which you are connected.
114              
115             =head2 get_client_properties()
116              
117             Get a reference to hash (hashref) of client properties. These may vary, you should use C to inspect.
118              
119             =head2 is_connected()
120              
121             Returns true if a valid socket connection appears to exist, false otherwise.
122              
123             =head2 channel_open($channel)
124              
125             Open an AMQP channel on the connection.
126              
127             C<$channel> is a positive integer describing the channel you which to open.
128              
129             =head2 channel_close($channel)
130              
131             Close the specified channel.
132              
133             C<$channel> is a positive integer describing the channel you which to close.
134              
135             =head2 get_channel_max()
136              
137             Returns the maximum allowed channel number.
138              
139             =head2 exchange_declare($channel, $exchange, $options, $arguments)
140              
141             Declare an AMQP exchange on the RabbitMQ server unless it already exists. Bad
142             things will happen if the exchange name already exists and different parameters
143             are provided.
144              
145             C<$channel> is a channel that has been opened with C.
146              
147             C<$exchange> is the name of the exchange to be instantiated.
148              
149             C<$options> is an optional hash respecting the following keys:
150              
151             {
152             exchange_type => $type, #default 'direct'
153             passive => $boolean, #default 0
154             durable => $boolean, #default 0
155             auto_delete => $boolean, #default 0
156             }
157              
158             Note that the default for the C option is different for C and for C.
159              
160             C<$arguments> is an optional hash of additional arguments to the RabbitMQ server, such as:
161              
162             {
163             # exchange to try if no routes apply on this exchange
164             alternate_exchange => 'alternate_exchange_name',
165             }
166              
167             =head2 exchange_delete($channel, $exchange, $options)
168              
169             Delete a AMQP exchange on the RabbitMQ server.
170              
171             C<$channel> is a channel that has been opened with C.
172              
173             C<$exchange> is the name of the exchange to be deleted.
174              
175             C<$options> is an optional hash respecting the following keys:
176              
177             {
178             if_unused => $boolean, #default 1
179             }
180              
181             =head2 exchange_bind($channel, $destination, $source, $routing_key, $arguments)
182              
183             Bind a source exchange to a destination exchange with a given routing key and/or parameters.
184              
185             C<$channel> is a channel that has been opened with C.
186              
187             C<$destination> is a previously declared exchange, C<$source> is
188             yet another previously declared exchange, and C<$routing_key> is
189             the routing key that will bind the specified source exchange to
190             the specified destination exchange.
191              
192             C<$arguments> is an optional hash which will be passed to the server. When
193             binding to an exchange of type C, this can be used to only receive
194             messages with the supplied header values.
195              
196             =head2 exchange_unbind($channel, $destination, $source, $routing_key, $arguments)
197              
198             Remove a binding between source and destination exchanges.
199              
200             C<$channel> is a channel that has been opened with C.
201              
202             C<$destination> is a previously declared exchange, C<$source> is
203             yet another previously declared exchange, and C<$routing_key> is
204             the routing key that will B the specified source exchange from
205             the specified destination exchange.
206              
207             C<$arguments> is an optional hash which will be passed to the server. When
208             binding to an exchange of type C, this can be used to only receive
209             messages with the supplied header values.
210              
211             =head2 queue_declare($channel, $queuename, $options, $arguments)
212              
213             Declare an AMQP queue on the RabbitMQ server.
214              
215             In scalar context, this method returns the queuename declared
216             (important for retrieving the auto-generated queuename in the
217             event that one was requested).
218              
219             In array context, this method returns three items: queuename,
220             the number of message waiting on the queue, and the number
221             of consumers bound to the queue.
222              
223             C<$channel> is a channel that has been opened with C.
224              
225             C<$queuename> is the name of the queuename to be instantiated. If
226             C<$queuename> is undef or an empty string, then an auto generated
227             queuename will be used.
228              
229             C<$options> is an optional hash respecting the following keys:
230              
231             {
232             passive => $boolean, #default 0
233             durable => $boolean, #default 0
234             exclusive => $boolean, #default 0
235             auto_delete => $boolean, #default 1
236             }
237              
238             Note that the default for the C option is different for C and for C.
239              
240             C<$arguments> is an optional hash which will be passed to the server
241             when the queue is created. This can be used for creating mirrored
242             queues by using the x-ha-policy header.
243              
244             =head2 queue_bind($channel, $queuename, $exchange, $routing_key, $arguments)
245              
246             Bind the specified queue to the specified exchange with a routing key.
247              
248             C<$channel> is a channel that has been opened with C.
249              
250             C<$queuename> is a previously declared queue, C<$exchange> is a
251             previously declared exchange, and C<$routing_key> is the routing
252             key that will bind the specified queue to the specified exchange.
253              
254             C<$arguments> is an optional hash which will be passed to the server. When
255             binding to an exchange of type C, this can be used to only receive
256             messages with the supplied header values.
257              
258             =head2 queue_unbind($channel, $queuename, $exchange, $routing_key, $arguments)
259              
260             Remove a binding between a queue and an exchange. If this fails, you must reopen the channel.
261              
262             This is like the C with respect to arguments. This command unbinds
263             the queue from the exchange. The C<$routing_key> and C<$arguments> must match
264             the values supplied when the binding was created.
265              
266             =head2 queue_delete($channel, $queuename, $options)
267              
268             Delete a specified queue. If this fails, you must reopen the channel.
269              
270             C<$options> is an optional hash respecting the following keys:
271              
272             {
273             if_unused => $boolean, #default 1
274             if_empty => $boolean, #default 1
275             }
276              
277             =head2 publish($channel, $routing_key, $body, $options, $props)
278              
279             Publish a message to an exchange.
280              
281             C<$channel> is a channel that has been opened with C.
282              
283             C<$routing_key> is the name of the routing key for this message.
284              
285             C<$body> is the payload to enqueue.
286              
287             C<$options> is an optional hash respecting the following keys:
288              
289             {
290             exchange => $exchange, #default 'amq.direct'
291             mandatory => $boolean, #default 0
292             immediate => $boolean, #default 0
293             force_utf8_in_header_strings => $boolean, #default 0
294             }
295              
296             The C option causes all headers which look like
297             strings to be treated as UTF-8. In an attempt to make this a non-breaking change,
298             this option is disabled by default. However, for all headers beginning with
299             C, those are treated as UTF-8 regardless of this option (per spec).
300              
301             C<$props> is an optional hash (the AMQP 'props') respecting the following keys:
302              
303             {
304             content_type => $string,
305             content_encoding => $string,
306             correlation_id => $string,
307             reply_to => $string,
308             expiration => $string,
309             message_id => $string,
310             type => $string,
311             user_id => $string,
312             app_id => $string,
313             delivery_mode => $integer,
314             priority => $integer,
315             timestamp => $integer,
316             headers => $headers # This should be a hashref of keys and values.
317             }
318              
319             =head2 consume($channel, $queuename, $options)
320              
321             Put the channel into consume mode.
322              
323             The C is returned. This command does B return AMQP
324             messages, for that the C method should be used.
325              
326             C<$channel> is a channel that has been opened with C.
327              
328             C<$queuename> is the name of the queue from which we'd like to consume.
329              
330             C<$options> is an optional hash respecting the following keys:
331              
332             {
333             consumer_tag => $tag, #absent by default
334             no_local => $boolean, #default 0
335             no_ack => $boolean, #default 1
336             exclusive => $boolean, #default 0
337             }
338              
339             =head2 recv($timeout)
340              
341             Receive AMQP messages.
342              
343             This method returns a reference to a hash (hashref) containing the following information:
344              
345             {
346             body => 'Magic Transient Payload', # the reconstructed body
347             routing_key => 'nr_test_q', # route the message took
348             exchange => 'nr_test_x', # exchange used
349             delivery_tag => 1, # (used for acks)
350             redelivered => $boolean # if message is redelivered
351             consumer_tag => 'c_tag', # tag from consume()
352             props => $props, # hashref sent in
353             }
354              
355             C<$props> is the hash sent by publish() respecting the following keys:
356              
357             {
358             content_type => $string,
359             content_encoding => $string,
360             correlation_id => $string,
361             reply_to => $string,
362             expiration => $string,
363             message_id => $string,
364             type => $string,
365             user_id => $string,
366             app_id => $string,
367             delivery_mode => $integer,
368             priority => $integer,
369             timestamp => $integer,
370             }
371              
372             C<$timeout> is a positive integer, specifying the number of milliseconds to
373             wait for a message. If you do not provide a timeout (or set it to 0), then
374             this call will block until it receives a message. If you set it to -1 it will
375             return immediately (waiting 0 ms).
376              
377             If you provide a timeout, then the C method returns C if the
378             timeout expires before a message is received from the server.
379              
380             =head2 cancel($channel, $consumer_tag)
381              
382             Take the channel out of consume mode previously enabled with C.
383              
384             This method returns true or false indicating whether we got the expected
385             "cancel-ok" response from the server.
386              
387             C<$channel> is a channel that has been opened with C.
388              
389             C<$consumer_tag> is a tag previously passed to C or one that was
390             generated automatically as a result of calling C without an
391             explicit tag.
392              
393             =head2 get($channel, $queuename, $options)
394              
395             Get a message from the specified queue (via C).
396              
397             The method returns C immediately if no messages are available
398             on the queue. If a message is available a reference to a hash (hashref) is
399             returned with the following contents:
400              
401             {
402             body => 'Magic Transient Payload', # the reconstructed body
403             routing_key => 'nr_test_q', # route the message took
404             exchange => 'nr_test_x', # exchange used
405             content_type => 'foo', # (only if specified)
406             delivery_tag => 1, # (used for acks)
407             redelivered => 0, # if message is redelivered
408             message_count => 0, # message count
409              
410             # Not all of these will be present. Consult the RabbitMQ reference for more details.
411             props => {
412             content_type => 'text/plain',
413             content_encoding => 'none',
414             correlation_id => '123',
415             reply_to => 'somequeue',
416             expiration => 1000,
417             message_id => 'ABC',
418             type => 'notmytype',
419             user_id => 'guest',
420             app_id => 'idd',
421             delivery_mode => 1,
422             priority => 2,
423             timestamp => 1271857990,
424             headers => {
425             unsigned_integer => 12345,
426             signed_integer => -12345,
427             double => 3.141,
428             string => "string here",
429              
430             # The x-death header is a special header for dead-lettered messages (rejected or timed out).
431             'x-death' => [
432             {
433             time => 1271857954,
434             exchange => $exchange,
435             queue => $exchange,
436             reason => 'expired',
437             'routing-keys' => [q{}],
438             },
439             ],
440             },
441             },
442             }
443              
444             C<$channel> is a channel that has been opened with C.
445              
446             C<$queuename> is the name of the queue from which we'd like to consume.
447              
448             C<$options> is an optional hash respecting the following keys:
449              
450             {
451             no_ack => $boolean, #default 1
452             }
453              
454             =head2 ack($channel, $delivery_tag, $multiple = 0)
455              
456             Acknowledge a message.
457              
458             C<$channel> is a channel that has been opened with C.
459              
460             C<$delivery_tag> the delivery tag seen from a returned message from the
461             C method.
462              
463             C<$multiple> specifies if multiple are to be acknowledged at once.
464              
465             =head2 purge($channel, $queuename)
466              
467             Purge all messages from the specified queue.
468              
469             C<$channel> is a channel that has been opened with C.
470              
471             C<$queuename> is the queue to be purged.
472              
473             =head2 reject($channel, $delivery_tag, $requeue = 0)
474              
475             Reject a message with the specified delivery tag.
476              
477             C<$channel> is a channel that has been opened with C.
478              
479             C<$delivery_tag> the delivery tag seen from a returned message from the
480             C method.
481              
482             C<$requeue> specifies if the message should be requeued.
483              
484             =head2 tx_select($channel)
485              
486             Start a server-side (tx) transaction over $channel.
487              
488             C<$channel> is a channel that has been opened with C.
489              
490             =head2 tx_commit($channel)
491              
492             Commit a server-side (tx) transaction over $channel.
493              
494             C<$channel> is a channel that has been opened with C.
495              
496             =head2 tx_rollback($channel)
497              
498             Rollback a server-side (tx) transaction over $channel.
499              
500             C<$channel> is a channel that has been opened with C.
501              
502             =head2 basic_qos($channel, $options)
503              
504             Set quality of service flags on the current $channel.
505              
506             C<$channel> is a channel that has been opened with C.
507              
508             C<$options> is an optional hash respecting the following keys:
509              
510             {
511             prefetch_count => $cnt, #default 0
512             prefetch_size => $size, #default 0
513             global => $bool, #default 0
514             }
515              
516             =head2 heartbeat()
517              
518             Send a heartbeat. If you've connected with a heartbeat parameter,
519             you must send a heartbeat periodically matching connection parameter or
520             the server may snip the connection.
521              
522             =head2 has_ssl
523              
524             Returns true if the module was compiled with SSL support, false otherwise
525              
526             =head1 WARNING AND ERROR MESSAGES
527              
528             =head2 Fatal Errors
529              
530             It should be noted that almost all errors in this library are considered fatal,
531             insomuch as they trigger a C. In these errors, if it appears that somehow
532             the connection has been closed by the remote host, or otherwise invalidated,
533             the socket will also be closed and should be re-opened before any additional
534             calls are made.
535              
536             =head1 EXAMPLES
537              
538             =head2 Simple publisher
539              
540             use Net::AMQP::RabbitMQ;
541              
542             my $channel = 1;
543             my $exchange = "MyExchange.x"; # This exchange must exist already
544             my $routing_key = "foobar";
545              
546             my $mq = Net::AMQP::RabbitMQ->new();
547             $mq->connect("localhost", { user => "guest", password => "guest" });
548             $mq->channel_open(1);
549             $mq->publish($channel, $routing_key, "Message Here", { exchange => $exchange });
550             $mq->disconnect();
551              
552             =head2 Simple consumer
553              
554             use Net::AMQP::RabbitMQ;
555             use Data::Dumper;
556              
557             my $channel = 1;
558             my $exchange = "MyExchange.x"; # This exchange must exist already
559             my $routing_key = "foobar";
560              
561             my $mq = Net::AMQP::RabbitMQ->new();
562             $mq->connect("localhost", { user => "guest", password => "guest" });
563             $mq->channel_open($channel);
564              
565             # Declare queue, letting the server auto-generate one and collect the name
566             my $queuename = $mq->queue_declare($channel, "");
567              
568             # Bind the new queue to the exchange using the routing key
569             $mq->queue_bind($channel, $queuename, $exchange, $routing_key);
570              
571             # Request that messages be sent and receive them until interrupted
572             $mq->consume($channel, $queuename);
573              
574             while ( my $message = $mq->recv(0) )
575             {
576             print "Received message:\n";
577             print Dumper($message);
578             }
579              
580             $mq->disconnect();
581              
582             =head1 RUNNING THE TEST SUITE
583              
584             The test suite runs live tests against a RabbitMQ server at
585             C.
586              
587             There are separte variables for the ssl and none ssl host/user/password/port.
588              
589             If you are in an environment that won't let you connect to this
590             host (or the test server is down), you can use these environment variables:
591              
592             =over 4
593              
594             =item MQHOST
595              
596             Hostname or IP address of the RabbitMQ server to connect to (defaults
597             to C).
598              
599             =item MQUSERNAME
600              
601             Username for authentication (defaults to username for L).
602              
603             =item MQPASSWORD
604              
605             Password for authentication (defaults to password for L).
606              
607             =item MQPORT
608              
609             Port of the RabbitMQ server to connect to (defaults to 5672)
610              
611             =item MQVHOST
612              
613             Vhost to use (defaults to vhost for for L).
614              
615             =item MQSSL
616              
617             Whether the tests should run with SSL enabled (defaults to false, but
618             see also C).
619              
620             =item MQSKIPSSL
621              
622             Whether the SSL tests should be skipped entirely. This option exists
623             because the SSL tests used to ignore C, and to maintain
624             backwards compatibility, still do.
625              
626             =item MQSSLHOST
627              
628             Hostname or IP address of the RabbitMQ server to connect to (defaults
629             to C).
630              
631             =item MQSSLUSERNAME
632              
633             Username for authentication (defaults to username for L).
634              
635             =item MQSSLPASSWORD
636              
637             Password for authentication (defaults to password for L).
638              
639             =item MQSSLPORT
640              
641             Port of the RabbitMQ server to connect to (defaults to 5671)
642              
643             =item MQSSLCACERT
644              
645             Path to the certificate file for SSL-enabled connections, defaults to
646             F.
647              
648             =item MQSSLVERIFYHOST
649              
650             Whether SSL hostname verification should be enabled (defaults to
651             true).
652              
653             =item MQSSLINIT
654              
655             Whether the openssl library should be initialized (defaults to true).
656              
657             =item MQSSLVHOST
658              
659             Vhost to use when in SSL mode (defaults to vhost for for L).
660              
661             =item MQADMINPROTOCOL
662              
663             Protocol to use for accessing the admin. Defaults to https
664              
665             =item MQADMINPORT
666              
667             Port to use for accessing the admin interface. Defaults to 443
668              
669             =back
670              
671             =head1 VERSION COMPATIBILITY
672              
673             This module was forked from L version 0.2.6 which uses an older
674             version of librabbitmq, and doesn't work correctly with newer versions of RabbitMQ.
675             The main change between this module and the original is this library uses
676             a newer, unforked, version of librabbitmq.
677              
678             This means this module only works with the AMQP 0.9.1 protocol, so requires RabbitMQ
679             version 2+. Also, since the version of librabbitmq used is not a custom fork, it
680             means this module doesn't support the basic_return callback method.
681              
682             =head1 AUTHORS
683              
684             Theo Schlossnagle Ejesus@omniti.comE
685              
686             Mark Ellis Emarkellis@cpan.orgE
687              
688             Michael Stemle, Jr. Ethemanchicken@gmail.comE
689              
690             Dave Rolsky Eautarch@urth.orgE
691              
692             Slaven Rezić
693              
694             Armand Leclercq
695              
696             Daniel W Burke
697              
698             Dávid Kovács
699              
700             Alexey Sheynuk
701              
702             Karen Etheridge Eether@cpan.orgE
703              
704             Eric Brine Eikegami@cpan.orgE
705              
706             Peter Valdemar Mørch Epmorch@cpan.orgE
707              
708             =head1 THANKS
709              
710             Special thanks to L for providing us with the RabbitMQ server the tests run against.
711              
712             =head1 LICENSE
713              
714             This software is licensed under the Mozilla Public License. See the LICENSE file in the top distribution directory for the full license text.
715              
716             librabbitmq is licensed under the MIT License. See the LICENSE-MIT file in the top distribution directory for the full license text.
717              
718             =cut
719              
720             # Since we can't store the PID in $self, which is a amqp_connection_state_t, we
721             # store the pid for $self in $pids{$self}.
722             # (See L).
723              
724             my %pids;
725             if ($have_fieldhash) {
726             fieldhash(%pids);
727             }
728              
729             sub new {
730 30     30 1 5907 my $class = shift;
731 30         2206 my $self = $class->_new(@_);
732 30 50       426 $pids{$self} = $$
733             if $have_fieldhash;
734 30         135 return $self;
735             }
736              
737             sub publish {
738 30     30 1 38947886 my ($self, $channel, $routing_key, $body, $options, $props) = @_;
739              
740 30   50     156 $options ||= {};
741 30   100     189 $props ||= {};
742              
743             # Do a shallow clone to avoid modifying variable passed by caller
744 30         147 $props = { %$props };
745              
746             # Convert blessed variables in headers to strings
747 30 100       188 if( $props->{headers} ) {
748 9 100       31 $props->{headers} = { map { blessed($_) ? "$_" : $_ } %{ $props->{headers} } };
  62         352  
  9         51  
749             }
750              
751 30         2952798 $self->_publish($channel, $routing_key, $body, $options, $props);
752             }
753              
754             sub DESTROY {
755 30     30   23402002 my ($self) = @_;
756             $self->_destroy_connection_close
757 30 100 66     2115544 if !$have_fieldhash || $pids{$self} && $pids{$self} == $$;
      66        
758 30         11148 $self->_destroy_cleanup;
759             }
760              
761             1;