File Coverage

blib/lib/AnyEvent/RabbitMQ/Channel.pm
Criterion Covered Total %
statement 31 400 7.7
branch 0 152 0.0
condition 0 66 0.0
subroutine 11 73 15.0
pod 21 30 70.0
total 63 721 8.7


line stmt bran cond sub pod time code
1             package AnyEvent::RabbitMQ::Channel;
2              
3 1     1   7 use strict;
  1         3  
  1         34  
4 1     1   5 use warnings;
  1         5  
  1         25  
5              
6 1     1   458 use AnyEvent::RabbitMQ::LocalQueue;
  1         8  
  1         31  
7 1     1   7 use AnyEvent;
  1         2  
  1         28  
8 1     1   7 use Scalar::Util qw( looks_like_number weaken );
  1         2  
  1         48  
9 1     1   5 use Devel::GlobalDestruction;
  1         2  
  1         24  
10 1     1   87 use Carp qw(croak cluck);
  1         2  
  1         69  
11 1     1   7 use POSIX qw(ceil);
  1         2  
  1         9  
12 1     1   1975 BEGIN { *Dumper = \&AnyEvent::RabbitMQ::Dumper }
13              
14             our $VERSION = '1.22'; # VERSION
15              
16 1     1   535 use namespace::clean;
  1         16214  
  1         6  
17              
18             use constant {
19 1         5471 _ST_CLOSED => 0,
20             _ST_OPENING => 1,
21             _ST_OPEN => 2,
22 1     1   279 };
  1         2  
