File Coverage

blib/lib/Mojo/RabbitMQ/Client/Consumer.pm
Criterion Covered Total %
statement 12 65 18.4
branch 0 6 0.0
condition 1 2 50.0
subroutine 4 9 44.4
pod 0 2 0.0
total 17 84 20.2


line stmt bran cond sub pod time code
1             package Mojo::RabbitMQ::Client::Consumer;
2 5     5   42 use Mojo::Base 'Mojo::EventEmitter';
  5         14  
  5         55  
3              
4 5     5   992 use Mojo::Promise;
  5         12  
  5         28  
5 5     5   185 use Scalar::Util 'weaken';
  5         11  
  5         432  
6             require Mojo::RabbitMQ::Client;
7              
8 5   50 5   36 use constant DEBUG => $ENV{MOJO_RABBITMQ_DEBUG} // 0;
  5         9  
  5         5106  
9              
10             has url => undef;
11             has client => undef;
12             has channel => undef;
13             has queue => undef;
14             has setup => 0;
15             has defaults => sub { {} };
16              
17             sub consume_p {
18 0     0 0   my $self = shift;
19              
20 0           my $promise = Mojo::Promise->new()->resolve();
21              
22 0           weaken $self;
23 0 0         unless ($self->client) {
24             $promise = $promise->then(
25             sub {
26 0     0     warn "-- spawn new client\n" if DEBUG;
27 0           my $client_promise = Mojo::Promise->new();
28 0           my $client = Mojo::RabbitMQ::Client->new(url => $self->url);
29 0           $self->client($client);
30              
31             # Catch all client related errors
32 0           $self->client->catch(sub { $client_promise->reject($_[1]) });
  0            
33              
34             # When connection is in Open state, open new channel
35             $client->on(
36             open => sub {
37 0           warn "-- client open\n" if DEBUG;
38 0           $client_promise->resolve;
39             }
40 0           );
41 0           $client->on('close' => sub { shift; $self->emit('close', @_) });
  0            
  0            
42              
43             # Start connection
44 0           $client->connect;
45              
46 0           return $client_promise;
47             }
48 0           );
49             }
50              
51             # Create a new channel with auto-assigned id
52 0 0         unless ($self->channel) {
53             $promise = $promise->then(
54             sub {
55 0     0     warn "-- create new channel\n" if DEBUG;
56 0           my $channel_promise = Mojo::Promise->new;
57 0           my $channel = Mojo::RabbitMQ::Client::Channel->new();
58              
59 0           $channel->catch(sub { $channel_promise->reject($_[1]) });
  0            
60 0           $channel->on(close => sub { warn 'Channel closed: ' . $_[1]->method_frame->reply_text; });
  0            
61              
62             $channel->on(
63             open => sub {
64 0           my ($channel) = @_;
65 0           warn "-- channel opened\n" if DEBUG;
66              
67 0           $self->channel($channel);
68 0           $channel->qos(%{$self->defaults->{qos}})->deliver;
  0            
69 0           $channel_promise->resolve;
70             }
71 0           );
72              
73 0           $self->client->open_channel($channel);
74 0           return $channel_promise;
75             }
76 0           );
77             }
78              
79             # Start consuming messages
80             $promise = $promise->then(
81             sub {
82 0     0     my $consumer_promise = Mojo::Promise->new;
83             my $consumer = $self->channel->consume(
84             queue => $self->client->url->query->param('queue'),
85 0           %{$self->defaults->{consumer}}
  0            
86             );
87             $consumer->on(
88             message => sub {
89 0           warn "-- message received\n" if DEBUG;
90 0           my ($client, $message) = @_;
91 0           $self->emit('message', $message);
92             }
93 0           );
94 0           $consumer->on('success' => sub { $consumer_promise->resolve(@_) });
  0            
95 0           $consumer->deliver;
96 0           return $consumer_promise;
97             }
98 0           );
99              
100 0           return $promise;
101             }
102              
103             sub close {
104 0     0 0   my $self = shift;
105              
106 0 0         if ($self->client) {
107 0           $self->client->close();
108             }
109             }
110              
111             1;
112              
113             =encoding utf8
114              
115             =head1 NAME
116              
117             Mojo::RabbitMQ::Client::Consumer - simple Mojo::RabbitMQ::Client based consumer
118              
119             =head1 SYNOPSIS
120              
121             use Mojo::RabbitMQ::Client::Consumer;
122             my $consumer = Mojo::RabbitMQ::Client::Consumer->new(
123             url => 'amqp://guest:guest@127.0.0.1:5672/?exchange=mojo&queue=mojo',
124             defaults => {
125             qos => {prefetch_count => 1},
126             queue => {durable => 1},
127             consumer => {no_ack => 0},
128             }
129             );
130              
131             $consumer->catch(sub { die "Some error caught in Consumer" } );
132             $consumer->on('success' => sub { say "Consumer ready" });
133             $consumer->on(
134             'message' => sub {
135             my ($consumer, $message) = @_;
136              
137             $consumer->channel->ack($message)->deliver;
138             }
139             );
140             $consumer->consume_p->wait;
141              
142             Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
143              
144             =head1 DESCRIPTION
145              
146             =head1 EVENTS
147              
148             L<Mojo::RabbitMQ::Client::Consumer> inherits all events from L<Mojo::EventEmitter> and can emit the
149             following new ones.
150              
151             =head1 ATTRIBUTES
152              
153             L<Mojo::RabbitMQ::Client::Consumer> has the following attributes.
154              
155             =head1 METHODS
156              
157             L<Mojo::RabbitMQ::Client::Consumer> inherits all methods from L<Mojo::EventEmitter> and implements
158             the following new ones.
159              
160             =head1 SEE ALSO
161              
162             L<Mojo::RabbitMQ::Client>
163              
164             =head1 COPYRIGHT AND LICENSE
165              
166             Copyright (C) 2015-2017, Sebastian Podjasek and others
167              
168             This program is free software, you can redistribute it and/or modify it under the terms of the Artistic License version 2.0.
169              
170             =cut