File Coverage

blib/lib/AnyEvent/RabbitMQ/PubSub.pm
Criterion Covered Total %
statement 12 14 85.7
branch n/a
condition n/a
subroutine 5 5 100.0
pod n/a
total 17 19 89.4


line stmt bran cond sub pod time code
1             package AnyEvent::RabbitMQ::PubSub;
2 1     1   437 use 5.010;
  1         2  
3 1     1   4 use strict;
  1         0  
  1         15  
4 1     1   3 use warnings;
  1         7  
  1         22  
5 1     1   766 use AnyEvent;
  1         3675  
  1         26  
6 1     1   421 use AnyEvent::RabbitMQ;
  0            
  0            
7             use Data::Dumper;
8             use Carp qw(longmess);
9              
10             our $VERSION = "3.1.2";
11              
12             sub connect {
13             my %connection_opts = @_;
14              
15             my $cv = AnyEvent->condvar;
16              
17             my $ar = AnyEvent::RabbitMQ->new->load_xml_spec()->connect(
18             %connection_opts,
19             on_success => sub { _open_channel_given_condvar($cv, @_) },
20             on_failure => sub { _report_error($cv, @_) },
21             on_read_failure => sub { _report_error($cv, @_) },
22             on_return => sub { _report_error($cv, @_) },
23             on_close => sub { _report_error($cv, @_) },
24             );
25              
26             return $cv->recv()
27             }
28              
29             sub open_channel {
30             my ($ar) = @_;
31              
32             my $cv = AnyEvent->condvar;
33             _open_channel_given_condvar($cv, $ar);
34              
35             (undef, my $channel) = $cv->recv();
36              
37             return $channel
38             }
39              
40             sub _open_channel_given_condvar {
41             my ($cv, $ar) = @_;
42              
43             $ar->open_channel(
44             on_success => sub { my $channel = shift; $cv->send($ar, $channel); },
45             on_failure => sub { _report_error($cv, @_) },
46             on_close => sub { _report_error($cv, @_) },
47             )
48             }
49              
50             sub _report_error {
51             my ($cv, $why) = @_;
52             if (ref($why)) {
53             my $method_frame = $why->method_frame;
54             $cv->croak(longmess(
55             sprintf '%s: %s',
56             $method_frame->reply_code || 503,
57             $method_frame->reply_text || 'Something went wrong.',
58             ));
59             }
60             else {
61             $cv->croak(longmess(Dumper($why)));
62             }
63             }
64              
65              
66             1;
67             __END__
68              
69             =encoding utf-8
70              
71             =head1 NAME
72              
73             AnyEvent::RabbitMQ::PubSub - Publish and consume RabbitMQ messages.
74              
75             =head1 SYNOPSIS
76              
77             # print 'received Hello World' and exit
78              
79             use AnyEvent;
80             use AnyEvent::RabbitMQ::PubSub;
81             use AnyEvent::RabbitMQ::PubSub::Publisher;
82             use AnyEvent::RabbitMQ::PubSub::Consumer;
83              
84             my ($rmq_connection, $channel) = AnyEvent::RabbitMQ::PubSub::connect(
85             host => 'localhost',
86             port => 5672,
87             user => 'guest',
88             pass => 'guest',
89             vhost => '/',
90             );
91              
92             my $exchange = {
93             exchange => 'my_test_exchange',
94             type => 'topic',
95             durable => 0,
96             auto_delete => 1,
97             };
98              
99             my $queue = {
100             queue => 'my_test_queue';
101             auto_delete => 1,
102             };
103              
104             my $routing_key = 'my_rk';
105              
106             my $cv = AnyEvent->condvar;
107              
108             my $consumer = AnyEvent::RabbitMQ::PubSub::Consumer->new(
109             channel => $channel,
110             exchange => $exchange,
111             queue => $queue,
112             routing_key => $routing_key,
113             );
114             $consumer->init(); #declares channel, queue and binding
115             $consumer->consume(
116             $cv,
117             sub {
118             my ($self, $msg) = @_;
119             print 'received ', $msg->{body}->payload, "\n";
120             $self->channel->ack();
121             $cv->send();
122             },
123             );
124              
125             my $publisher = AnyEvent::RabbitMQ::PubSub::Publisher->new(
126             channel => $channel,
127             exchange => $exchange,
128             routing_key => $routing_key,
129             );
130             $publisher->init(); #declares exchange;
131             $publisher->publish(body => 'Hello World');
132              
133             $cv->recv();
134              
135              
136             =head1 DESCRIPTION
137              
138             AnyEvent::RabbitMQ::PubSub allows to easily create publishers and consumers
139             of RabbitMQ messages.
140              
141             =head1 LICENSE
142              
143             Copyright (C) Avast Software.
144              
145             This library is free software; you can redistribute it and/or modify
146             it under the same terms as Perl itself.
147              
148             =head1 AUTHOR
149              
150             Miroslav Tynovsky E<lt>tynovsky@avast.comE<gt>
151              
152             =cut
153