23              
24             sub new {
25 0     0 0   my $class = shift;
26              
27             my $self = bless {
28       0     on_close => sub {},
29 0           @_, # id, connection, on_return, on_close, on_inactive, on_active
30             _queue => AnyEvent::RabbitMQ::LocalQueue->new,
31             _content_queue => AnyEvent::RabbitMQ::LocalQueue->new,
32             }, $class;
33 0           weaken($self->{connection});
34 0           return $self->_reset;
35             }
36              
37             sub _reset {
38 0     0     my $self = shift;
39              
40 0           my %a = (
41             _state => _ST_CLOSED,
42             _is_active => 0,
43             _is_confirm => 0,
44             _publish_tag => 0,
45             _publish_cbs => {}, # values: [on_ack, on_nack, on_return]
46             _consumer_cbs => {}, # values: [on_consume, on_cancel...]
47             );
48 0           @$self{keys %a} = values %a;
49              
50 0           return $self;
51             }
52              
53             sub id {
54 0     0 0   my $self = shift;
55 0           return $self->{id};
56             }
57              
58             sub is_open {
59 0     0 0   my $self = shift;
60 0           return $self->{_state} == _ST_OPEN;
61             }
62              
63             sub is_active {
64 0     0 0   my $self = shift;
65 0           return $self->{_is_active};
66             }
67              
68             sub is_confirm {
69 0     0 0   my $self = shift;
70 0           return $self->{_is_confirm};
71             }
72              
73             sub queue {
74 0     0 1   my $self = shift;
75 0           return $self->{_queue};
76             }
77              
78             sub open {
79 0     0 0   my $self = shift;
80 0           my %args = @_;
81              
82 0 0         if ($self->{_state} != _ST_CLOSED) {
83 0           $args{on_failure}->('Channel has already been opened');
84 0           return $self;
85             }
86              
87 0           $self->{_state} = _ST_OPENING;
88              
89             $self->{connection}->_push_write_and_read(
90             'Channel::Open', {}, 'Channel::OpenOk',
91             sub {
92 0     0     $self->{_state} = _ST_OPEN;
93 0           $self->{_is_active} = 1;
94 0           $args{on_success}->($self);
95             },
96             sub {
97 0     0     $self->{_state} = _ST_CLOSED;
98 0           $args{on_failure}->($self);
99             },
100             $self->{id},
101 0           );
102              
103 0           return $self;
104             }
105              
106             sub close {
107 0     0 0   my $self = shift;
108             my $connection = $self->{connection}
109 0 0         or return;
110 0           my %args = $connection->_set_cbs(@_);
111              
112             # If open in in progess, wait for it; 1s arbitrary timing.
113              
114 0           weaken(my $wself = $self);
115 0           my $t; $t = AE::timer 0, 1, sub {
116 0 0   0     (my $self = $wself) or undef $t, return;
117 0 0         return if $self->{_state} == _ST_OPENING;
118              
119             # No more tests are required
120 0           undef $t;
121              
122             # Double close is OK
123 0 0         if ($self->{_state} == _ST_CLOSED) {
124 0           $args{on_success}->($self);
125 0           return;
126             }
127              
128             $connection->_push_write(
129             $self->_close_frame,
130             $self->{id},
131 0           );
132              
133             # The spec says that after a party sends Channel::Close, it MUST
134             # discard all frames for that channel. So this channel is dead
135             # immediately.
136 0           $self->_closed();
137              
138             $connection->_push_read_and_valid(
139             'Channel::CloseOk',
140             sub {
141 0           $args{on_success}->($self);
142 0           $self->_orphan();
143             },
144             sub {
145 0           $args{on_failure}->(@_);
146 0           $self->_orphan();
147             },
148             $self->{id},
149 0           );
150 0           };
151              
152 0           return $self;
153             }
154              
155             sub _closed {
156 0     0     my $self = shift;
157 0           my ($frame,) = @_;
158 0   0       $frame ||= $self->_close_frame();
159              
160 0 0         return if $self->{_state} == _ST_CLOSED;
161 0           $self->{_state} = _ST_CLOSED;
162              
163             # Perform callbacks for all outstanding commands
164 0           $self->{_queue}->_flush($frame);
165 0           $self->{_content_queue}->_flush($frame);
166              
167             # Fake nacks of all outstanding publishes
168 0           $_->($frame) for grep { defined } map { $_->[1] } values %{ $self->{_publish_cbs} };
  0            
  0            
  0            
169              
170             # Report cancelation of all outstanding consumes
171 0           my @tags = keys %{ $self->{_consumer_cbs} };
  0            
172 0           $self->_canceled($_, $frame) for @tags;
173              
174             # Report close to on_close callback
175 0           { local $@;
  0            
176 0           eval { $self->{on_close}->($frame) };
  0            
177 0 0         warn "Error in channel on_close callback, ignored:\n $@ " if $@; }
178              
179             # Reset state (partly redundant)
180 0           $self->_reset;
181              
182 0           return $self;
183             }
184              
185             sub _close_frame {
186 0     0     my $self = shift;
187 0           my ($text,) = @_;
188              
189 0           Net::AMQP::Frame::Method->new(
190             method_frame => Net::AMQP::Protocol::Channel::Close->new(
191             reply_text => $text,
192             ),
193             );
194             }
195              
196             sub _orphan {
197 0     0     my $self = shift;
198              
199 0 0         if (my $connection = $self->{connection}) {
200 0           $connection->_delete_channel($self);
201             }
202 0           return $self;
203             }
204              
205             sub declare_exchange {
206 0     0 1   my $self = shift;
207 0           my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
208              
209 0 0         return $self if !$self->_check_open($failure_cb);
210              
211             $self->{connection}->_push_write_and_read(
212             'Exchange::Declare',
213             {
214             type => 'direct',
215             passive => 0,
216             durable => 0,
217             auto_delete => 0,
218             internal => 0,
219             %args, # exchange
220             ticket => 0,
221             nowait => 0, # FIXME
222             },
223             'Exchange::DeclareOk',
224             $cb,
225             $failure_cb,
226             $self->{id},
227 0           );
228              
229 0           return $self;
230             }
231              
232             sub bind_exchange {
233 0     0 1   my $self = shift;
234 0           my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
235              
236 0 0         return $self if !$self->_check_open($failure_cb);
237              
238             $self->{connection}->_push_write_and_read(
239             'Exchange::Bind',
240             {
241             %args, # source, destination, routing_key
242             ticket => 0,
243             nowait => 0, # FIXME
244             },
245             'Exchange::BindOk',
246             $cb,
247             $failure_cb,
248             $self->{id},
249 0           );
250              
251 0           return $self;
252             }
253              
254             sub unbind_exchange {
255 0     0 1   my $self = shift;
256 0           my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
257              
258 0 0         return $self if !$self->_check_open($failure_cb);
259              
260             $self->{connection}->_push_write_and_read(
261             'Exchange::Unbind',
262             {
263             %args, # source, destination, routing_key
264             ticket => 0,
265             nowait => 0, # FIXME
266             },
267             'Exchange::UnbindOk',
268             $cb,
269             $failure_cb,
270             $self->{id},
271 0           );
272              
273 0           return $self;
274             }
275              
276             sub delete_exchange {
277 0     0 1   my $self = shift;
278 0           my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
279              
280 0 0         return $self if !$self->_check_open($failure_cb);
281              
282             $self->{connection}->_push_write_and_read(
283             'Exchange::Delete',
284             {
285             if_unused => 0,
286             %args, # exchange
287             ticket => 0,
288             nowait => 0, # FIXME
289             },
290             'Exchange::DeleteOk',
291             $cb,
292             $failure_cb,
293             $self->{id},
294 0           );
295              
296 0           return $self;
297             }
298              
299             sub declare_queue {
300 0     0 1   my $self = shift;
301 0           my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
302              
303 0 0         return $self if !$self->_check_open($failure_cb);
304              
305             $self->{connection}->_push_write_and_read(
306             'Queue::Declare',
307             {
308             queue => '',
309             passive => 0,
310             durable => 0,
311             exclusive => 0,
312             auto_delete => 0,
313             no_ack => 1,
314             %args,
315             ticket => 0,
316             nowait => 0, # FIXME
317             },
318             'Queue::DeclareOk',
319             $cb,
320             $failure_cb,
321             $self->{id},
322 0           );
323             }
324              
325             sub bind_queue {
326 0     0 1   my $self = shift;
327 0           my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
328              
329 0 0         return $self if !$self->_check_open($failure_cb);
330              
331             $self->{connection}->_push_write_and_read(
332             'Queue::Bind',
333             {
334             %args, # queue, exchange, routing_key
335             ticket => 0,
336             nowait => 0, # FIXME
337             },
338             'Queue::BindOk',
339             $cb,
340             $failure_cb,
341             $self->{id},
342 0           );
343              
344 0           return $self;
345             }
346              
347             sub unbind_queue {
348 0     0 1   my $self = shift;
349 0           my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
350              
351 0 0         return $self if !$self->_check_open($failure_cb);
352              
353             $self->{connection}->_push_write_and_read(
354             'Queue::Unbind',
355             {
356             %args, # queue, exchange, routing_key
357             ticket => 0,
358             },
359             'Queue::UnbindOk',
360             $cb,
361             $failure_cb,
362             $self->{id},
363 0           );
364              
365 0           return $self;
366             }
367              
368             sub purge_queue {
369 0     0 1   my $self = shift;
370 0           my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
371              
372 0 0         return $self if !$self->_check_open($failure_cb);
373              
374             $self->{connection}->_push_write_and_read(
375             'Queue::Purge',
376             {
377             %args, # queue
378             ticket => 0,
379             nowait => 0, # FIXME
380             },
381             'Queue::PurgeOk',
382             $cb,
383             $failure_cb,
384             $self->{id},
385 0           );
386              
387 0           return $self;
388             }
389              
390             sub delete_queue {
391 0     0 1   my $self = shift;
392 0           my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
393              
394 0 0         return $self if !$self->_check_open($failure_cb);
395              
396             $self->{connection}->_push_write_and_read(
397             'Queue::Delete',
398             {
399             if_unused => 0,
400             if_empty => 0,
401             %args, # queue
402             ticket => 0,
403             nowait => 0, # FIXME
404             },
405             'Queue::DeleteOk',
406             $cb,
407             $failure_cb,
408             $self->{id},
409 0           );
410              
411 0           return $self;
412             }
413              
414             sub publish {
415 0     0 1   my $self = shift;
416 0           my %args = @_;
417              
418             # Docs should advise channel-level callback over this, but still, better to give user an out
419 0 0         unless ($self->{_is_active}) {
420 0 0         if (defined $args{on_inactive}) {
421 0           $args{on_inactive}->();
422 0           return $self;
423             }
424 0           croak "Can't publish on inactive channel (server flow control); provide on_inactive callback";
425             }
426              
427 0           my $header_args = delete $args{header};
428 0           my $body = delete $args{body};
429 0           my $ack_cb = delete $args{on_ack};
430 0           my $nack_cb = delete $args{on_nack};
431 0           my $return_cb = delete $args{on_return};
432              
433 0 0         defined($header_args) or $header_args = {};
434 0 0         defined($body) or $body = '';
435 0 0 0       if ( defined($ack_cb) or defined($nack_cb) or defined($return_cb) ) {
      0        
436             cluck "Can't set on_ack/on_nack/on_return callback when not in confirm mode"
437 0 0         unless $self->{_is_confirm};
438             }
439              
440 0           my $tag;
441 0 0         if ($self->{_is_confirm}) {
442             # yeah, delivery tags in acks are sequential. see Java client
443 0           $tag = ++$self->{_publish_tag};
444 0 0         if ($return_cb) {
445 0           $header_args = { %$header_args };
446 0           $header_args->{headers}->{_ar_return} = $tag; # just reuse the same value, why not
447             }
448 0           $self->{_publish_cbs}->{$tag} = [$ack_cb, $nack_cb, $return_cb];
449             }
450              
451             $self->_publish(
452 0           %args,
453             )->_header(
454             $header_args, $body,
455             )->_body(
456             $body,
457             );
458              
459 0           return $self;
460             }
461              
462             sub _publish {
463 0     0     my $self = shift;
464 0           my %args = @_;
465              
466             $self->{connection}->_push_write(
467             Net::AMQP::Protocol::Basic::Publish->new(
468             exchange => '',
469             mandatory => 0,
470             immediate => 0,
471             %args, # routing_key
472             ticket => 0,
473             ),
474             $self->{id},
475 0           );
476              
477 0           return $self;
478             }
479              
480             sub _header {
481 0     0     my ($self, $args, $body) = @_;
482              
483 0   0       my $weight = delete $args->{weight} || 0;
484              
485             # user-provided message headers must be strings. protect values that look like numbers.
486 0   0       my $headers = $args->{headers} || {};
487 0 0         my @prot = grep { my $v = $headers->{$_}; !ref($v) && looks_like_number($v) } keys %$headers;
  0            
  0            
488 0 0         if (@prot) {
489             $headers = {
490             %$headers,
491 0           map { $_ => Net::AMQP::Value::String->new($headers->{$_}) } @prot
  0            
492             };
493             }
494              
495             $self->{connection}->_push_write(
496             Net::AMQP::Frame::Header->new(
497             weight => $weight,
498             body_size => length($body),
499             header_frame => Net::AMQP::Protocol::Basic::ContentHeader->new(
500             content_type => 'application/octet-stream',
501             content_encoding => undef,
502             delivery_mode => 1,
503             priority => 1,
504             correlation_id => undef,
505             expiration => undef,
506             message_id => undef,
507             timestamp => time,
508             type => undef,
509             user_id => $self->{connection}->login_user,
510             app_id => undef,
511             cluster_id => undef,
512             %$args,
513             headers => $headers,
514             ),
515             ),
516             $self->{id},
517 0           );
518              
519 0           return $self;
520             }
521              
522             sub _body {
523 0     0     my ($self, $body,) = @_;
524              
525 0   0       my $body_max = $self->{connection}->{_body_max} || length $body;
526              
527             # chunk up body into segments measured by $frame_max
528 0           while (length $body) {
529             $self->{connection}->_push_write(
530             Net::AMQP::Frame::Body->new(
531             payload => substr($body, 0, $body_max, '')),
532             $self->{id}
533 0           );
534             }
535              
536 0           return $self;
537             }
538              
539             sub consume {
540 0     0 1   my $self = shift;
541 0           my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
542              
543 0 0         return $self if !$self->_check_open($failure_cb);
544              
545 0   0 0     my $consumer_cb = delete $args{on_consume} || sub {};
546 0   0 0     my $cancel_cb = delete $args{on_cancel} || sub {};
547 0   0       my $no_ack = delete $args{no_ack} // 1;
548              
549             $self->{connection}->_push_write_and_read(
550             'Basic::Consume',
551             {
552             consumer_tag => '',
553             no_local => 0,
554             no_ack => $no_ack,
555             exclusive => 0,
556              
557             %args, # queue
558             ticket => 0,
559             nowait => 0, # FIXME
560             },
561             'Basic::ConsumeOk',
562             sub {
563 0     0     my $frame = shift;
564 0           my $tag = $frame->method_frame->consumer_tag;
565 0           $self->{_consumer_cbs}->{$tag} = [ $consumer_cb, $cancel_cb ];
566 0           $cb->($frame);
567             },
568             $failure_cb,
569             $self->{id},
570 0           );
571              
572 0           return $self;
573             }
574              
575             sub cancel {
576 0     0 1   my $self = shift;
577 0           my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
578              
579 0 0         return $self if !$self->_check_open($failure_cb);
580              
581 0 0         if (!defined $args{consumer_tag}) {
582 0           $failure_cb->('consumer_tag is not set');
583 0           return $self;
584             }
585              
586 0           my $cons_cbs = $self->{_consumer_cbs}->{$args{consumer_tag}};
587 0 0         unless ($cons_cbs) {
588 0           $failure_cb->('Unknown consumer_tag');
589 0           return $self;
590             }
591 0           push @$cons_cbs, $cb;
592              
593             $self->{connection}->_push_write(
594             Net::AMQP::Protocol::Basic::Cancel->new(
595             %args, # consumer_tag
596             nowait => 0,
597             ),
598             $self->{id},
599 0           );
600              
601 0           return $self;
602             }
603              
604             sub _canceled {
605 0     0     my $self = shift;
606 0           my ($tag, $frame,) = @_;
607              
608 0 0         my $cons_cbs = delete $self->{_consumer_cbs}->{$tag}
609             or return 0;
610              
611 0           shift @$cons_cbs; # no more deliveries
612 0           for my $cb (reverse @$cons_cbs) {
613 0           $cb->($frame);
614             }
615 0           return 1;
616             }
617              
618             sub get {
619 0     0 1   my $self = shift;
620 0           my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
621              
622 0   0       my $no_ack = delete $args{no_ack} // 1;
623              
624 0 0         return $self if !$self->_check_open($failure_cb);
625              
626             $self->{connection}->_push_write_and_read(
627             'Basic::Get',
628             {
629             no_ack => $no_ack,
630             %args, # queue
631             ticket => 0,
632             },
633             [qw(Basic::GetOk Basic::GetEmpty)],
634             sub {
635 0     0     my $frame = shift;
636 0 0         return $cb->({empty => $frame})
637             if $frame->method_frame->isa('Net::AMQP::Protocol::Basic::GetEmpty');
638 0           $self->_push_read_header_and_body('ok', $frame, $cb, $failure_cb);
639             },
640             $failure_cb,
641             $self->{id},
642 0           );
643              
644 0           return $self;
645             }
646              
647             sub ack {
648 0     0 1   my $self = shift;
649 0           my %args = @_;
650              
651 0 0   0     return $self if !$self->_check_open(sub {});
652              
653             $self->{connection}->_push_write(
654             Net::AMQP::Protocol::Basic::Ack->new(
655             delivery_tag => 0,
656             multiple => (
657             defined $args{delivery_tag} && $args{delivery_tag} != 0 ? 0 : 1
658             ),
659             %args,
660             ),
661             $self->{id},
662 0 0 0       );
663              
664 0           return $self;
665             }
666              
667             sub qos {
668 0     0 1   my $self = shift;
669 0           my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
670              
671 0 0         return $self if !$self->_check_open($failure_cb);
672              
673             $self->{connection}->_push_write_and_read(
674             'Basic::Qos',
675             {
676             prefetch_count => 1,
677             prefetch_size => 0,
678             global => 0,
679             %args,
680             },
681             'Basic::QosOk',
682             $cb,
683             $failure_cb,
684             $self->{id},
685 0           );
686              
687 0           return $self;
688             }
689              
690             sub confirm {
691 0     0 1   my $self = shift;
692 0           my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
693              
694 0 0         return $self if !$self->_check_open($failure_cb);
695 0 0         return $self if !$self->_check_version(0, 9, $failure_cb);
696              
697 0           weaken(my $wself = $self);
698              
699             $self->{connection}->_push_write_and_read(
700             'Confirm::Select',
701             {
702             %args,
703             nowait => 0, # FIXME
704             },
705             'Confirm::SelectOk',
706             sub {
707 0 0   0     my $me = $wself or return;
708 0           $me->{_is_confirm} = 1;
709 0           $cb->();
710             },
711             $failure_cb,
712             $self->{id},
713 0           );
714              
715 0           return $self;
716             }
717              
718             sub recover {
719 0     0 1   my $self = shift;
720 0           my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
721              
722 0 0   0     return $self if !$self->_check_open(sub {});
723              
724             $self->{connection}->_push_write(
725             Net::AMQP::Protocol::Basic::Recover->new(
726             requeue => 1,
727             %args,
728             ),
729             $self->{id},
730 0           );
731              
732 0 0 0       if (!$args{nowait} && $self->_check_version(0, 9)) {
733             $self->{connection}->_push_read_and_valid(
734             'Basic::RecoverOk',
735             $cb,
736             $failure_cb,
737             $self->{id},
738 0           );
739             }
740             else {
741 0           $cb->();
742             }
743              
744 0           return $self;
745             }
746              
747             sub reject {
748 0     0 0   my $self = shift;
749 0           my %args = @_;
750              
751 0 0   0     return $self if !$self->_check_open( sub { } );
752              
753             $self->{connection}->_push_write(
754             Net::AMQP::Protocol::Basic::Reject->new(
755             delivery_tag => 0,
756             requeue => 0,
757             %args,
758             ),
759             $self->{id},
760 0           );
761              
762 0           return $self;
763             }
764              
765             sub select_tx {
766 0     0 1   my $self = shift;
767 0           my ($cb, $failure_cb,) = $self->_delete_cbs(@_);
768              
769 0 0         return $self if !$self->_check_open($failure_cb);
770              
771             $self->{connection}->_push_write_and_read(
772             'Tx::Select', {}, 'Tx::SelectOk',
773             $cb,
774             $failure_cb,
775             $self->{id},
776 0           );
777              
778 0           return $self;
779             }
780              
781             sub commit_tx {
782 0     0 1   my $self = shift;
783 0           my ($cb, $failure_cb,) = $self->_delete_cbs(@_);
784              
785 0 0         return $self if !$self->_check_open($failure_cb);
786              
787             $self->{connection}->_push_write_and_read(
788             'Tx::Commit', {}, 'Tx::CommitOk',
789             $cb,
790             $failure_cb,
791             $self->{id},
792 0           );
793              
794 0           return $self;
795             }
796              
797             sub rollback_tx {
798 0     0 1   my $self = shift;
799 0           my ($cb, $failure_cb,) = $self->_delete_cbs(@_);
800              
801 0 0         return $self if !$self->_check_open($failure_cb);
802              
803             $self->{connection}->_push_write_and_read(
804             'Tx::Rollback', {}, 'Tx::RollbackOk',
805             $cb,
806             $failure_cb,
807             $self->{id},
808 0           );
809              
810 0           return $self;
811             }
812              
813             sub push_queue_or_consume {
814 0     0 0   my $self = shift;
815 0           my ($frame, $failure_cb,) = @_;
816              
817             # Note: the spec says that after a party sends Channel::Close, it MUST
818             # discard all frames for that channel other than Close and CloseOk.
819              
820 0 0         if ($frame->isa('Net::AMQP::Frame::Method')) {
821 0           my $method_frame = $frame->method_frame;
822 0 0 0       if ($method_frame->isa('Net::AMQP::Protocol::Channel::Close')) {
    0 0        
    0          
    0          
    0          
    0          
    0          
823             $self->{connection}->_push_write(
824             Net::AMQP::Protocol::Channel::CloseOk->new(),
825             $self->{id},
826 0           );
827 0           $self->_closed($frame);
828 0           $self->_orphan();
829 0           return $self;
830             } elsif ($self->{_state} != _ST_OPEN) {
831 0 0 0       if ($method_frame->isa('Net::AMQP::Protocol::Channel::OpenOk') ||
832             $method_frame->isa('Net::AMQP::Protocol::Channel::CloseOk')) {
833 0           $self->{_queue}->push($frame);
834             }
835 0           return $self;
836             } elsif ($method_frame->isa('Net::AMQP::Protocol::Basic::Deliver')) {
837 0           my $cons_cbs = $self->{_consumer_cbs}->{$method_frame->consumer_tag};
838 0   0 0     my $cb = ($cons_cbs && $cons_cbs->[0]) || sub {};
839 0           $self->_push_read_header_and_body('deliver', $frame, $cb, $failure_cb);
840 0           return $self;
841             } elsif ($method_frame->isa('Net::AMQP::Protocol::Basic::CancelOk') ||
842             $method_frame->isa('Net::AMQP::Protocol::Basic::Cancel')) {
843             # CancelOk means we asked for a cancel.
844             # Cancel means queue was deleted; it is not AMQP, but RMQ supports it.
845 0 0 0       if (!$self->_canceled($method_frame->consumer_tag, $frame)
846             && $method_frame->isa('Net::AMQP::Protocol::Basic::CancelOk')) {
847 0           $failure_cb->("Received CancelOk for unknown consumer tag " . $method_frame->consumer_tag);
848             }
849 0           return $self;
850             } elsif ($method_frame->isa('Net::AMQP::Protocol::Basic::Return')) {
851 0           weaken(my $wself = $self);
852             my $cb = sub {
853 0     0     my $ret = shift;
854 0 0         my $me = $wself or return;
855 0   0       my $headers = $ret->{header}->headers || {};
856 0           my $onret_cb;
857 0 0         if (defined(my $tag = $headers->{_ar_return})) {
858 0           my $cbs = $me->{_publish_cbs}->{$tag};
859 0 0         $onret_cb = $cbs->[2] if $cbs;
860             }
861 0   0       $onret_cb ||= $me->{on_return} || $me->{connection}->{on_return} || sub {}; # oh well
      0        
862 0           $onret_cb->($frame);
863 0           };
864 0           $self->_push_read_header_and_body('return', $frame, $cb, $failure_cb);
865 0           return $self;
866             } elsif ($method_frame->isa('Net::AMQP::Protocol::Basic::Ack') ||
867             $method_frame->isa('Net::AMQP::Protocol::Basic::Nack')) {
868 0           (my $resp = ref($method_frame)) =~ s/.*:://;
869 0           my $cbs;
870 0 0         if (!$self->{_is_confirm}) {
871 0           $failure_cb->("Received $resp when not in confirm mode");
872             }
873             else {
874 0           my @tags;
875 0 0         if ($method_frame->{multiple}) {
876 0           @tags = sort { $a <=> $b }
877 0           grep { $_ <= $method_frame->{delivery_tag} }
878 0           keys %{$self->{_publish_cbs}};
  0            
879             }
880             else {
881 0           @tags = ($method_frame->{delivery_tag});
882             }
883 0 0         my $cbi = ($resp eq 'Ack') ? 0 : 1;
884 0           for my $tag (@tags) {
885 0           my $cbs;
886 0 0         if (not $cbs = delete $self->{_publish_cbs}->{$tag}) {
    0          
887 0           $failure_cb->("Received $resp of unknown delivery tag $tag");
888             }
889             elsif ($cbs->[$cbi]) {
890 0           $cbs->[$cbi]->($frame);
891             }
892             }
893             }
894 0           return $self;
895             } elsif ($method_frame->isa('Net::AMQP::Protocol::Channel::Flow')) {
896 0           $self->{_is_active} = $method_frame->active;
897             $self->{connection}->_push_write(
898             Net::AMQP::Protocol::Channel::FlowOk->new(
899             active => $method_frame->active,
900             ),
901             $self->{id},
902 0           );
903 0 0         my $cbname = $self->{_is_active} ? 'on_active' : 'on_inactive';
904 0   0 0     my $cb = $self->{$cbname} || $self->{connection}->{$cbname} || sub {};
905 0           $cb->($frame);
906 0           return $self;
907             }
908 0           $self->{_queue}->push($frame);
909             } else {
910 0           $self->{_content_queue}->push($frame);
911             }
912              
913 0           return $self;
914             }
915              
916             sub _push_read_header_and_body {
917 0     0     my $self = shift;
918 0           my ($type, $frame, $cb, $failure_cb,) = @_;
919 0           my $response = {$type => $frame};
920 0           my $body_size = 0;
921 0           my $body_payload = "";
922              
923 0           weaken(my $wcontq = $self->{_content_queue});
924 0           my $w_body_frame;
925             my $body_frame = sub {
926 0     0     my $frame = shift;
927              
928 0 0         return $failure_cb->('Received data is not body frame')
929             if !$frame->isa('Net::AMQP::Frame::Body');
930              
931 0           $body_payload .= $frame->payload;
932              
933 0 0         if (length($body_payload) < $body_size) {
934             # More to come
935 0 0         my $contq = $wcontq or return;
936 0           $contq->get($w_body_frame);
937             }
938             else {
939 0           $frame->payload($body_payload);
940 0           $response->{body} = $frame;
941 0           $cb->($response);
942             }
943 0           };
944 0           $w_body_frame = $body_frame;
945 0           weaken($w_body_frame);
946              
947             $self->{_content_queue}->get(sub{
948 0     0     my $frame = shift;
949              
950 0 0         return $failure_cb->('Received data is not header frame')
951             if !$frame->isa('Net::AMQP::Frame::Header');
952              
953 0           my $header_frame = $frame->header_frame;
954 0 0         return $failure_cb->(
955             'Header is not Protocol::Basic::ContentHeader'
956             . 'Header was ' . ref $header_frame
957             ) if !$header_frame->isa('Net::AMQP::Protocol::Basic::ContentHeader');
958              
959 0           $response->{header} = $header_frame;
960              
961 0           $body_size = $frame->body_size;
962 0 0         if ( $body_size ) {
963 0 0         my $contq = $wcontq or return;
964 0           $contq->get($body_frame);
965             } else {
966 0           $response->{body} = undef;
967 0           $cb->($response);
968             }
969 0           });
970              
971 0           return $self;
972             }
973              
974             sub _delete_cbs {
975 0     0     my $self = shift;
976 0           my %args = @_;
977              
978 0   0 0     my $cb = delete $args{on_success} || sub {};
979 0   0 0     my $failure_cb = delete $args{on_failure} || sub {die @_};
  0            
980              
981 0           return $cb, $failure_cb, %args;
982             }
983              
984             sub _check_open {
985 0     0     my $self = shift;
986 0           my ($failure_cb) = @_;
987              
988 0 0         return 1 if $self->is_open();
989              
990 0           $failure_cb->('Channel has already been closed');
991 0           return 0;
992             }
993              
994             sub _check_version {
995 0     0     my $self = shift;
996 0           my ($major, $minor, $failure_cb) = @_;
997              
998 0           my $amaj = $Net::AMQP::Protocol::VERSION_MAJOR;
999 0           my $amin = $Net::AMQP::Protocol::VERSION_MINOR;
1000              
1001 0 0 0       return 1 if $amaj >= $major || $amaj == $major && $amin >= $minor;
      0        
1002              
1003 0 0         $failure_cb->("Not supported in AMQP $amaj-$amin") if $failure_cb;
1004 0           return 0;
1005             }
1006              
1007             sub DESTROY {
1008 0     0     my $self = shift;
1009 0 0 0       $self->close() if !in_global_destruction && $self->is_open();
1010 0           return;
1011             }
1012              
1013             1;
1014             __END__