File Coverage

blib/lib/Mojo/RabbitMQ/Client.pm
Criterion Covered Total %
statement 90 297 30.3
branch 22 94 23.4
condition 6 35 17.1
subroutine 22 56 39.2
pod 8 10 80.0
total 148 492 30.0


line stmt bran cond sub pod time code
1             package Mojo::RabbitMQ::Client;
2 5     5   177285 use Mojo::Base 'Mojo::EventEmitter';
  5         798222  
  5         49  
3              
4 5     5   8686 use Carp qw(croak confess);
  5         11  
  5         264  
5 5     5   2219 use Mojo::URL;
  5         37641  
  5         41  
6 5     5   2353 use Mojo::Home;
  5         155222  
  5         274  
7 5     5   2590 use Mojo::IOLoop;
  5         552038  
  5         40  
8 5     5   302 use Mojo::Parameters;
  5         11  
  5         43  
9 5     5   221 use Mojo::Promise;
  5         11  
  5         59  
10 5     5   180 use Mojo::Util qw(url_unescape dumper);
  5         12  
  5         290  
11 5     5   29 use List::Util qw(none);
  5         13  
  5         260  
12 5     5   71 use Scalar::Util qw(blessed weaken);
  5         12  
  5         237  
13 5     5   30 use File::Basename 'dirname';
  5         12  
  5         331  
14 5     5   2600 use File::ShareDir qw(dist_file);
  5         119407  
  5         377  
15              
16 5     5   2276 use Net::AMQP;
  5         285338  
  5         207  
17 5     5   47 use Net::AMQP::Common qw(:all);
  5         12  
  5         1043  
18              
19 5     5   3543 use Mojo::RabbitMQ::Client::Channel;
  5         14  
  5         39  
20 5     5   219 use Mojo::RabbitMQ::Client::LocalQueue;
  5         13  
  5         29  
21             require Mojo::RabbitMQ::Client::Consumer;
22             require Mojo::RabbitMQ::Client::Publisher;
23              
24             our $VERSION = "0.3.1";
25              
26 5   50 5   397 use constant DEBUG => $ENV{MOJO_RABBITMQ_DEBUG} // 0;
  5         12  
  5         23441  
