File Coverage

blib/lib/Crixa/Exchange.pm
Criterion Covered Total %
statement 34 34 100.0
branch 5 10 50.0
condition 3 7 42.8
subroutine 9 9 100.0
pod 3 4 75.0
total 54 64 84.3


line stmt bran cond sub pod time code
1             package Crixa::Exchange;
2              
3 1     1   5 use strict;
  1         2  
  1         35  
4 1     1   4 use warnings;
  1         1  
  1         27  
5 1     1   5 use namespace::autoclean;
  1         2  
  1         8  
6              
7             our $VERSION = '0.12';
8              
9 1     1   70 use Moose;
  1         1  
  1         7  
10              
11             with qw(Crixa::HasMQ);
12              
13             has name => (
14             isa => 'Str',
15             is => 'ro',
16             required => 1,
17             );
18              
19             has channel => (
20             isa => 'Crixa::Channel',
21             is => 'ro',
22             required => 1,
23             );
24              
25             has exchange_type => (
26             isa => 'Str',
27             is => 'ro',
28             default => 'direct',
29             );
30              
31             has passive => (
32             is => 'ro',
33             isa => 'Bool',
34             default => 0,
35             );
36              
37             has durable => (
38             is => 'ro',
39             isa => 'Bool',
40             default => 0,
41             );
42              
43             has auto_delete => (
44             is => 'ro',
45             isa => 'Bool',
46             default => 1,
47             );
48              
49             sub BUILD {
50 8     8 0 15 my $self = shift;
51 8         183 $self->_mq->exchange_declare(
52             $self->channel->id,
53             $self->name,
54             $self->_props,
55             );
56             }
57              
58             sub queue {
59 8     8 1 56 my $self = shift;
60 8 50       34 my $args = @_ == 1 ? $_[0] : {@_};
61              
62 8   50     26 my $routing = delete $args->{routing_keys} // [];
63 8         164 my $q = $self->channel->queue($args);
64              
65 8         15 for my $key (@$routing) {
66 8         146 $q->bind( { exchange => $self->name, routing_key => $key } );
67             }
68              
69 8         1495 return $q;
70             }
71              
72             sub publish {
73 20     20 1 2765 my $self = shift;
74 20 50       50 my $args = @_ > 1 ? {@_} : ref $_[0] ? $_[0] : { body => $_[0] };
    50          
75              
76 20   50     48 my $routing_key = delete $args->{routing_key} // q{};
77 20   33     46 my $body = delete $args->{body}
78             || confess(
79             'You must supply a body when calling the publish() method');
80 20         17 my $props = delete $args->{props};
81              
82 20         414 $args->{exchange} = $self->name;
83              
84 20         362 return $self->_mq->publish(
85             $self->channel->id,
86             $routing_key,
87             $body,
88             $args,
89             $props,
90             );
91             }
92              
93             ## no critic (Subroutines::ProhibitBuiltinHomonyms)
94             sub delete {
95 8     8 1 793 my $self = shift;
96 8 50       34 my $args = @_ > 1 ? {@_} : ref $_[0] ? $_[0] : {};
    50          
97              
98 8         154 $self->_mq->exchange_delete( $self->channel->id, $self->name, $args );
99             }
100             ## use critic
101              
102             sub _props {
103 8     8   8 my $self = shift;
104              
105 8         18 return { map { $_ => $self->$_() }
  32         660  
106             qw( exchange_type passive durable auto_delete ) };
107             }
108              
109             __PACKAGE__->meta->make_immutable;
110              
111             1;
112              
113             # ABSTRACT: A Crixa Exchange
114              
115             __END__
116              
117             =pod
118              
119             =head1 NAME
120              
121             Crixa::Exchange - A Crixa Exchange
122              
123             =head1 VERSION
124              
125             version 0.12
126              
127             =head1 DESCRIPTION
128              
129             This class represents a single exchange. With RabbitMQ, messages are published
130             to an exchange. Queues can then connect to exchanges and receive those messages.
131              
132             =encoding UTF-8
133              
134             =head1 METHODS
135              
136             This class provides the following methods:
137              
138             =head2 Crixa::Exchange->new
139              
140             You should not call this method directly under normal circumstances. Instead,
141             you should create an exchange by calling the C<exchange> method on a
142             L<Crixa::Channel> object. However, you need to know what parameters the
143             constructor accepts.
144              
145             =over 4
146              
147             =item * name => $name
148              
149             This is a required string. Note that the empty string is acceptable here, as
150             this is the default exchange name for RabbitMQ.
151              
152             =item * exchange_type => $type
153              
154             This is an optional string. It can be any type of exchange supported by
155             RabbitMQ, including those provided by plugins.
156              
157             This defaults to "direct".
158              
159             =item * passive => $bool
160              
161             If this is true, then the constructor will throw an error B<unless the
162             exchange already exists>.
163              
164             This defaults to false.
165              
166             =item * durable => $bool
167              
168             If this is true, then the exchange will remain active across server
169             restarts.
170              
171             B<This has nothing to do with whether messages are stored! In order to make
172             sure messages are written to disk, you must declare the I<queue> as durable.>
173              
174             This defaults to false.
175              
176             =item * auto_delete => $bool
177              
178             If this is true, then the exchange will be deleted when all queues have
179             finished using it.
180              
181             This defaults to true.
182              
183             =back
184              
185             =head2 $exchange->publish(...)
186              
187             This method sends a message to the exchange. It accepts either a hash or
188             hashref with the following keys:
189              
190             =over 4
191              
192             =item * routing_key
193              
194             This is an optional routing key for the message. If this is not provided then
195             the empty string is used.
196              
197             =item * body
198              
199             This is the message body. This should be a scalar containing any sort of data.
200              
201             =item * mandatory
202              
203             If this is true, then if the message cannot be routed to a queue, the server
204             will return an unroutable message. This defaults to false.
205              
206             B<Note that as of this writing L<Net::AMQP::RabbitMQ> does not support return
207             messages from publishing, so this flag is not really useful.>
208              
209             =item * immediate
210              
211             If this is true, then if the message cannot be routed immediately, the server
212             will return an undeliverable message. This defaults to false.
213              
214             B<Note that as of this writing L<Net::AMQP::RabbitMQ> does not support return
215             messages from publishing, so this flag is not really useful.>
216              
217             =item * props
218              
219             This is an optional hashref containing message metadata:
220              
221             =over 8
222              
223             =item * content_type => $ct
224              
225             This should be a MIME type like "application/json" or "text/plain". This is
226             exactly like an HTTP Content-Type header.
227              
228             Note that RabbitMQ doesn't really care about the content of your message. This
229             is for the benefit of whatever code eventually consumes this message.
230              
231             =item * content_encoding => $enc
232              
233             The MIME content encoding of the message. This is exactly like an HTTP
234             Content-Encoding header.
235              
236             Note that RabbitMQ doesn't really care about the content of your message. This
237             is for the benefit of whatever code eventually consumes this message.
238              
239             =item * message_id => $id
240              
241             A unique identifier for the message that you create.
242              
243             =item * correlation_id => $correlation
244              
245             This is the ID of the message to which the message you're publishing is a
246             reply.
247              
248             =item * reply_to => $reply_to
249              
250             This is typically used to name the queue to which reply messages should be
251             sent.
252              
253             =item * expiration => $expiration
254              
255             This is the message expiration as a number in milliseconds. If the message
256             cannot be delivered within that time frame it will be discarded.
257              
258             If both the queue and the message define an expiration time, the lower of the
259             two will be used.
260              
261             =item * type => $type
262              
263             The message type (not content type) as a string. This could be something like
264             "order" or "email", etc.
265              
266             =item * user_id => $user_id
267              
268             A string identifying the message's sender. If this is provided, then RabbitMQ
269             requires that this be identical to the username used to connect to RabbitMQ.
270              
271             =item * app_id => $app_id
272              
273             This is a string identifying the app that sent the message. For example, you
274             might use something like "webcrawler" or "rest-api".
275              
276             =item * delivery_mode => $mode
277              
278             This can either be 1 or 2. A 1 is "non-persistent" and a 2 is
279             "persistent". This defines whether or not the message is stored on disk so
280             that it can be recovered across RabbitMQ server restarts.
281              
282             Note that even if you set the exchange and queue to be durable, you still must
283             specify the C<delivery_mode> as persistent in order for it to be saved!
284              
285             =item * timestamp => $epoch
286              
287             This is an epoch time indicating when the message was sent.
288              
289             =item * priority => $priority
290              
291             This can be a number from 0 to 9, but RabbitMQ ignores this.
292              
293             =item * headers => { ... }
294              
295             An arbitrary hashref of headers. This is something like "X-*" headers in
296             HTTP. You can put anything you want here. This is used when matching messages
297             to queues via a headers-type exchange.
298              
299             =back
300              
301             You are strongly encouraged to use the well-known properties listed above
302             instead of encoding similar information in the message body or in the
303             C<headers> property.
304              
305             =back
306              
307             B<Note that if you publish a message and there is no queue bound to the
308             exchange which can receive that message, the message will be discarded. This
309             means you must create your exchanges and queues I<before> you publish any
310             messages.>
311              
312             =head2 $exchange->queue(...)
313              
314             This returns a new L<Crixa::Queue> object. This method accepts all of the
315             arguments that can be passed to the L<Crixa::Queue> constructor, either as a
316             hash or hashref. See the L<Crixa::Queue> documentation for more details.
317              
318             In addition, it also accepts a C<routing_keys> parameter, which should be an
319             arrayref of strings. If these are provided, then the queue is bound to the
320             exchange with each string in the arrayref as a routing key.
321              
322             This is a convenient way of declaring and binding a queue all at once.
323              
324             =head2 $exchange->delete(...)
325              
326             This deletes the exchange. It accepts either a hash or hashref with the
327             following keys:
328              
329             =over 4
330              
331             =item * if_unused => $bool
332              
333             If this is true, then the exchange is only deleted if it has no queue
334             bindings. This defaults to true.
335              
336             =item * no_wait => $bool
337              
338             If this is true, then the method returns immediately without getting
339             confirmation from the server. This defaults to false.
340              
341             =back
342              
343             =head2 $exchange->name
344              
345             This returns the exchange name as passed to the constructor.
346              
347             =head2 $exchange->channel
348              
349             Returns the L<Crixa::Channel> that this exchange uses.
350              
351             =head2 $exchange->exchange_type
352              
353             This returns the exchange type as passed to the constructor or set by a
354             default.
355              
356             =head2 $exchange->passive
357              
358             This returns the passive flag as passed to the constructor or set by a
359             default.
360              
361             =head2 $exchange->durable
362              
363             This returns the durable flag as passed to the constructor or set by a
364             default.
365              
366             =head2 $exchange->auto_delete
367              
368             This returns the auto-delete flag as passed to the constructor or set by a
369             default.
370              
371             =head1 AUTHORS
372              
373             =over 4
374              
375             =item *
376              
377             Chris Prather <chris@prather.org>
378              
379             =item *
380              
381             Dave Rolsky <autarch@urth.org>
382              
383             =back
384              
385             =head1 COPYRIGHT AND LICENSE
386              
387             This software is copyright (c) 2012 - 2015 by Chris Prather.
388              
389             This is free software; you can redistribute it and/or modify it under
390             the same terms as the Perl 5 programming language system itself.
391              
392             =cut