File Coverage

lib/AMQP/Subscriber.pm
Criterion Covered Total %
statement 4 6 66.6
branch n/a
condition n/a
subroutine 2 2 100.0
pod n/a
total 6 8 75.0


line stmt bran cond sub pod time code
1             package AMQP::Subscriber;
2             our $VERSION = '0.01';
3              
4 1     1   627 use Mojo::Base 'AMQP';
  1         2  
  1         10  
5 1     1   572 use AnyEvent::RabbitMQ;
  0            
  0            
6             use Sys::Hostname;
7              
8             has 'debug' => 1;
9             has 'host' => 'localhost';
10             has 'port' => 5672;
11             has 'username' => 'guest';
12             has 'password' => 'guest';
13             has 'vhost' => '/';
14             has 'timeout' => 1;
15             has 'heartbeat' => 30;
16             has 'exchange' => 'test';
17             has 'type' => 'topic';
18             has 'key' => '#';
19             has 'queue' => 'test';
20             has 'rabbit';
21             has 'connection';
22             has 'channel';
23             has 'status';
24             has 'tag' => $ENV{LOGNAME} . "@" . hostname;
25             has 'on_message';
26              
27             sub attach {
28             my $self = shift;
29             $self->useragent(Mojo::UserAgent->new);
30             $self->status(AnyEvent->condvar);
31             $self->rabbit(AnyEvent::RabbitMQ->new);
32             $self->rabbit->load_xml_spec();
33             $self->rabbit->connect(
34             host => $self->host,
35             port => $self->port,
36             username => $self->username,
37             pass => $self->password,
38             vhost => $self->vhost,
39             timeout => $self->timeout,
40             tune => { heartbeat => $self->heartbeat },
41             on_success => sub {
42             say "Connected to amqp://" . $self->host . ":" . $self->port . $self->vhost if $self->debug;
43             $self->connection(shift);
44             $self->connection->open_channel(
45             on_failure => $self->status,
46             on_close => sub {
47             say "Channel closed" if $self->debug;
48             $self->status->send;
49             },
50             on_success => sub {
51             say "Opened channel" if $self->debug;
52             $self->channel(shift);
53             $self->channel->declare_exchange(
54             exchange => $self->exchange,
55             type => $self->type,
56             auto_delete => 1,
57             on_failure => $self->status,
58             on_success => sub {
59             say "Declared exchange " . $self->exchange if $self->debug;
60             $self->channel->declare_queue(
61             queue => $self->queue,
62             auto_delete => 1,
63             on_failure => $self->status,
64             on_success => sub {
65             say "Declared queue " . $self->queue if $self->debug;
66             $self->channel->bind_queue(
67             queue => $self->queue,
68             exchange => $self->exchange,
69             routing_key => $self->key,
70             on_failure => $self->status,
71             on_success => sub {
72             say "Bound " . $self->queue . " to " . $self->exchange . " " . $self->key if $self->debug;
73             $self->channel->consume(
74             consumer_tag => $self->tag,
75             on_success => sub {
76             say 'Consuming from ' . $self->queue if $self->debug;
77             },
78             on_consume => sub {
79             my $msg = shift;
80             $self->on_message->($self,$msg);
81             },
82             on_cancel => sub {
83             say "Consumption canceled" if $self->debug;
84             $self->status->send;
85             },
86             on_failure => $self->status,
87             );
88             }
89             );
90             }
91             );
92             }
93             );
94             },
95             );
96             },
97             on_failure => $self->status,
98             on_read_failure => sub {
99             say "Failed to read" if $self->debug;
100             $self->status->send;
101             },
102             on_return => sub {
103             say "Failed to send" if $self->debug;
104             $self->status->send;
105             },
106             on_close => sub {
107             say "Connection closed" if $self->debug;
108             $self->status->send;
109             }
110             );
111             $self->status->recv;
112             }
113            
114              
115             1;
116              
117             __END__