File Coverage

blib/lib/Net/RabbitMQ.pm
Criterion Covered Total %
statement 18 18 100.0
branch 4 4 100.0
condition 3 4 75.0
subroutine 4 4 100.0
pod 1 1 100.0
total 30 31 96.7


line stmt bran cond sub pod time code
1             package Net::RabbitMQ;
2              
3             require DynaLoader;
4              
5 16     16   10117 use strict;
  16         27  
  16         639  
6 16     16   71 use vars qw($VERSION @ISA);
  16         18  
  16         1379  
7             $VERSION = "0.2.8";
8             @ISA = qw/DynaLoader/;
9              
10             bootstrap Net::RabbitMQ $VERSION ;
11 16     16   98 use Scalar::Util qw(blessed);
  16         21  
  16         3825  
12              
13             =head1 NAME
14              
15             Net::RabbitMQ - interact with RabbitMQ over AMQP using librabbitmq
16              
17             =head1 SYNOPSIS
18              
19             use Net::RabbitMQ;
20             my $mq = Net::RabbitMQ->new();
21             $mq->connect("localhost", { user => "guest", password => "guest" });
22             $mq->channel_open(1);
23             $mq->publish(1, "queuename", "Hi there!");
24             $mq->disconnect();
25            
26             =head1 DESCRIPTION
27              
28             C provides a simple wrapper around the librabbitmq library
29             that allows connecting, declaring exchanges and queues, binding and unbinding
30             queues, publishing, consuming and receiving events.
31              
32             Error handling in this module is primarily achieve by Perl_croak (die). You
33             should be making good use of eval around these methods to ensure that you
34             appropriately catch the errors.
35              
36             =head2 Methods
37              
38             All methods, unless specifically stated, return nothing on success
39             and die on failure.
40              
41             =over 4
42              
43             =item new()
44              
45             Creates a new Net::RabbitMQ object.
46              
47             =item connect( $hostname, $options )
48              
49             C<$hostname> is the host to which a connection will be attempted.
50              
51             C<$options> is an optional hash respecting the following keys:
52              
53             {
54             user => $user, #default 'guest'
55             password => $password, #default 'guest'
56             port => $port, #default 5672
57             vhost => $vhost, #default '/'
58             channel_max => $cmax, #default 0
59             frame_max => $fmax, #default 131072
60             heartbeat => $hearbeat, #default 0
61             timeout => $seconds #default undef (no timeout)
62             }
63              
64             =item disconnect()
65              
66             Causes the connection to RabbitMQ to be torn down.
67              
68             =item channel_open($channel)
69              
70             C<$channel> is a positive integer describing the channel you which to open.
71              
72             =item channel_close($channel)
73              
74             C<$channel> is a positive integer describing the channel you which to close.
75              
76             =item get_channel_max()
77              
78             Returns the maximum allowed channel number.
79              
80             =item exchange_declare($channel, $exchange, $options)
81              
82             C<$channel> is a channel that has been opened with C.
83              
84             C<$exchange> is the name of the exchange to be instantiated.
85              
86             C<$options> is an optional hash respecting the following keys:
87              
88             {
89             exchange_type => $type, #default 'direct'
90             passive => $boolean, #default 0
91             durable => $boolean, #default 0
92             auto_delete => $boolean, #default 1
93             }
94              
95             =item exchange_delete($channel, $exchange, $options)
96              
97             C<$channel> is a channel that has been opened with C.
98              
99             C<$exchange> is the name of the exchange to be deleted.
100              
101             C<$options> is an optional hash respecting the following keys:
102              
103             {
104             if_unused => $boolean, #default 1
105             nowait => $boolean, #default 0
106             }
107              
108             =item queue_declare($channel, $queuename, $options, $arguments)
109              
110             C<$channel> is a channel that has been opened with C.
111              
112             C<$queuename> is the name of the queuename to be instantiated. If
113             C<$queuename> is undef or an empty string, then an auto generated
114             queuename will be used.
115              
116             C<$options> is an optional hash respecting the following keys:
117              
118             {
119             passive => $boolean, #default 0
120             durable => $boolean, #default 0
121             exclusive => $boolean, #default 0
122             auto_delete => $boolean, #default 1
123             }
124              
125             C<$arguments> is an optional hash which will be passed to the server
126             when the queue is created. This can be used for creating mirrored
127             queues by using the x-ha-policy header.
128              
129             In scalar context, this method returns the queuename delcared
130             (important for retrieving the autogenerated queuename in the
131             event that one was requested).
132              
133             In array context, this method returns three items: queuename,
134             the number of message waiting on the queue, and the number
135             of consumers bound to the queue.
136              
137             =item queue_bind($channel, $queuename, $exchange, $routing_key, $arguments)
138              
139             C<$channel> is a channel that has been opened with C.
140              
141             C<$queuename> is a previously declared queue, C<$exchange> is a
142             previously declared exchange, and C<$routing_key> is the routing
143             key that will bind the specified queue to the specified exchange.
144              
145             C<$arguments> is an optional hash which will be passed to the server. When
146             binding to an exchange of type C, this can be used to only receive
147             messages with the supplied header values.
148              
149             =item queue_unbind($channel, $queuename, $exchange, $routing_key, $arguments)
150              
151             This is like the C with respect to arguments. This command unbinds
152             the queue from the exchange. The C<$routing_key> and C<$arguments> must match
153             the values supplied when the binding was created.
154              
155             =item publish($channel, $routing_key, $body, $options, $props)
156              
157             C<$channel> is a channel that has been opened with C.
158              
159             C<$routing_key> is the name of the routing key for this message.
160              
161             C<$body> is the payload to enqueue.
162              
163             C<$options> is an optional hash respecting the following keys:
164              
165             {
166             exchange => $exchange, #default 'amq.direct'
167             mandatory => $boolean, #default 0
168             immediate => $boolean, #default 0
169             }
170              
171             C<$props> is an optional hash (the AMQP 'props') respecting the following keys:
172             {
173             content_type => $string,
174             content_encoding => $string,
175             correlation_id => $string,
176             reply_to => $string,
177             expiration => $string,
178             message_id => $string,
179             type => $string,
180             user_id => $string,
181             app_id => $string,
182             delivery_mode => $integer,
183             priority => $integer,
184             timestamp => $integer,
185             }
186              
187             =item consume($channel, $queuename, $options)
188              
189             C<$channel> is a channel that has been opened with C.
190              
191             C<$queuename> is the name of the queue from which we'd like to consume.
192              
193             C<$options> is an optional hash respecting the following keys:
194              
195             {
196             consumer_tag => $tag, #absent by default
197             no_local => $boolean, #default 0
198             no_ack => $boolean, #default 1
199             exclusive => $boolean, #default 0
200             }
201              
202              
203             The consumer_tag is returned. This command does B return AMQP
204             frames, it simply notifies RabbitMQ that messages for this queue should
205             be delivered down the specified channel.
206              
207             =item recv()
208              
209             This command receives and reconstructs AMQP frames and returns a hash
210             containing the following information:
211              
212             {
213             body => 'Magic Transient Payload', # the reconstructed body
214             routing_key => 'nr_test_q', # route the message took
215             exchange => 'nr_test_x', # exchange used
216             delivery_tag => 1, # (used for acks)
217             consumer_tag => 'c_tag', # tag from consume()
218             props => $props, # hashref sent in
219             }
220              
221             C<$props> is the hash sent by publish() respecting the following keys:
222             {
223             content_type => $string,
224             content_encoding => $string,
225             correlation_id => $string,
226             reply_to => $string,
227             expiration => $string,
228             message_id => $string,
229             type => $string,
230             user_id => $string,
231             app_id => $string,
232             delivery_mode => $integer,
233             priority => $integer,
234             timestamp => $integer,
235             }
236              
237             =item get($channel, $queuename, $options)
238              
239             C<$channel> is a channel that has been opened with C.
240              
241             C<$queuename> is the name of the queue from which we'd like to consume.
242              
243             C<$options> is an optional hash respecting the following keys:
244              
245             This command runs an amqp_basic_get which returns undef immediately
246             if no messages are available on the queue and returns a has as follows
247             if a message is available.
248              
249             {
250             body => 'Magic Transient Payload', # the reconstructed body
251             routing_key => 'nr_test_q', # route the message took
252             exchange => 'nr_test_x', # exchange used
253             content_type => 'foo', # (only if specified)
254             delivery_tag => 1, # (used for acks)
255             redelivered => 0, # if message is redelivered
256             message_count => 0, # message count
257             }
258              
259             =item ack($channel, $delivery_tag, $multiple = 0)
260              
261             C<$channel> is a channel that has been opened with C.
262              
263             C<$delivery_tag> the delivery tag seen from a returned frame from the
264             C method.
265              
266             C<$multiple> specifies if multiple are to be acknowledged at once.
267              
268             =item purge($channel, $queuename, $no_wait = 0)
269              
270             C<$channel> is a channel that has been opened with C.
271              
272             C<$queuename> is the queue to be purged.
273              
274             C<$no_wait> a boolean specifying if the call should not wait for
275             the server to acknowledge the acknowledgement.
276              
277              
278             =item reject($channel, $delivery_tag, $requeue = 0)
279              
280             C<$channel> is a channel that has been opened with C.
281              
282             C<$delivery_tag> the delivery tag seen from a returned frame from the
283             C method.
284              
285             C<$requeue> specifies if the message should be requeued.
286              
287              
288             =item tx_select($channel)
289              
290             C<$channel> is a channel that has been opened with C.
291              
292             Start a server-side (tx) transaction over $channel.
293              
294             =item tx_commit($channel)
295              
296             C<$channel> is a channel that has been opened with C.
297              
298             Commit a server-side (tx) transaction over $channel.
299              
300             =item tx_rollback($channel)
301              
302             C<$channel> is a channel that has been opened with C.
303              
304             Rollback a server-side (tx) transaction over $channel.
305              
306             =item basic_qos($channel, $options)
307              
308             C<$channel> is a channel that has been opened with C.
309              
310             C<$options> is an optional hash respecting the following keys:
311              
312             {
313             prefetch_count => $cnt, #default 0
314             prefetch_size => $size, #default 0
315             global => $bool, #default 0
316             }
317              
318             Set quality of service flags on the current $channel.
319              
320             =item hearbeat()
321              
322             Send a hearbeat frame. If you've connected with a heartbeat parameter,
323             you must send a heartbeat periodically matching connection parameter or
324             the server may snip the connection.
325              
326             =item basic_return($subroutine)
327              
328             C<$subroutine> is a perl coderef that takes two arguments:
329              
330             $channel is the channel on which the message is returned.
331              
332             $m the message which is a hash ref containing reply_code,
333             reply_text, exchange, and routing_key.
334              
335             =back
336              
337             =cut
338              
339             sub publish {
340 17     17 1 19536349 my ($self, $channel, $routing_key, $body, $options, $props) = @_;
341              
342 17   50     90 $options ||= {};
343 17   100     108 $props ||= {};
344              
345             # Do a shallow clone to avoid modifying variable passed by caller
346 17         66 $props = { %$props };
347              
348             # Convert blessed variables in headers to strings
349 17 100       106 if( $props->{headers} ) {
350 4 100       8 $props->{headers} = { map { blessed($_) ? "$_" : $_ } %{ $props->{headers} } };
  30         121  
  4         19  
351             }
352              
353 17         1938 $self->_publish($channel, $routing_key, $body, $options, $props);
354             }
355              
356             1;