File Coverage

blib/lib/Mojo/RabbitMQ/Client/Channel.pm
Criterion Covered Total %
statement 21 324 6.4
branch 0 46 0.0
condition 1 12 8.3
subroutine 7 91 7.6
pod 27 30 90.0
total 56 503 11.1


line stmt bran cond sub pod time code
1             package Mojo::RabbitMQ::Client::Channel;
2 5     5   35 use Mojo::Base 'Mojo::EventEmitter';
  5         13  
  5         52  
3              
4 5     5   1024 use Mojo::Promise;
  5         12  
  5         36  
5 5     5   2551 use Mojo::RabbitMQ::Client::LocalQueue;
  5         17  
  5         38  
6 5     5   2547 use Mojo::RabbitMQ::Client::Method;
  5         16  
  5         41  
7 5     5   2573 use Mojo::RabbitMQ::Client::Method::Publish;
  5         16  
  5         57  
8 5     5   219 use Scalar::Util qw(isweak weaken);
  5         11  
  5         342  
9              
10 5   50 5   31 use constant DEBUG => $ENV{MOJO_RABBITMQ_DEBUG} // 0;
  5         11  
  5         25279  
11              
12             has id => 0;
13             has is_open => 0;
14             has is_active => 0;
15             has client => undef;
16             has queue => sub { Mojo::RabbitMQ::Client::LocalQueue->new };
17             has content_queue => sub { Mojo::RabbitMQ::Client::LocalQueue->new };
18             has consumer_cbs => sub { {} };
19             has return_cbs => sub { {} };
20              
21             sub _open {
22 0     0     warn "Deprecated call to _open on channel";
23 0           return shift->open(@_);
24             }
25              
26             sub open {
27 0     0 1   my $self = shift;
28              
29 0 0         if ($self->is_open) {
30 0           $self->emit(error => 'Channel has already been opened');
31 0           return $self;
32             }
33              
34 0           weaken $self;
35             $self->client->_write_expect(
36             'Channel::Open' => {},
37             'Channel::OpenOk' => sub {
38 0     0     warn "-- Channel::OpenOk\n" if DEBUG;
39 0           $self->is_open(1)->is_active(1)->emit('open');
40             },
41             sub {
42 0     0     $self->emit(
43             error => 'Invalid response received while trying to open channel: '
44             . shift);
45             },
46 0           $self->id,
47             );
48              
49 0           return $self;
50             }
51              
52             sub _push_queue_or_consume {
53 0     0     my $self = shift;
54 0           my ($frame) = @_;
55              
56 0           weaken $self;
57 0 0         if ($frame->isa('Net::AMQP::Frame::Method')) {
58 0           my $method_frame = $frame->method_frame;
59              
60 0 0         if ($method_frame->isa('Net::AMQP::Protocol::Channel::Close')) {
    0          
    0          
    0          
61 0           $self->client->_write_frame(Net::AMQP::Protocol::Channel::CloseOk->new(),
62             $self->id);
63 0           $self->is_open(0)->is_active(0);
64 0           $self->client->delete_channel($self->id);
65 0           $self->emit(close => $frame);
66              
67 0           return $self;
68             }
69             elsif ($method_frame->isa('Net::AMQP::Protocol::Basic::Deliver')) {
70 0   0 0     my $cb = $self->consumer_cbs->{$method_frame->consumer_tag} || sub { };
71             $self->_push_read_header_and_body(
72             'deliver',
73             $frame => sub {
74 0     0     $cb->emit(message => @_);
75             },
76             sub {
77 0     0     $self->emit(error => 'Consumer callback failure: ' . shift);
78             }
79 0           );
80 0           return $self;
81             }
82             elsif ($method_frame->isa('Net::AMQP::Protocol::Basic::Return')) {
83             my $cb
84             = $self->return_cbs->{$method_frame->exchange . '_'
85             . $method_frame->routing_key}
86 0   0 0     || sub { };
87             $self->_push_read_header_and_body(
88             'return',
89             $frame => sub {
90 0     0     $cb->emit(reject => @_);
91             },
92             sub {
93 0     0     $self->emit(error => 'Return callback failure: ' . shift);
94             }
95 0           );
96 0           return $self;
97             }
98             elsif ($method_frame->isa('Net::AMQP::Protocol::Channel::Flow')) {
99 0           $self->is_active($method_frame->active);
100 0           $self->client->_write_frame(
101             Net::AMQP::Protocol::Channel::FlowOk->new(
102             active => $method_frame->active
103             ),
104             $self->id
105             );
106              
107 0           return $self;
108             }
109              
110 0           $self->queue->push($frame);
111             }
112             else {
113 0           $self->content_queue->push($frame);
114             }
115              
116 0           return $self;
117             }
118              
119             sub close {
120 0     0 1   my $self = shift;
121 0 0         my $connection = $self->client or return;
122              
123 0 0         return $self if !$self->is_open;
124              
125 0 0         return $self->_close() if 0 == scalar keys %{$self->consumer_cbs};
  0            
126              
127 0           for my $consumer_tag (keys %{$self->consumer_cbs}) {
  0            
128 0           my $method = $self->cancel(consumer_tag => $consumer_tag);
129 0 0         weaken $self unless isweak $self;
130             $method->on(
131             success => sub {
132 0     0     $self->_close();
133             }
134 0           );
135             $method->catch(
136             sub {
137 0     0     $self->_close();
138 0           $self->emit(error => 'Error canceling consumption: ' . shift, @_);
139             }
140 0           );
141 0           $method->deliver();
142             }
143              
144 0           return $self;
145             }
146              
147             sub _close {
148 0     0     my $self = shift;
149 0           my %args = @_;
150              
151 0 0         return unless 0 == scalar keys %{$self->consumer_cbs};
  0            
152              
153             $self->client->_write_expect(
154             'Channel::Close' => {},
155             'Channel::CloseOk' => sub {
156 0     0     warn "-- Channel::CloseOk\n" if DEBUG;
157 0           $self->is_open(0)->is_active(0);
158 0           $self->client->delete_channel($self->id);
159 0           $self->emit('close');
160             },
161             sub {
162 0     0     $self->is_open(0)->is_active(0);
163 0           $self->client->delete_channel($self->id);
164 0           $self->emit(error => 'Failed closing channel: ' . shift);
165             },
166 0           $self->id,
167             );
168              
169 0           return $self;
170             }
171              
172             sub _assert_open {
173 0     0     my $self = shift;
174              
175 0 0 0       return 0 unless $self->is_open and $self->is_active;
176              
177 0           return 1;
178             }
179              
180             sub _prepare_method {
181 0     0     my $self = shift;
182              
183 0           my $method = Mojo::RabbitMQ::Client::Method->new(
184             client => $self->client,
185             channel => $self
186             );
187 0           weaken $method->{channel};
188 0           weaken $method->{client};
189              
190 0           return $method->setup(@_);
191             }
192              
193             sub declare_exchange {
194 0     0 1   my $self = shift;
195              
196             return $self->_prepare_method(
197             'Exchange::Declare' => {
198             type => 'direct',
199             passive => 0,
200             durable => 0,
201             auto_delete => 0,
202             internal => 0,
203             @_, # exchange
204             ticket => 0,
205             nowait => 0, # FIXME
206             },
207             'Exchange::DeclareOk' => sub {
208 0     0     warn "-- Exchange::DeclareOk\n" if DEBUG;
209             }
210 0           );
211             }
212              
213             sub declare_exchange_p {
214 0     0 1   my $self = shift;
215              
216 0           my $promise = Mojo::Promise->new;
217 0           my $method = $self->declare_exchange(@_);
218 0           weaken $self;
219 0     0     $method->on('success' => sub { shift; $promise->resolve($self, @_) });
  0            
  0            
220 0     0     $method->on('error' => sub { shift; $promise->reject($self, @_) });
  0            
  0            
221 0           $method->deliver;
222              
223 0           return $promise;
224             }
225              
226             sub delete_exchange {
227 0     0 1   my $self = shift;
228              
229             return $self->_prepare_method(
230             'Exchange::Delete' => {
231             if_unused => 0,
232             @_, # exchange
233             ticket => 0,
234             nowait => 0, # FIXME
235             },
236             'Exchange::DeleteOk' => sub {
237 0     0     warn "-- Exchange::DeleteOk\n" if DEBUG;
238             }
239 0           );
240             }
241              
242             sub delete_exchange_p {
243 0     0 1   my $self = shift;
244              
245 0           my $promise = Mojo::Promise->new;
246 0           my $method = $self->delete_exchange(@_);
247 0           weaken $self;
248 0     0     $method->on('success' => sub { shift; $promise->resolve($self, @_) });
  0            
  0            
249 0     0     $method->on('error' => sub { shift; $promise->reject($self, @_) });
  0            
  0            
250 0           $method->deliver;
251              
252 0           return $promise;
253             }
254              
255             sub declare_queue {
256 0     0 1   my $self = shift;
257              
258             return $self->_prepare_method(
259             'Queue::Declare' => {
260             queue => '',
261             passive => 0,
262             durable => 0,
263             exclusive => 0,
264             auto_delete => 0,
265             no_ack => 1,
266             @_,
267             ticket => 0,
268             nowait => 0, # FIXME
269             },
270             'Queue::DeclareOk' => sub {
271 0     0     warn "-- Queue::DeclareOk\n" if DEBUG;
272             }
273 0           );
274             }
275              
276             sub declare_queue_p {
277 0     0 1   my $self = shift;
278              
279 0           my $promise = Mojo::Promise->new;
280 0           my $method = $self->declare_queue(@_);
281 0           weaken $self;
282 0     0     $method->on('success' => sub { shift; $promise->resolve($self, @_) });
  0            
  0            
283 0     0     $method->on('error' => sub { shift; $promise->reject($self, @_) });
  0            
  0            
284 0           $method->deliver;
285              
286 0           return $promise;
287             }
288              
289             sub bind_queue {
290 0     0 1   my $self = shift;
291              
292             return $self->_prepare_method(
293             'Queue::Bind' => {
294             @_, # queue, exchange, routing_key
295             ticket => 0,
296             nowait => 0, # FIXME
297             },
298             'Queue::BindOk' => sub {
299 0     0     warn "-- Queue::BindOk\n" if DEBUG;
300             }
301 0           );
302             }
303              
304             sub bind_queue_p {
305 0     0 1   my $self = shift;
306              
307 0           my $promise = Mojo::Promise->new;
308 0           my $method = $self->bind_queue(@_);
309 0     0     $method->on('success' => sub { shift; $promise->resolve($self, @_) });
  0            
  0            
310 0     0     $method->on('error' => sub { shift; $promise->reject($self, @_) });
  0            
  0            
311 0           $method->deliver;
312              
313 0           return $promise;
314             }
315              
316             sub unbind_queue {
317 0     0 1   my $self = shift;
318              
319             return $self->_prepare_method(
320             'Queue::Unbind' => {
321             @_, # queue, exchange, routing_key
322             ticket => 0,
323             },
324             'Queue::UnbindOk' => sub {
325 0     0     warn "-- Queue::UnbindOk\n" if DEBUG;
326             }
327 0           );
328             }
329              
330             sub unbind_queue_p {
331 0     0 1   my $self = shift;
332              
333 0           my $promise = Mojo::Promise->new;
334 0           my $method = $self->unbind_queue(@_);
335 0           weaken $self;
336 0     0     $method->on('success' => sub { shift; $promise->resolve($self, @_) });
  0            
  0            
337 0     0     $method->on('error' => sub { shift; $promise->reject($self, @_) });
  0            
  0            
338 0           $method->deliver;
339              
340 0           return $promise;
341             }
342              
343             sub purge_queue {
344 0     0 1   my $self = shift;
345              
346             return $self->_prepare_method(
347             'Queue::Purge' => {
348             @_, # queue
349             ticket => 0,
350             nowait => 0, # FIXME
351             },
352             'Queue::PurgeOk' => sub {
353 0     0     warn "-- Queue::PurgeOk\n" if DEBUG;
354             }
355 0           );
356             }
357              
358             sub purge_queue_p {
359 0     0 1   my $self = shift;
360              
361 0           my $promise = Mojo::Promise->new;
362 0           my $method = $self->purge_queue(@_);
363 0           weaken $self;
364 0     0     $method->on('success' => sub { shift; $promise->resolve($self, @_) });
  0            
  0            
365 0     0     $method->on('error' => sub { shift; $promise->reject($self, @_) });
  0            
  0            
366 0           $method->deliver;
367              
368 0           return $promise;
369             }
370              
371             sub delete_queue {
372 0     0 1   my $self = shift;
373              
374             return $self->_prepare_method(
375             'Queue::Delete' => {
376             if_unused => 0,
377             if_empty => 0,
378             @_, # queue
379             ticket => 0,
380             nowait => 0, # FIXME
381             },
382             'Queue::DeleteOk' => sub {
383 0     0     warn "-- Queue::DeleteOk\n" if DEBUG;
384             }
385 0           );
386             }
387              
388             sub delete_queue_p {
389 0     0 1   my $self = shift;
390              
391 0           my $promise = Mojo::Promise->new;
392 0           my $method = $self->delete_queue(@_);
393 0           weaken $self;
394 0     0     $method->on('success' => sub { shift; $promise->resolve($self, @_) });
  0            
  0            
395 0     0     $method->on('error' => sub { shift; $promise->reject($self, @_) });
  0            
  0            
396 0           $method->deliver;
397              
398 0           return $promise;
399             }
400              
401             sub publish {
402 0     0 1   my $self = shift;
403              
404 0           return Mojo::RabbitMQ::Client::Method::Publish->new(
405             client => $self->client,
406             channel => $self
407             )->setup(@_);
408             }
409              
410             sub publish_p {
411 0     0 0   my $self = shift;
412              
413 0           my $promise = Mojo::Promise->new;
414 0           my $method = Mojo::RabbitMQ::Client::Method::Publish->new(
415             client => $self->client,
416             channel => $self
417             );
418 0           weaken $method->{client};
419 0           weaken $method->{channel};
420 0           $method->setup(@_);
421 0           weaken $self;
422 0     0     $method->on('success' => sub { shift; $promise->resolve($self, @_) });
  0            
  0            
423 0     0     $method->on('error' => sub { shift; $promise->reject($self, @_) });
  0            
  0            
424 0           $method->deliver;
425              
426 0           return $promise;
427             }
428              
429             sub consume {
430 0     0 1   my $self = shift;
431              
432             my $method = $self->_prepare_method(
433             'Basic::Consume' => {
434             consumer_tag => '',
435             no_local => 0,
436             no_ack => 1,
437             exclusive => 0,
438             @_,
439             ticket => 0,
440             nowait => 0
441             },
442             'Basic::ConsumeOk' => sub {
443 0     0     warn "-- Basic::ConsumeOk\n" if DEBUG;
444             }
445 0           );
446 0           weaken $self;
447             $method->on(
448             success => sub {
449 0     0     my $this = shift;
450 0           my $frame = shift;
451 0           my $tag = $frame->method_frame->consumer_tag;
452              
453 0           $self->consumer_cbs->{$tag} = $this;
454             }
455 0           );
456              
457 0           return $method;
458             }
459              
460             sub cancel {
461 0     0 1   my $self = shift;
462              
463             my $method = $self->_prepare_method(
464             'Basic::Cancel',
465             {
466             @_, # consumer_tag
467             nowait => 0,
468             },
469             'Basic::CancelOk' => sub {
470 0     0     warn "-- Basic::CancelOk\n" if DEBUG;
471             }
472 0           );
473 0           weaken $self;
474             $method->on(
475             success => sub {
476 0     0     my $this = shift;
477 0           my $frame = shift;
478 0           delete $self->consumer_cbs->{$frame->method_frame->consumer_tag};
479             }
480 0           );
481 0           return $method;
482             }
483              
484             sub get {
485 0     0 1   my $self = shift;
486              
487 0           my $method = $self->_prepare_method(
488             'Basic::Get',
489             {
490             no_ack => 1,
491             @_, # queue
492             ticket => 0,
493             },
494             [qw(Basic::GetOk Basic::GetEmpty)]
495             );
496 0           weaken $self;
497             $method->on(
498             success => sub {
499 0     0     warn "-- Basic::GetOk|GetEmpty\n" if DEBUG;
500 0           my $this = shift;
501 0           my $frame = shift;
502              
503 0 0         $this->emit(empty => $frame)
504             if $frame->method_frame->isa('Net::AMQP::Protocol::Basic::GetEmpty');
505             $self->_push_read_header_and_body(
506             'ok', $frame,
507             sub {
508 0           $this->emit(message => $frame, @_);
509             },
510             sub {
511 0           $this->emit(error => 'Failed to get messages from queue');
512             }
513 0           );
514             }
515 0           );
516              
517 0           return $method;
518             }
519              
520             sub get_p {
521 0     0 0   my $self = shift;
522              
523 0           my $promise = Mojo::Promise->new;
524 0           my $method = $self->get(@_);
525 0           weaken $self;
526 0     0     $method->on('message' => sub { shift; $promise->resolve($self, @_) });
  0            
  0            
527 0     0     $method->on('empty' => sub { shift; $promise->resolve($self, @_) });
  0            
  0            
528 0     0     $method->on('error' => sub { shift; $promise->reject($self, @_) });
  0            
  0            
529 0           $method->deliver;
530              
531 0           return $promise;
532             }
533              
534             sub ack {
535 0     0 1   my $self = shift;
536 0           my %args = ();
537 0 0         if (ref($_[0]) eq 'HASH') {
538 0 0         if (defined $_[0]->{ok}) {
    0          
539 0           $args{delivery_tag} = $_[0]->{ok}->method_frame->delivery_tag;
540             } elsif (defined $_[0]->{deliver}) {
541 0           $args{delivery_tag} = $_[0]->{deliver}->method_frame->delivery_tag;
542             }
543             } else {
544 0           %args = @_;
545             }
546              
547 0 0         die "ack requires delivery_tag in arguments" unless defined $args{delivery_tag};
548              
549             return $self->_prepare_method(
550             'Basic::Ack' => {
551             delivery_tag => 0,
552             multiple =>
553 0 0 0       (defined $args{delivery_tag} && $args{delivery_tag} != 0 ? 0 : 1),
554             %args,
555             }
556             );
557             }
558              
559             sub ack_p {
560 0     0 0   my $self = shift;
561              
562 0           my $promise = Mojo::Promise->new;
563 0           my $method = $self->ack(@_);
564 0           weaken $self;
565 0     0     $method->on('success' => sub { shift; $promise->resolve($self, @_) });
  0            
  0            
566 0     0     $method->on('error' => sub { shift; $promise->reject($self, @_) });
  0            
  0            
567 0           $method->deliver;
568              
569 0           return $promise;
570             }
571              
572             sub qos {
573 0     0 1   my $self = shift;
574              
575 0           return $self->_prepare_method('Basic::Qos',
576             {prefetch_count => 1, @_, prefetch_size => 0, global => 0,},
577             'Basic::QosOk');
578             }
579              
580             sub recover {
581 0     0 1   my $self = shift;
582              
583 0           return $self->_prepare_method('Basic::Recover' => {requeue => 1, @_,});
584             }
585              
586             sub reject {
587 0     0 1   my $self = shift;
588              
589 0           return $self->_prepare_method(
590             'Basic::Reject' => {delivery_tag => 0, requeue => 0, @_,});
591             }
592              
593             sub select_tx {
594 0     0 1   my $self = shift;
595              
596 0           return $self->_prepare_method('Tx::Select', {}, 'Tx::SelectOk');
597             }
598              
599             sub commit_tx {
600 0     0 1   my $self = shift;
601              
602 0           return $self->_prepare_method('Tx::Commit', {}, 'Tx::CommitOk');
603             }
604              
605             sub rollback_tx {
606 0     0 1   my $self = shift;
607              
608 0           return $self->_prepare_method('Tx::Rollback', {}, 'Tx::RollbackOk');
609             }
610              
611             sub _push_read_header_and_body {
612 0     0     my $self = shift;
613 0           my ($type, $frame, $cb, $failure_cb) = @_;
614 0           my $response = {$type => $frame};
615 0           my $body_size = 0;
616              
617             $self->content_queue->get(
618             sub {
619 0     0     my $frame = shift;
620              
621 0 0         return $failure_cb->('Received data is not header frame')
622             if !$frame->isa('Net::AMQP::Frame::Header');
623              
624 0           my $header_frame = $frame->header_frame;
625 0 0         return $failure_cb->('Header is not Protocol::Basic::ContentHeader'
626             . 'Header was '
627             . ref $header_frame)
628             if !$header_frame->isa('Net::AMQP::Protocol::Basic::ContentHeader');
629              
630 0           $response->{header} = $header_frame;
631 0           $body_size = $frame->body_size;
632             }
633 0           );
634              
635 0           my $body_payload = "";
636 0           my $next_frame;
637             $next_frame = sub {
638 0     0     my $frame = shift;
639              
640 0 0         return $failure_cb->('Received data is not body frame')
641             if !$frame->isa('Net::AMQP::Frame::Body');
642              
643 0           $body_payload .= $frame->payload;
644              
645 0 0         if (length($body_payload) < $body_size) {
646              
647             # More to come
648 0           $self->content_queue->get($next_frame);
649             }
650             else {
651 0           $frame->payload($body_payload);
652 0           $response->{body} = $frame;
653 0           $cb->($response);
654             }
655 0           };
656              
657 0           $self->content_queue->get($next_frame);
658              
659 0           return $self;
660             }
661              
662             sub DESTROY {
663 0     0     my $self = shift;
664 0 0         $self->close() if defined $self;
665 0           return;
666             }
667              
668             1;
669              
670             =encoding utf8
671              
672             =head1 NAME
673              
674             Mojo::RabbitMQ::Client::Channel - handles all channel related methods
675              
676             =head1 SYNOPSIS
677              
678             use Mojo::RabbitMQ::Client::Channel;
679              
680             my $channel = Mojo::RabbitMQ::Client::Channel->new();
681              
682             $channel->catch(sub { warn "Some channel error occurred: " . $_[1] });
683              
684             $channel->on(
685             open => sub {
686             my ($channel) = @_;
687             ...
688             }
689             );
690             $channel->on(close => sub { warn "Channel closed" });
691              
692             $client->open_channel($channel);
693              
694             =head1 DESCRIPTION
695              
696             L<Mojo::RabbitMQ::Client::Channel> allows one to call all channel related methods.
697              
698             =head1 EVENTS
699              
700             L<Mojo::RabbitMQ::Client::Channel> inherits all events from L<Mojo::EventEmitter> and can emit the
701             following new ones.
702              
703             =head2 open
704              
705             $channel->on(open => sub {
706             my ($channel) = @_;
707             ...
708             });
709              
710             Emitted when channel receives Open-Ok.
711              
712             =head2 close
713              
714             $channel->on(close=> sub {
715             my ($channel, $frame) = @_;
716             ...
717             });
718              
719             Emitted when channel gets closed, C<$frame> contains close reason.
720              
721             =head1 ATTRIBUTES
722              
723             L<Mojo::RabbitMQ::Client::Channel> has the following attributes.
724              
725             =head2 id
726              
727             my $id = $channel->id;
728             $channel->id(20810);
729              
730             If not set, L<Mojo::RabbitMQ::Client> sets it to the next free number when the channel is opened.
731              
732             =head2 is_open
733              
734             $channel->is_open ? "Channel is open" : "Channel is closed";
735              
736             =head2 is_active
737              
738             $channel->is_active ? "Channel is active" : "Channel is not active";
739              
740             This can be modified on reception of Channel-Flow.
741              
742             =head2 client
743              
744             my $client = $channel->client;
745             $channel->client($client);
746              
747             =head1 METHODS
748              
749             L<Mojo::RabbitMQ::Client::Channel> inherits all methods from L<Mojo::EventEmitter> and implements
750             the following new ones.
751              
752             =head2 close
753              
754             $channel->close;
755              
756             Cancels all consumers and closes channel afterwards.
757              
758             =head2 declare_exchange
759              
760             my $exchange = $channel->declare_exchange(
761             exchange => 'mojo',
762             type => 'fanout',
763             durable => 1,
764             ...
765             )->deliver;
766              
767             Verify exchange exists, create if needed.
768              
769             This method creates an exchange if it does not already exist, and if the
770             exchange exists, verifies that it is of the correct and expected class.
771              
772             Following arguments are accepted:
773              
774             =over 2
775              
776             =item exchange
777              
778             Unique exchange name
779              
780             =item type
781              
782             Each exchange belongs to one of a set of exchange types implemented by the server. The
783             exchange types define the functionality of the exchange - i.e. how messages are routed
784             through it. It is not valid or meaningful to attempt to change the type of an existing
785             exchange.
786              
787             =item passive
788              
789             If set, the server will reply with Declare-Ok if the exchange already exists with the same
790             name, and raise an error if not. The client can use this to check whether an exchange
791             exists without modifying the server state. When set, all other method fields except name
792             and no-wait are ignored. A declare with both passive and no-wait has no effect. Arguments
793             are compared for semantic equivalence.
794              
795             =item durable
796              
797             If set when creating a new exchange, the exchange will be marked as durable. Durable exchanges
798             remain active when a server restarts. Non-durable exchanges (transient exchanges) are purged
799             if/when a server restarts.
800              
801             =item auto_delete
802              
803             If set, the exchange is deleted when all queues have finished using it.
804              
805             =item internal
806              
807             If set, the exchange may not be used directly by publishers, but only when bound to other exchanges.
808             Internal exchanges are used to construct wiring that is not visible to applications.
809              
810             =back
811              
812             =head2 declare_exchange_p
813              
814             Same as L</declare_exchange> but auto-delivers method and returns a L<Mojo::Promise> object.
815              
816             $channel->declare_exchange_p(
817             exchange => 'mojo',
818             type => 'fanout',
819             durable => 1,
820             ...
821             )->then(sub {
822             say "Exchange declared...";
823             })->catch(sub {
824             my $err = shift;
825             warn "Exchange declaration error: $err";
826             })->wait;
827              
828             =head2 delete_exchange
829              
830             $channel->delete_exchange(exchange => 'mojo')->deliver;
831              
832             Delete an exchange.
833              
834             This method deletes an exchange. When an exchange is deleted all queue bindings on the exchange
835             are cancelled.
836              
837             Following arguments are accepted:
838              
839             =over 2
840              
841             =item exchange
842              
843             Exchange name.
844              
845             =item if_unused
846              
847             If set, the server will only delete the exchange if it has no queue bindings. If the exchange has
848             queue bindings the server does not delete it but raises a channel exception instead.
849              
850             =back
851              
852             =head2 delete_exchange_p
853              
854             Same as L</delete_exchange> but auto-delivers method and returns a L<Mojo::Promise> object.
855              
856             $channel->delete_exchange_p(
857             exchange => 'mojo'
858             )->then(sub {
859             say "Exchange deleted...";
860             })->catch(sub {
861             my $err = shift;
862             warn "Exchange removal error: $err";
863             })->wait;
864              
865             =head2 declare_queue
866              
867             my $queue = $channel->declare_queue(queue => 'mq', durable => 1)->deliver;
868              
869             Declare queue, create if needed.
870              
871             This method creates or checks a queue. When creating a new queue the client can
872             specify various properties that control the durability of the queue and its contents,
873             and the level of sharing for the queue.
874              
875             Following arguments are accepted:
876              
877             =over 2
878              
879             =item queue
880              
881             The queue name MAY be empty, in which case the server MUST create a new queue with
882             a unique generated name and return this to the client in the Declare-Ok method.
883              
884             =item passive
885              
886             If set, the server will reply with Declare-Ok if the queue already exists with the same
887             name, and raise an error if not. The client can use this to check whether a queue exists
888             without modifying the server state. When set, all other method fields except name and
889             no-wait are ignored. A declare with both passive and no-wait has no effect.
890             Arguments are compared for semantic equivalence.
891              
892             =item durable
893              
894             If set when creating a new queue, the queue will be marked as durable. Durable queues
895             remain active when a server restarts. Non-durable queues (transient queues) are purged
896             if/when a server restarts. Note that durable queues do not necessarily hold persistent
897             messages, although it does not make sense to send persistent messages to a transient queue.
898              
899             =item exclusive
900              
901             Exclusive queues may only be accessed by the current connection, and are deleted when
902             that connection closes. Passive declaration of an exclusive queue by other connections is
903             not allowed.
904              
905             =item auto_delete
906              
907             If set, the queue is deleted when all consumers have finished using it. The last consumer
908             can be cancelled either explicitly or because its channel is closed. If there was no consumer
909             ever on the queue, it won't be deleted. Applications can explicitly delete auto-delete queues
910             using the Delete method as normal.
911              
912             =back
913              
914             =head2 declare_queue_p
915              
916             Same as L</declare_queue> but auto-delivers method and returns a L<Mojo::Promise> object.
917              
918             $channel->declare_queue_p(
919             queue => 'mq',
920             durable => 1
921             )->then(sub {
922             say "Queue declared...";
923             })->catch(sub {
924             my $err = shift;
925             warn "Queue declaration error: $err";
926             })->wait;
927              
928             =head2 bind_queue
929              
930             $channel->bind_queue(
931             exchange => 'mojo',
932             queue => 'mq',
933             routing_key => ''
934             )->deliver;
935              
936             Bind queue to an exchange.
937              
938             This method binds a queue to an exchange. Until a queue is bound it will
939             not receive any messages. In a classic messaging model, store-and-forward
940             queues are bound to a direct exchange and subscription queues are bound
941             to a topic exchange.
942              
943             Following arguments are accepted:
944              
945             =over 2
946              
947             =item queue
948              
949             Specifies the name of the queue to bind.
950              
951             =item exchange
952              
953             Name of the exchange to bind to.
954              
955             =item routing_key
956              
957             Specifies the routing key for the binding. The routing key is used for
958             routing messages depending on the exchange configuration. Not all exchanges
959             use a routing key - refer to the specific exchange documentation. If the
960             queue name is empty, the server uses the last queue declared on the channel.
961             If the routing key is also empty, the server uses this queue name for the
962             routing key as well. If the queue name is provided but the routing key is
963             empty, the server does the binding with that empty routing key. The meaning
964             of empty routing keys depends on the exchange implementation.
965              
966             =back
967              
968             =head2 bind_queue_p
969              
970             Same as L</bind_queue> but auto-delivers method and returns a L<Mojo::Promise> object.
971              
972             $channel->bind_queue_p(
973             exchange => 'mojo',
974             queue => 'mq',
975             routing_key => ''
976             )->then(sub {
977             say "Queue bound...";
978             })->catch(sub {
979             my $err = shift;
980             warn "Queue binding error: $err";
981             })->wait;
982              
983             =head2 unbind_queue
984              
985             $channel->unbind_queue(
986             exchange => 'mojo',
987             queue => 'mq',
988             routing_key => ''
989             )->deliver;
990              
991             Unbind a queue from an exchange.
992              
993             This method unbinds a queue from an exchange.
994              
995             Following arguments are accepted:
996              
997             =over 2
998              
999             =item queue
1000              
1001             Specifies the name of the queue to unbind.
1002              
1003             =item exchange
1004              
1005             The name of the exchange to unbind from.
1006              
1007             =item routing_key
1008              
1009             Specifies the routing key of the binding to unbind.
1010              
1011             =back
1012              
1013             =head2 unbind_queue_p
1014              
1015             Same as L</unbind_queue> but auto-delivers method and returns a L<Mojo::Promise> object.
1016              
1017             $channel->unbind_queue_p(
1018             exchange => 'mojo',
1019             queue => 'mq',
1020             routing_key => ''
1021             )->then(sub {
1022             say "Queue unbound...";
1023             })->catch(sub {
1024             my $err = shift;
1025             warn "Queue unbinding error: $err";
1026             })->wait;
1027              
1028             =head2 purge_queue
1029              
1030             $channel->purge_queue(queue => 'mq')->deliver;
1031              
1032             Purge a queue.
1033              
1034             This method removes all messages from a queue which are not awaiting acknowledgment.
1035              
1036             Following arguments are accepted:
1037              
1038             =over 2
1039              
1040             =item queue
1041              
1042             Specifies the name of the queue to purge.
1043              
1044             =back
1045              
1046             =head2 purge_queue_p
1047              
1048             Same as L</purge_queue> but auto-delivers method and returns a L<Mojo::Promise> object.
1049              
1050             $channel->purge_queue_p(
1051             queue => 'mq',
1052             )->then(sub {
1053             say "Queue purged...";
1054             })->catch(sub {
1055             my $err = shift;
1056             warn "Queue purging error: $err";
1057             })->wait;
1058              
1059             =head2 delete_queue
1060              
1061             $channel->delete_queue(queue => 'mq', if_empty => 1)->deliver;
1062              
1063             Delete a queue.
1064              
1065             This method deletes a queue. When a queue is deleted any pending messages
1066             are sent to a dead-letter queue if this is defined in the server configuration,
1067             and all consumers on the queue are cancelled.
1068              
1069             Following arguments are accepted:
1070              
1071             =over 2
1072              
1073             =item queue
1074              
1075             Specifies the name of the queue to delete.
1076              
1077             =item if_unused
1078              
1079             If set, the server will only delete the queue if it has no consumers. If the queue
1080             has consumers the server does not delete it but raises a channel exception instead.
1081              
1082             =item if_empty
1083              
1084             If set, the server will only delete the queue if it has no messages.
1085              
1086             =back
1087              
1088             =head2 delete_queue_p
1089              
1090             Same as L</delete_queue> but auto-delivers method and returns a L<Mojo::Promise> object.
1091              
1092             $channel->delete_queue_p(
1093             queue => 'mq',
1094             if_empty => 1
1095             )->then(sub {
1096             say "Queue removed...";
1097             })->catch(sub {
1098             my $err = shift;
1099             warn "Queue removal error: $err";
1100             })->wait;
1101              
1102             =head2 publish
1103              
1104             my $message = $channel->publish(
1105             exchange => 'mojo',
1106             routing_key => 'mq',
1107             body => 'simple text body',
1108             );
1109             $message->deliver();
1110              
1111             Publish a message.
1112              
1113             This method publishes a message to a specific exchange. The message will be
1114             routed to queues as defined by the exchange configuration and distributed to
1115             any active consumers when the transaction, if any, is committed.
1116              
1117             Following arguments are accepted:
1118              
1119             =over 2
1120              
1121             =item exchange
1122              
1123             Specifies the name of the exchange to publish to. The exchange name can be empty,
1124             meaning the default exchange. If the exchange name is specified, and that exchange
1125             does not exist, the server will raise a channel exception.
1126              
1127             =item routing_key
1128              
1129             Specifies the routing key for the message. The routing key is used for routing
1130             messages depending on the exchange configuration.
1131              
1132             =item mandatory
1133              
1134             This flag tells the server how to react if the message cannot be routed to a queue.
1135             If this flag is set, the server will return an unroutable message with a Return method.
1136             If this flag is zero, the server silently drops the message.
1137              
1138             All rejections are emitted as a C<reject> event.
1139              
1140             $message->on(reject => sub {
1141             my $message = shift;
1142             my $frame = shift;
1143             my $method_frame = $frame->method_frame;
1144              
1145             my $reply_code = $method_frame->reply_code;
1146             my $reply_text = $method_frame->reply_text;
1147             });
1148              
1149             =item immediate
1150              
1151             This flag tells the server how to react if the message cannot be routed to a queue consumer
1152             immediately. If this flag is set, the server will return an undeliverable message with a
1153             Return method. If this flag is zero, the server will queue the message, but with no guarantee
1154             that it will ever be consumed.
1155              
1156             As said above, all rejections are emitted as a C<reject> event.
1157              
1158             $message->on(reject => sub { ... });
1159              
1160             =back
1161              
1162             =head2 consume
1163              
1164             my $consumer = $channel->consume(queue => 'mq');
1165             $consumer->on(message => sub { ... });
1166             $consumer->deliver;
1167              
1168             This method asks the server to start a "consumer", which is a transient request for messages from a
1169             specific queue. Consumers last as long as the channel they were declared on, or until the client cancels
1170             them.
1171              
1172             Following arguments are accepted:
1173              
1174             =over 2
1175              
1176             =item queue
1177              
1178             Specifies the name of the queue to consume from.
1179              
1180             =item consumer_tag
1181              
1182             Specifies the identifier for the consumer. The consumer tag is local to a channel, so two clients can use the
1183             same consumer tags. If this field is empty the server will generate a unique tag.
1184              
1185             $consumer->on(success => sub {
1186             my $consumer = shift;
1187             my $frame = shift;
1188              
1189             my $consumer_tag = $frame->method_frame->consumer_tag;
1190             });
1191              
1192             =item no_local (not implemented in RabbitMQ!)
1193              
1194             If the no-local field is set the server will not send messages to the connection that published them.
1195              
1196             See L<https://www.rabbitmq.com/specification.html>
1197              
1198             =item no_ack
1199              
1200             If this field is set the server does not expect acknowledgements for messages. That is, when a message
1201             is delivered to the client the server assumes the delivery will succeed and immediately dequeues it.
1202             This functionality may increase performance but at the cost of reliability. Messages can get lost if
1203             a client dies before they are delivered to the application.
1204              
1205             =item exclusive
1206              
1207             Request exclusive consumer access, meaning only this consumer can access the queue.
1208              
1209             =back
1210              
1211             =head2 cancel
1212              
1213             $channel->cancel(consumer_tag => 'amq.ctag....')->deliver;
1214              
1215             End a queue consumer.
1216              
1217             This method cancels a consumer. This does not affect already delivered messages, but
1218             it does mean the server will not send any more messages for that consumer. The client
1219             may receive an arbitrary number of messages in between sending the cancel method and
1220             receiving the cancel-ok reply.
1221              
1222             Following arguments are accepted:
1223              
1224             =over 2
1225              
1226             =item consumer_tag
1227              
1228             Holds the consumer tag specified by the client or provided by the server.
1229              
1230             =back
1231              
1232             =head2 get
1233              
1234             my $get = $channel->get(queue => 'mq');
1235             $get->deliver;
1236              
1237             Direct access to a queue.
1238              
1239             This method provides a direct access to the messages in a queue using
1240             a synchronous dialogue that is designed for specific types of application
1241             where synchronous functionality is more important than performance.
1242              
1243             This is a simple event emitter to which you have to subscribe. It can emit:
1244              
1245             =over 2
1246              
1247             =item message
1248              
1249             Provide client with a message.
1250              
1251             This method delivers a message to the client following a get method. A message
1252             delivered by 'get-ok' must be acknowledged unless the no-ack option was set
1253             in the get method.
1254              
1255             You can access all get-ok reply parameters as below:
1256              
1257             $get->on(message => sub {
1258             my $get = shift;
1259             my $get_ok = shift;
1260             my $message = shift;
1261              
1262             say "Still got: " . $get_ok->method_frame->message_count;
1263             });
1264              
1265             =item empty
1266              
1267             Indicate no messages available.
1268              
1269             This method tells the client that the queue has no messages available for the
1270             client.
1271              
1272             =back
1273              
1274             Following arguments are accepted:
1275              
1276             =over 2
1277              
1278             =item queue
1279              
1280             Specifies the name of the queue to get a message from.
1281              
1282             =item no_ack
1283              
1284             If this field is set the server does not expect acknowledgements for messages. That is, when a message
1285             is delivered to the client the server assumes the delivery will succeed and immediately dequeues it.
1286             This functionality may increase performance but at the cost of reliability. Messages can get lost if
1287             a client dies before they are delivered to the application.
1288              
1289             =back
1290              
1291             =head2 ack
1292              
1293             $channel->ack(delivery_tag => 1);
1294              
1295             Acknowledge one or more messages.
1296              
1297             When sent by the client, this method acknowledges one or more messages
1298             delivered via the Deliver or Get-Ok methods. When sent by server, this
1299             method acknowledges one or more messages published with the Publish
1300             method on a channel in confirm mode. The acknowledgement can be for
1301             a single message or a set of messages up to and including a specific
1302             message.
1303              
1304             Following arguments are accepted:
1305              
1306             =over 2
1307              
1308             =item delivery_tag
1309              
1310             Server assigned delivery tag that was received with a message.
1311              
1312             =item multiple
1313              
1314             If set to 1, the delivery tag is treated as "up to and including", so
1315             that multiple messages can be acknowledged with a single method. If set
1316             to zero, the delivery tag refers to a single message. If the multiple
1317             field is 1, and the delivery tag is zero, this indicates acknowledgement
1318             of all outstanding messages.
1319              
1320             =back
1321              
1322             =head2 qos
1323              
1324             $channel->qos(prefetch_count => 1)->deliver;
1325              
1326             Sets specified Quality of Service to channel, or entire connection. Accepts following arguments:
1327              
1328             =over 2
1329              
1330             =item prefetch_size
1331              
1332             Prefetch window size in octets.
1333              
1334             =item prefetch_count
1335              
1336             Prefetch window in complete messages.
1337              
1338             =item global
1339              
1340             If set all settings will be applied connection wide.
1341              
1342             =back
1343              
1344             =head2 recover
1345              
1346             $channel->recover(requeue => 0)->deliver;
1347              
1348             Redeliver unacknowledged messages.
1349              
1350             This method asks the server to redeliver all unacknowledged messages
1351             on a specified channel. Zero or more messages may be redelivered.
1352              
1353             =over 2
1354              
1355             =item requeue
1356              
1357             If this field is zero, the message will be redelivered to the original
1358             recipient. If this bit is 1, the server will attempt to requeue the
1359             message, potentially then delivering it to an alternative subscriber.
1360              
1361             =back
1362              
1363             =head2 reject
1364              
1365             $channel->reject(delivery_tag => 1, requeue => 0)->deliver;
1366              
1367             Reject an incoming message.
1368              
1369             This method allows a client to reject a message. It can be
1370             used to interrupt and cancel large incoming messages, or
1371             return untreatable messages to their original queue.
1372              
1373             Following arguments are accepted:
1374              
1375             =over 2
1376              
1377             =item delivery_tag
1378              
1379             Server assigned delivery tag that was received with a message.
1380              
1381             =item requeue
1382              
1383             If requeue is true, the server will attempt to requeue the message.
1384             If requeue is false or the requeue attempt fails the messages are
1385             discarded or dead-lettered.
1386              
1387             =back
1388              
1389             =head2 select_tx
1390              
1391             Work with transactions.
1392              
1393             The Tx class allows publish and ack operations to be batched into atomic units of work.
1394             The intention is that all publish and ack requests issued within a transaction will
1395             complete successfully or none of them will. Servers SHOULD implement atomic transactions
1396             at least where all publish or ack requests affect a single queue. Transactions that cover
1397             multiple queues may be non-atomic, given that queues can be created and destroyed
1398             asynchronously, and such events do not form part of any transaction.
1399             Further, the behaviour of transactions with respect to the immediate and mandatory flags
1400             on Basic.Publish methods is not defined.
1401              
1402             $channel->select_tx()->deliver;
1403              
1404             Select standard transaction mode.
1405              
1406             This method sets the channel to use standard transactions. The client must use this method
1407             at least once on a channel before using the Commit or Rollback methods.
1408              
1409             =head2 commit_tx
1410              
1411             $channel->commit_tx()->deliver;
1412              
1413             Commit the current transaction.
1414              
1415             This method commits all message publications and acknowledgments performed in the current
1416             transaction. A new transaction starts immediately after a commit.
1417              
1418             =head2 rollback_tx
1419              
1420             $channel->rollback_tx()->deliver;
1421              
1422             Abandon the current transaction.
1423              
1424             This method abandons all message publications and acknowledgments performed in the current
1425             transaction. A new transaction starts immediately after a rollback. Note that unacked messages
1426             will not be automatically redelivered by rollback; if that is required an explicit recover
1427             call should be issued.
1428              
1429             =head1 SEE ALSO
1430              
1431             L<Mojo::RabbitMQ::Client>, L<Mojo::RabbitMQ::Client::Consumer>, L<Mojo::RabbitMQ::Client::Publisher>
1432              
1433             =head1 COPYRIGHT AND LICENSE
1434              
1435             Copyright (C) 2015-2017, Sebastian Podjasek and others
1436              
1437             Based on L<AnyEvent::RabbitMQ> - Copyright (C) 2010 Masahito Ikuta, maintained by C<< bobtfish@bobtfish.net >>
1438              
1439             This program is free software, you can redistribute it and/or modify it under the terms of the Artistic License version 2.0.
1440              
1441             =cut