27              
28             has is_open => 0;
29             has url => undef;
30             has tls => sub { shift->_uri_handler('tls') };
31             has user => sub { shift->_uri_handler('user') };
32             has pass => sub { shift->_uri_handler('pass') };
33             has host => sub { shift->_uri_handler('host') };
34             has port => sub { shift->_uri_handler('port') };
35             has vhost => sub { shift->_uri_handler('vhost') };
36             has params => sub { shift->_uri_handler('params') // Mojo::Parameters->new };
37             has connect_timeout => sub { $ENV{MOJO_CONNECT_TIMEOUT} // 10 };
38             has heartbeat_timeout => 60;
39             has heartbeat_received => 0; # When did we receive last heartbeat
40             has heartbeat_sent => 0; # When did we sent last heartbeat
41             has ioloop => sub { Mojo::IOLoop->singleton };
42             has max_buffer_size => 16384;
43             has max_channels => 0;
44             has queue => sub { Mojo::RabbitMQ::Client::LocalQueue->new };
45             has channels => sub { {} };
46             has stream_id => undef;
47              
48             sub connect {
49 0     0 1 0 my $self = shift;
50 0         0 $self->{buffer} = '';
51              
52 0         0 my $id;
53 0     0   0 $id = $self->_connect(sub { $self->_connected($id) });
  0         0  
54 0         0 $self->stream_id($id);
55              
56 0         0 return $id;
57             }
58              
59             sub connect_p {
60 0     0 0 0 my $self = shift;
61 0         0 my $promise = Mojo::Promise->new;
62              
63 0         0 my $id;
64              
65 0         0 weaken $self;
66             my $handler = sub {
67 0     0   0 my ($err) = @_;
68 0 0       0 if (defined $err) {
69 0         0 return $promise->reject($err);
70             }
71              
72 0         0 return $promise->resolve($self);
73 0         0 };
74              
75 0     0   0 $id = $self->_connect(sub { $self->_connected($id, $handler) });
  0         0  
76 0         0 $self->stream_id($id);
77              
78 0         0 return $promise;
79             }
80              
81             sub consumer {
82 2     2 1 1480 my ($class, @params) = @_;
83 2 100       24 croak "consumer is a static method" if ref $class;
84              
85 1         12 return Mojo::RabbitMQ::Client::Consumer->new(@params);
86             }
87              
88             sub publisher {
89 2     2 1 1338 my ($class, @params) = @_;
90 2 100       16 croak "publisher is a static method" if ref $class;
91              
92 1         8 return Mojo::RabbitMQ::Client::Publisher->new(@params);
93             }
94              
95             sub param {
96 6     6 1 10682 my $self = shift;
97 6 50       20 return undef unless defined $self->params;
98 6         61 return $self->params->param(@_);
99             }
100              
101             sub add_channel {
102 0     0 1 0 my $self = shift;
103 0         0 my $channel = shift;
104              
105 0         0 my $id = $channel->id;
106 0 0 0     0 if ($id and $self->channels->{$id}) {
107 0         0 return $channel->emit(
108             error => 'Channel with id: ' . $id . ' already defined');
109             }
110              
111 0 0 0     0 if ($self->max_channels > 0
112 0         0 and scalar keys %{$self->channels} >= $self->max_channels)
113             {
114 0         0 return $channel->emit(error => 'Maximum number of channels reached');
115             }
116              
117 0 0       0 if (not $id) {
118 0         0 for my $candidate_id (1 .. (2**16 - 1)) {
119 0 0       0 next if defined $self->channels->{$candidate_id};
120 0         0 $id = $candidate_id;
121 0         0 last;
122             }
123 0 0       0 unless ($id) {
124 0         0 return $channel->emit(error => 'Ran out of channel ids');
125             }
126             }
127              
128 0         0 $self->channels->{$id} = $channel->id($id)->client($self);
129 0         0 weaken $channel->{client};
130              
131 0         0 return $channel;
132             }
133              
134             sub acquire_channel_p {
135 0     0 0 0 my $self = shift;
136              
137 0         0 my $promise = Mojo::Promise->new;
138              
139 0         0 my $channel = Mojo::RabbitMQ::Client::Channel->new();
140 0     0   0 $channel->catch(sub { $promise->reject(@_); undef $promise });
  0         0  
  0         0  
141 0     0   0 $channel->on(close => sub { warn "Channel closed" });
  0         0  
142 0     0   0 $channel->on(open => sub { $promise->resolve(@_); undef $promise });
  0         0  
  0         0  
143              
144 0         0 $self->open_channel($channel);
145              
146 0         0 return $promise;
147             }
148              
149             sub open_channel {
150 0     0 1 0 my $self = shift;
151 0         0 my $channel = shift;
152              
153 0 0       0 return $channel->emit(error => 'Client connection not opened')
154             unless $self->is_open;
155              
156 0         0 $self->add_channel($channel)->open;
157              
158 0         0 return $self;
159             }
160              
161             sub delete_channel {
162 0     0 1 0 my $self = shift;
163 0         0 return delete $self->channels->{shift};
164             }
165              
166             sub close {
167 0     0 1 0 my $self = shift;
168              
169 0         0 weaken $self;
170             $self->_write_expect(
171             'Connection::Close' => {},
172             'Connection::CloseOk' => sub {
173 0     0   0 warn "-- Connection::CloseOk\n" if DEBUG;
174 0         0 $self->emit('close');
175 0         0 $self->_close;
176             },
177             sub {
178 0     0   0 $self->_close;
179             }
180 0         0 );
181             }
182              
183 0     0   0 sub _loop { $_[0]->ioloop }
184              
185             sub _error {
186 0     0   0 my ($self, $id, $err) = @_;
187              
188 0         0 $self->emit(error => $err);
189             }
190              
191             sub _uri_handler {
192 20     20   38 my $self = shift;
193 20         34 my $attr = shift;
194              
195 20 100       49 return undef unless defined $self->url;
196              
197 18 50 33     135 $self->url(Mojo::URL->new($self->url))
198             unless blessed $self->url && $self->url->isa('Mojo::URL');
199              
200             # Set some defaults
201 18         3491 my %defaults = (
202             tls => 0,
203             user => undef,
204             pass => undef,
205             host => 'localhost',
206             port => 5672,
207             vhost => '/',
208             params => undef
209             );
210              
211             # Check secure scheme in url
212 18 100       46 $defaults{tls} = 1
213             if $self->url->scheme
214             =~ /^(amqp|rabbitmq)s$/; # Fallback support for rabbitmq scheme name
215 18 100       153 $defaults{port} = 5671 if $defaults{tls};
216              
217             # Get host & port
218 18 100 66     43 $defaults{host} = $self->url->host
219             if defined $self->url->host && $self->url->host ne '';
220 18 100       287 $defaults{port} = $self->url->port if defined $self->url->port;
221              
222             # Get user & password
223 18         140 my $userinfo = $self->url->userinfo;
224 18 100       107 if (defined $userinfo) {
225 5         23 my ($user, $pass) = split /:/, $userinfo;
226 5         11 $defaults{user} = $user;
227 5         9 $defaults{pass} = $pass;
228             }
229              
230 18         42 my $vhost = url_unescape $self->url->path;
231 18         1728 $vhost =~ s|^/(.+)$|$1|;
232 18 100 66     842 $defaults{vhost} = $vhost if defined $vhost && $vhost ne '';
233              
234             # Query params
235 18         526 my $params = $defaults{params} = $self->url->query;
236              
237             # Handle common aliases to internal names
238 18         382 my %aliases = (
239             cacertfile => 'ca',
240             certfile => 'cert',
241             keyfile => 'key',
242             fail_if_no_peer_cert => 'verify',
243             connection_timeout => 'timeout'
244             );
245             $params->param($aliases{$_}, $params->param($_))
246 18         60 foreach grep { defined $params->param($_) } keys %aliases;
  90         2086  
247              
248             # Some query parameters are translated to attribute values
249 18         760 my %attributes = (
250             heartbeat_timeout => 'heartbeat',
251             connect_timeout => 'timeout',
252             max_channels => 'channel_max'
253             );
254             $self->$_($params->param($attributes{$_}))
255 18         45 foreach grep { defined $params->param($attributes{$_}) } keys %attributes;
  54         695  
256              
257             # Set all
258 18         502 $self->$_($defaults{$_}) foreach keys %defaults;
259              
260 18         679 return $self->$attr;
261             }
262              
263             sub _close {
264 0     0   0 my $self = shift;
265 0         0 $self->_loop->stream($self->stream_id)->close_gracefully;
266             }
267              
268             sub _handle {
269 0     0   0 my ($self, $id, $close) = @_;
270              
271 0         0 $self->emit('disconnect');
272              
273 0         0 $self->_loop->remove($id);
274             }
275              
276             sub _read {
277 0     0   0 my ($self, $id, $chunk) = @_;
278              
279 0         0 warn "<- @{[dumper $chunk]}" if DEBUG;
280 0         0 $self->{buffer} .= $chunk;
281 0         0 $self->_parse_frames;
282              
283 0         0 return;
284             }
285              
286             sub _parse_frames {
287 0     0   0 my $self = shift;
288              
289 0         0 for my $frame (Net::AMQP->parse_raw_frames(\$self->{buffer})) {
290              
291 0 0 0     0 if ($frame->isa('Net::AMQP::Frame::Heartbeat')) {
    0          
    0          
292 0         0 $self->heartbeat_received(time());
293             }
294             elsif ($frame->isa('Net::AMQP::Frame::Method')
295             and $frame->method_frame->isa('Net::AMQP::Protocol::Connection::Close'))
296             {
297 0         0 $self->is_open(0);
298              
299 0         0 $self->_write_frame(Net::AMQP::Protocol::Connection::CloseOk->new());
300             $self->emit(disconnect => "Server side disconnection: "
301 0         0 . $frame->method_frame->{reply_text});
302             }
303             elsif ($frame->channel == 0) {
304 0         0 $self->queue->push($frame);
305             }
306             else {
307 0         0 my $channel = $self->channels->{$frame->channel};
308 0 0       0 if (defined $channel) {
309 0         0 $channel->_push_queue_or_consume($frame);
310             }
311             else {
312 0   0     0 $self->emit(
313             error => "Unknown channel id received: "
314             . ($frame->channel // '(undef)'),
315             $frame
316             );
317             }
318             }
319             }
320             }
321              
322             sub _connect {
323 0     0   0 my ($self, $cb) = @_;
324              
325             # Options
326             # Parse according to (https://www.rabbitmq.com/uri-spec.html)
327 0         0 my $options = {
328             address => $self->host,
329             port => $self->port,
330             timeout => $self->connect_timeout,
331             tls => $self->tls,
332             tls_ca => scalar $self->param('ca'),
333             tls_cert => scalar $self->param('cert'),
334             tls_key => scalar $self->param('key')
335             };
336 0         0 my $verify = $self->param('verify');
337 0 0       0 $options->{tls_verify} = hex $verify if defined $verify;
338              
339             # Connect
340 0         0 weaken $self;
341 0         0 my $id;
342             return $id = $self->_loop->client(
343             $options => sub {
344 0     0   0 my ($loop, $err, $stream) = @_;
345              
346             # Connection error
347 0 0       0 return unless $self;
348 0 0       0 return $self->_error($id, $err) if $err;
349              
350 0         0 $self->emit(connect => $stream);
351              
352             # Connection established
353 0         0 $stream->on(timeout => sub { $self->_error($id, 'Inactivity timeout') });
  0         0  
354 0 0       0 $stream->on(close => sub { $self && $self->_handle($id, 1) });
  0         0  
355 0 0       0 $stream->on(error => sub { $self && $self->_error($id, pop) });
  0         0  
356 0 0       0 $stream->on(read => sub { $self && $self->_read($id, pop) });
  0         0  
357 0         0 $cb->();
358             }
359 0         0 );
360             }
361              
362             sub _connected {
363 0     0   0 my ($self, $id, $cb) = @_;
364              
365             # Inactivity timeout
366 0         0 my $stream = $self->_loop->stream($id)->timeout(0);
367              
368             # Store connection information in transaction
369 0         0 my $handle = $stream->handle;
370              
371             # Detect that xml spec was already loaded
372 0         0 my $loaded = eval { Net::AMQP::Protocol::Connection::StartOk->new; 1 };
  0         0  
  0         0  
373 0 0       0 unless ($loaded) { # Load AMQP specs
374 0         0 my $file = "amqp0-9-1.stripped.extended.xml";
375              
376             # Original spec is in "fixed_amqp0-8.xml"
377 0         0 my $share = dist_file('Mojo-RabbitMQ-Client', $file);
378 0         0 Net::AMQP::Protocol->load_xml_spec($share);
379             }
380              
381 0         0 $self->_write($id => Net::AMQP::Protocol->header);
382              
383 0         0 weaken $self;
384             $self->_expect(
385             'Connection::Start' => sub {
386 0     0   0 my $frame = shift;
387              
388 0         0 my @server_mechanisms = split /\s/, $frame->method_frame->mechanisms;
389 0   0     0 my $param_mechanism = $self->param('auth_mechanism') // '';
390 0         0 my @client_mechanisms = ('AMQPLAIN', 'EXTERNAL');
391 0 0       0 @client_mechanisms = ($param_mechanism) if ($param_mechanism);
392 0         0 warn "-- Server mechanisms: @server_mechanisms\n" if DEBUG;
393 0         0 warn "-- Client mechanisms: @client_mechanisms\n" if DEBUG;
394 0         0 my $mechanism;
395 0         0 for my $cand (@client_mechanisms) {
396 0 0       0 if (grep { $_ eq $cand } @server_mechanisms) {
  0         0  
397 0         0 $mechanism = $cand;
398 0         0 last;
399             }
400             }
401 0 0       0 return $self->emit(error => 'No authentication mechanism could be negotiated')
402             unless $mechanism;
403              
404 0         0 my @locales = split /\s/, $frame->method_frame->locales;
405             return $self->emit(error => 'en_US is not found in locales')
406 0 0       0 if none { $_ eq 'en_US' } @locales;
  0         0  
407              
408 0         0 $self->{_server_properties} = $frame->method_frame->server_properties;
409              
410 0         0 warn "-- Connection::Start {product: " . $self->{_server_properties}->{product} . ", version: " . $self->{_server_properties}->{version} . "}\n" if DEBUG;
411 0         0 $self->_write_frame(
412             Net::AMQP::Protocol::Connection::StartOk->new(
413             client_properties => {
414             platform => 'Perl',
415             product => __PACKAGE__,
416             information => 'https://github.com/inway/mojo-rabbitmq-client',
417             version => __PACKAGE__->VERSION,
418             },
419             mechanism => $mechanism,
420             response => {LOGIN => $self->user, PASSWORD => $self->pass},
421             locale => 'en_US',
422             ),
423             );
424              
425 0         0 $self->_tune($id, $cb);
426             },
427             sub {
428 0     0   0 $self->emit(error => 'Unable to start connection: ' . shift);
429             }
430 0         0 );
431             }
432              
433             sub _tune {
434 0     0   0 my ($self, $id, $cb) = @_;
435              
436 0         0 weaken $self;
437             $self->_expect(
438             'Connection::Tune' => sub {
439 0     0   0 my $frame = shift;
440              
441 0         0 my $method_frame = $frame->method_frame;
442 0         0 $self->max_buffer_size($method_frame->frame_max);
443              
444 0   0     0 my $heartbeat = $self->heartbeat_timeout || $method_frame->heartbeat;
445              
446 0         0 warn "-- Connection::Tune {frame_max: " . $method_frame->frame_max . ", heartbeat: " . $method_frame->heartbeat . "}\n" if DEBUG;
447             # Confirm
448 0         0 $self->_write_frame(
449             Net::AMQP::Protocol::Connection::TuneOk->new(
450             channel_max => $method_frame->channel_max,
451             frame_max => $method_frame->frame_max,
452             heartbeat => $heartbeat,
453             ),
454             );
455              
456             # According to https://www.rabbitmq.com/amqp-0-9-1-errata.html
457             # The client should start sending heartbeats after receiving a Connection.Tune
458             # method, and start monitoring heartbeats after sending Connection.Open.
459             # -and-
460             # Heartbeat frames are sent about every timeout / 2 seconds. After two missed
461             # heartbeats, the peer is considered to be unreachable.
462             $self->{heartbeat_tid} = $self->_loop->recurring(
463             $heartbeat / 2 => sub {
464 0 0       0 return unless time() - $self->heartbeat_sent > $heartbeat / 2;
465 0         0 $self->_write_frame(Net::AMQP::Frame::Heartbeat->new());
466 0         0 $self->heartbeat_sent(time());
467             }
468 0 0       0 ) if $heartbeat;
469              
470             $self->_write_expect(
471             'Connection::Open' =>
472             {virtual_host => $self->vhost, capabilities => '', insist => 1,},
473             'Connection::OpenOk' => sub {
474 0         0 warn "-- Connection::OpenOk\n" if DEBUG;
475              
476 0         0 $self->is_open(1);
477 0         0 $self->emit('open');
478 0 0       0 $cb->() if defined $cb;
479             },
480             sub {
481 0         0 my $err = shift;
482 0         0 $self->emit(error => 'Unable to open connection: ' . $err);
483 0 0       0 $cb->($err) if defined $cb;
484             }
485 0         0 );
486             },
487             sub {
488 0     0   0 $self->emit(error => 'Unable to tune connection: ' . shift);
489             }
490 0         0 );
491             }
492              
493             sub _write_expect {
494 0     0   0 my $self = shift;
495 0         0 my ($method, $args, $exp, $cb, $failure_cb, $channel_id) = @_;
496 0         0 $method = 'Net::AMQP::Protocol::' . $method;
497              
498 0   0     0 $channel_id ||= 0;
499              
500 0         0 my $method_frame = Net::AMQP::Frame::Method->new(
501             method_frame => $method->new(%$args)
502             );
503              
504 0         0 $self->_write_frame(
505             $method_frame,
506             $channel_id
507             );
508              
509 0         0 return $self->_expect($exp, $cb, $failure_cb, $channel_id);
510             }
511              
512             sub _expect {
513 0     0   0 my $self = shift;
514 0         0 my ($exp, $cb, $failure_cb, $channel_id) = @_;
515 0 0       0 my @expected = ref($exp) eq 'ARRAY' ? @$exp : ($exp);
516              
517 0   0     0 $channel_id ||= 0;
518              
519 0         0 my $queue;
520 0 0       0 if (!$channel_id) {
521 0         0 $queue = $self->queue;
522             }
523             else {
524 0         0 my $channel = $self->channels->{$channel_id};
525 0 0       0 if (defined $channel) {
526 0         0 $queue = $channel->queue;
527             }
528             else {
529 0   0     0 $failure_cb->(
530             "Unknown channel id received: " . ($channel_id // '(undef)'));
531             }
532             }
533              
534 0 0       0 return unless $queue;
535              
536             $queue->get(
537             sub {
538 0     0   0 my $frame = shift;
539              
540 0 0       0 return $failure_cb->("Received data is not method frame")
541             if not $frame->isa("Net::AMQP::Frame::Method");
542              
543 0         0 my $method_frame = $frame->method_frame;
544 0         0 for my $exp (@expected) {
545 0 0       0 return $cb->($frame)
546             if $method_frame->isa("Net::AMQP::Protocol::" . $exp);
547             }
548              
549 0         0 $failure_cb->("Method is not "
550             . join(', ', @expected)
551             . ". It's "
552             . ref($method_frame));
553             }
554 0         0 );
555             }
556              
557             sub _write_frame {
558 0     0   0 my $self = shift;
559 0         0 my $id = $self->stream_id;
560 0         0 my ($out, $channel, $cb) = @_;
561              
562 0 0       0 if ($out->isa('Net::AMQP::Protocol::Base')) {
563 0         0 $out = $out->frame_wrap;
564             }
565 0   0     0 $out->channel($channel // 0);
566              
567 0         0 return $self->_write($id, $out->to_raw_frame, $cb);
568             }
569              
570             sub _write {
571 0     0   0 my $self = shift @_;
572 0         0 my $id = shift @_;
573 0         0 my $frame = shift @_;
574 0         0 my $cb = shift @_;
575              
576 0         0 warn "-> @{[dumper $frame]}" if DEBUG;
577              
578 0         0 utf8::downgrade($frame);
579 0 0       0 $self->_loop->stream($id)->write($frame => $cb)
580             if defined $self->_loop->stream($id);
581             }
582              
583             sub DESTROY {
584 20     20   51739 my $self = shift;
585 20 50       63 my $ioloop = $self->ioloop or return;
586 20         139 my $heartbeat_tid = $self->{heartbeat_tid};
587              
588 20 50       324 $ioloop->remove($heartbeat_tid) if $heartbeat_tid;
589             }
590              
591             1;
592              
593             =encoding utf8
594              
595             =head1 NAME
596              
597             Mojo::RabbitMQ::Client - Mojo::IOLoop based RabbitMQ client
598              
599             =head1 SYNOPSIS
600              
601             use Mojo::RabbitMQ::Client;
602              
603             # Supply URL according to (https://www.rabbitmq.com/uri-spec.html)
604             my $client = Mojo::RabbitMQ::Client->new(
605             url => 'amqp://guest:guest@127.0.0.1:5672/');
606              
607             # Catch all client related errors
608             $client->catch(sub { warn "Some error caught in client"; });
609              
610             # When connection is in Open state, open new channel
611             $client->on(
612             open => sub {
613             my ($client) = @_;
614              
615             # Create a new channel with auto-assigned id
616             my $channel = Mojo::RabbitMQ::Client::Channel->new();
617              
618             $channel->catch(sub { warn "Error on channel received"; });
619              
620             $channel->on(
621             open => sub {
622             my ($channel) = @_;
623             $channel->qos(prefetch_count => 1)->deliver;
624              
625             # Publish some example message to test_queue
626             my $publish = $channel->publish(
627             exchange => 'test',
628             routing_key => 'test_queue',
629             body => 'Test message',
630             mandatory => 0,
631             immediate => 0,
632             header => {}
633             );
634             # Deliver this message to server
635             $publish->deliver;
636              
637             # Start consuming messages from test_queue
638             my $consumer = $channel->consume(queue => 'test_queue');
639             $consumer->on(message => sub { say "Got a message" });
640             $consumer->deliver;
641             }
642             );
643             $channel->on(close => sub { $log->error('Channel closed') });
644              
645             $client->open_channel($channel);
646             }
647             );
648              
649             # Start connection
650             $client->connect();
651              
652             # Start Mojo::IOLoop if not running already
653             Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
654              
655             =head2 CONSUMER
656              
657             use Mojo::RabbitMQ::Client;
658             my $consumer = Mojo::RabbitMQ::Client->consumer(
659             url => 'amqp://guest:guest@127.0.0.1:5672/?exchange=mojo&queue=mojo',
660             defaults => {
661             qos => {prefetch_count => 1},
662             queue => {durable => 1},
663             consumer => {no_ack => 0},
664             }
665             );
666              
667             $consumer->catch(sub { die "Some error caught in Consumer" } );
668             $consumer->on('success' => sub { say "Consumer ready" });
669             $consumer->on(
670             'message' => sub {
671             my ($consumer, $message) = @_;
672              
673             $consumer->channel->ack($message)->deliver;
674             }
675             );
676             $consumer->start();
677              
678             Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
679              
680             =head2 PUBLISHER
681              
682             use Mojo::RabbitMQ::Client;
683             my $publisher = Mojo::RabbitMQ::Client->publisher(
684             url => 'amqp://guest:guest@127.0.0.1:5672/?exchange=mojo&routing_key=mojo'
685             );
686              
687             $publisher->publish('plain text');
688              
689             $publisher->publish(
690             {encode => { to => 'json'}},
691             routing_key => 'mojo_mq'
692             )->then(sub {
693             say "Message published";
694             })->catch(sub {
695             die "Publishing failed"
696             })->wait;
697              
698             =head1 DESCRIPTION
699              
700             L is a rewrite of L to work on top of L.
701              
702             =head1 EVENTS
703              
704             L inherits all events from L and can emit the
705             following new ones.
706              
707             =head2 connect
708              
709             $client->on(connect => sub {
710             my ($client, $stream) = @_;
711             ...
712             });
713              
714             Emitted when TCP/IP connection with RabbitMQ server is established.
715              
716             =head2 open
717              
718             $client->on(open => sub {
719             my ($client) = @_;
720             ...
721             });
722              
723             Emitted AMQP protocol Connection.Open-Ok method is received.
724              
725             =head2 close
726              
727             $client->on(close => sub {
728             my ($client) = @_;
729             ...
730             });
731              
732             Emitted on reception of Connection.Close-Ok method.
733              
734             =head2 disconnect
735              
736             $client->on(close => sub {
737             my ($client) = @_;
738             ...
739             });
740              
741             Emitted when TCP/IP connection gets disconnected.
742              
743             =head1 ATTRIBUTES
744              
745             L has following attributes.
746              
747             =head2 tls
748              
749             my $tls = $client->tls;
750             $client = $client->tls(1)
751              
752             Force secure connection. Default is disabled (C<0>).
753              
754             =head2 user
755              
756             my $user = $client->user;
757             $client = $client->user('guest')
758              
759             Sets username for authorization, by default it's not defined.
760              
761             =head2 pass
762              
763             my $pass = $client->pass;
764             $client = $client->pass('secret')
765              
766             Sets user password for authorization, by default it's not defined.
767              
768             =head2 host
769              
770             my $host = $client->host;
771             $client = $client->host('localhost')
772              
773             Hostname or IP address of RabbitMQ server. Defaults to C.
774              
775             =head2 port
776              
777             my $port = $client->port;
778             $client = $client->port(1234)
779              
780             Port on which RabbitMQ server listens for new connections.
781             Defaults to C<5672>, which is standard RabbitMQ server listen port.
782              
783             =head2 vhost
784              
785             my $vhost = $client->vhost;
786             $client = $client->vhost('/')
787              
788             RabbitMQ virtual server to user. Default is C.
789              
790             =head2 params
791              
792             my $params = $client->params;
793             $client = $client->params(Mojo::Parameters->new('verify=1'))
794              
795             Sets additional parameters for connection. Default is not defined.
796              
797             For list of supported parameters see L.
798              
799             =head2 url
800              
801             my $url = $client->url;
802             $client = $client->url('amqp://...');
803              
804             Sets all connection parameters in one string, according to specification from
805             L.
806              
807             amqp_URI = "amqp[s]://" amqp_authority [ "/" vhost ] [ "?" query ]
808              
809             amqp_authority = [ amqp_userinfo "@" ] host [ ":" port ]
810              
811             amqp_userinfo = username [ ":" password ]
812              
813             username = *( unreserved / pct-encoded / sub-delims )
814              
815             password = *( unreserved / pct-encoded / sub-delims )
816              
817             vhost = segment
818              
819             =head2 heartbeat_timeout
820              
821             my $timeout = $client->heartbeat_timeout;
822             $client = $client->heartbeat_timeout(180);
823              
824             Heartbeats are use to monitor peer reachability in AMQP.
825             Default value is C<60> seconds, if set to C<0> no heartbeats will be sent.
826              
827             =head2 connect_timeout
828              
829             my $timeout = $client->connect_timeout;
830             $client = $client->connect_timeout(5);
831              
832             Connection timeout used by L.
833             Defaults to environment variable C or C<10> seconds
834             if nothing else is set.
835              
836             =head2 max_channels
837              
838             my $max_channels = $client->max_channels;
839             $client = $client->max_channels(10);
840              
841             Maximum number of channels allowed to be active. Defaults to C<0> which
842             means no implicit limit.
843              
844             When you try to call C over limit an C will be
845             emitted on channel saying that: I.
846              
847             =head1 STATIC METHODS
848              
849             =head2 consumer
850              
851             my $client = Mojo::RabbitMQ::Client->consumer(...)
852              
853             Shortcut for creating L.
854              
855             =head2 publisher
856              
857             my $client = Mojo::RabbitMQ::Client->publisher(...)
858              
859             Shortcut for creating L.
860              
861             =head1 METHODS
862              
863             L inherits all methods from L and implements
864             the following new ones.
865              
866             =head2 connect
867              
868             $client->connect();
869              
870             Tries to connect to RabbitMQ server and negotiate AMQP protocol.
871              
872             =head2 close
873              
874             $client->close();
875              
876             =head2 param
877              
878             my $param = $client->param('name');
879             $client = $client->param(name => 'value');
880              
881             =head2 add_channel
882              
883             my $channel = Mojo::RabbitMQ::Client::Channel->new();
884             ...
885             $channel = $client->add_channel($channel);
886             $channel->open;
887              
888             =head2 open_channel
889              
890             my $channel = Mojo::RabbitMQ::Client::Channel->new();
891             ...
892             $client->open_channel($channel);
893              
894             =head2 delete_channel
895              
896             my $removed = $client->delete_channel($channel->id);
897              
898             =head1 SUPPORTED QUERY PARAMETERS
899              
900             There's no formal specification, nevertheless a list of common parameters
901             recognized by officially supported RabbitMQ clients is maintained here:
902             L.
903              
904             Some shortcuts are also supported, you'll find them in parenthesis.
905              
906             Aliases are less significant, so when both are specified only primary
907             value will be used.
908              
909             =head2 cacertfile (I)
910              
911             Path to Certificate Authority file for TLS.
912              
913             =head2 certfile (I)
914              
915             Path to the client certificate file for TLS.
916              
917             =head2 keyfile (I)
918              
919             Path to the client certificate private key file for TLS.
920              
921             =head2 fail_if_no_peer_cert (I)
922              
923             TLS verification mode, defaults to 0x01 on the client-side if a certificate
924             authority file has been provided, or 0x00 otherwise.
925              
926             =head2 auth_mechanism
927              
928             Sets the AMQP authentication mechanism. Defaults to AMQPLAIN. AMQPLAIN and
929             EXTERNAL are supported; EXTERNAL will only work if L does not need
930             to do anything beyond passing along a username and password if specified.
931              
932             =head2 heartbeat
933              
934             Sets requested heartbeat timeout, just like C attribute.
935              
936             =head2 connection_timeout (I)
937              
938             Sets connection timeout - see L attribute.
939              
940             =head2 channel_max
941              
942             Sets maximum number of channels - see L attribute.
943              
944             =head1 SEE ALSO
945              
946             L, L, L
947              
948             =head1 COPYRIGHT AND LICENSE
949              
950             Copyright (C) 2015-2019, Sebastian Podjasek and others
951              
952             Based on L - Copyright (C) 2010 Masahito Ikuta, maintained by C<< bobtfish@bobtfish.net >>
953              
954             This program is free software, you can redistribute it and/or modify it under the terms of the Artistic License version 2.0.
955              
956             Contains AMQP specification (F) licensed under BSD-style license.
957              
958             =cut