File Coverage

blib/lib/AnyEvent/RabbitMQ/Channel.pm
Criterion Covered Total %
statement 31 400 7.7
branch 0 152 0.0
condition 0 66 0.0
subroutine 11 73 15.0
pod 21 30 70.0
total 63 721 8.7


line stmt bran cond sub pod time code
1             package AnyEvent::RabbitMQ::Channel;
2              
3 1     1   8 use strict;
  1         2  
  1         33  
4 1     1   5 use warnings;
  1         2  
  1         25  
5              
6 1     1   460 use AnyEvent::RabbitMQ::LocalQueue;
  1         3  
  1         33  
7 1     1   7 use AnyEvent;
  1         2  
  1         28  
8 1     1   6 use Scalar::Util qw( looks_like_number weaken );
  1         6  
  1         108  
9 1     1   7 use Devel::GlobalDestruction;
  1         2  
  1         18  
10 1     1   109 use Carp qw(croak cluck);
  1         2  
  1         93  
11 1     1   8 use POSIX qw(ceil);
  1         2  
  1         10  
12 1     1   2028 BEGIN { *Dumper = \&AnyEvent::RabbitMQ::Dumper }
13              
14             our $VERSION = '1.21_01'; # TRIAL VERSION
15             $VERSION = eval $VERSION;
16              
17 1     1   564 use namespace::clean;
  1         16756  
  1         8  
18              
19             use constant {
20 1         5531 _ST_CLOSED => 0,
21             _ST_OPENING => 1,
22             _ST_OPEN => 2,
23 1     1   294 };
  1         2  
24              
25             sub new {
26 0     0 0   my $class = shift;
27              
28             my $self = bless {
29       0     on_close => sub {},
30 0           @_, # id, connection, on_return, on_close, on_inactive, on_active
31             _queue => AnyEvent::RabbitMQ::LocalQueue->new,
32             _content_queue => AnyEvent::RabbitMQ::LocalQueue->new,
33             }, $class;
34 0           weaken($self->{connection});
35 0           return $self->_reset;
36             }
37              
38             sub _reset {
39 0     0     my $self = shift;
40              
41 0           my %a = (
42             _state => _ST_CLOSED,
43             _is_active => 0,
44             _is_confirm => 0,
45             _publish_tag => 0,
46             _publish_cbs => {}, # values: [on_ack, on_nack, on_return]
47             _consumer_cbs => {}, # values: [on_consume, on_cancel...]
48             );
49 0           @$self{keys %a} = values %a;
50              
51 0           return $self;
52             }
53              
54             sub id {
55 0     0 0   my $self = shift;
56 0           return $self->{id};
57             }
58              
59             sub is_open {
60 0     0 0   my $self = shift;
61 0           return $self->{_state} == _ST_OPEN;
62             }
63              
64             sub is_active {
65 0     0 0   my $self = shift;
66 0           return $self->{_is_active};
67             }
68              
69             sub is_confirm {
70 0     0 0   my $self = shift;
71 0           return $self->{_is_confirm};
72             }
73              
74             sub queue {
75 0     0 1   my $self = shift;
76 0           return $self->{_queue};
77             }
78              
79             sub open {
80 0     0 0   my $self = shift;
81 0           my %args = @_;
82              
83 0 0         if ($self->{_state} != _ST_CLOSED) {
84 0           $args{on_failure}->('Channel has already been opened');
85 0           return $self;
86             }
87              
88 0           $self->{_state} = _ST_OPENING;
89              
90             $self->{connection}->_push_write_and_read(
91             'Channel::Open', {}, 'Channel::OpenOk',
92             sub {
93 0     0     $self->{_state} = _ST_OPEN;
94 0           $self->{_is_active} = 1;
95 0           $args{on_success}->($self);
96             },
97             sub {
98 0     0     $self->{_state} = _ST_CLOSED;
99 0           $args{on_failure}->($self);
100             },
101             $self->{id},
102 0           );
103              
104 0           return $self;
105             }
106              
107             sub close {
108 0     0 0   my $self = shift;
109             my $connection = $self->{connection}
110 0 0         or return;
111 0           my %args = $connection->_set_cbs(@_);
112              
113             # If open in in progess, wait for it; 1s arbitrary timing.
114              
115 0           weaken(my $wself = $self);
116 0           my $t; $t = AE::timer 0, 1, sub {
117 0 0   0     (my $self = $wself) or undef $t, return;
118 0 0         return if $self->{_state} == _ST_OPENING;
119              
120             # No more tests are required
121 0           undef $t;
122              
123             # Double close is OK
124 0 0         if ($self->{_state} == _ST_CLOSED) {
125 0           $args{on_success}->($self);
126 0           return;
127             }
128              
129             $connection->_push_write(
130             $self->_close_frame,
131             $self->{id},
132 0           );
133              
134             # The spec says that after a party sends Channel::Close, it MUST
135             # discard all frames for that channel. So this channel is dead
136             # immediately.
137 0           $self->_closed();
138              
139             $connection->_push_read_and_valid(
140             'Channel::CloseOk',
141             sub {
142 0           $args{on_success}->($self);
143 0           $self->_orphan();
144             },
145             sub {
146 0           $args{on_failure}->(@_);
147 0           $self->_orphan();
148             },
149             $self->{id},
150 0           );
151 0           };
152              
153 0           return $self;
154             }
155              
156             sub _closed {
157 0     0     my $self = shift;
158 0           my ($frame,) = @_;
159 0   0       $frame ||= $self->_close_frame();
160              
161 0 0         return if $self->{_state} == _ST_CLOSED;
162 0           $self->{_state} = _ST_CLOSED;
163              
164             # Perform callbacks for all outstanding commands
165 0           $self->{_queue}->_flush($frame);
166 0           $self->{_content_queue}->_flush($frame);
167              
168             # Fake nacks of all outstanding publishes
169 0           $_->($frame) for grep { defined } map { $_->[1] } values %{ $self->{_publish_cbs} };
  0            
  0            
  0            
170              
171             # Report cancelation of all outstanding consumes
172 0           my @tags = keys %{ $self->{_consumer_cbs} };
  0            
173 0           $self->_canceled($_, $frame) for @tags;
174              
175             # Report close to on_close callback
176 0           { local $@;
  0            
177 0           eval { $self->{on_close}->($frame) };
  0            
178 0 0         warn "Error in channel on_close callback, ignored:\n $@ " if $@; }
179              
180             # Reset state (partly redundant)
181 0           $self->_reset;
182              
183 0           return $self;
184             }
185              
186             sub _close_frame {
187 0     0     my $self = shift;
188 0           my ($text,) = @_;
189              
190 0           Net::AMQP::Frame::Method->new(
191             method_frame => Net::AMQP::Protocol::Channel::Close->new(
192             reply_text => $text,
193             ),
194             );
195             }
196              
197             sub _orphan {
198 0     0     my $self = shift;
199              
200 0 0         if (my $connection = $self->{connection}) {
201 0           $connection->_delete_channel($self);
202             }
203 0           return $self;
204             }
205              
206             sub declare_exchange {
207 0     0 1   my $self = shift;
208 0           my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
209              
210 0 0         return $self if !$self->_check_open($failure_cb);
211              
212             $self->{connection}->_push_write_and_read(
213             'Exchange::Declare',
214             {
215             type => 'direct',
216             passive => 0,
217             durable => 0,
218             auto_delete => 0,
219             internal => 0,
220             %args, # exchange
221             ticket => 0,
222             nowait => 0, # FIXME
223             },
224             'Exchange::DeclareOk',
225             $cb,
226             $failure_cb,
227             $self->{id},
228 0           );
229              
230 0           return $self;
231             }
232              
233             sub bind_exchange {
234 0     0 1   my $self = shift;
235 0           my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
236              
237 0 0         return $self if !$self->_check_open($failure_cb);
238              
239             $self->{connection}->_push_write_and_read(
240             'Exchange::Bind',
241             {
242             %args, # source, destination, routing_key
243             ticket => 0,
244             nowait => 0, # FIXME
245             },
246             'Exchange::BindOk',
247             $cb,
248             $failure_cb,
249             $self->{id},
250 0           );
251              
252 0           return $self;
253             }
254              
255             sub unbind_exchange {
256 0     0 1   my $self = shift;
257 0           my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
258              
259 0 0         return $self if !$self->_check_open($failure_cb);
260              
261             $self->{connection}->_push_write_and_read(
262             'Exchange::Unbind',
263             {
264             %args, # source, destination, routing_key
265             ticket => 0,
266             nowait => 0, # FIXME
267             },
268             'Exchange::UnbindOk',
269             $cb,
270             $failure_cb,
271             $self->{id},
272 0           );
273              
274 0           return $self;
275             }
276              
277             sub delete_exchange {
278 0     0 1   my $self = shift;
279 0           my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
280              
281 0 0         return $self if !$self->_check_open($failure_cb);
282              
283             $self->{connection}->_push_write_and_read(
284             'Exchange::Delete',
285             {
286             if_unused => 0,
287             %args, # exchange
288             ticket => 0,
289             nowait => 0, # FIXME
290             },
291             'Exchange::DeleteOk',
292             $cb,
293             $failure_cb,
294             $self->{id},
295 0           );
296              
297 0           return $self;
298             }
299              
300             sub declare_queue {
301 0     0 1   my $self = shift;
302 0           my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
303              
304 0 0         return $self if !$self->_check_open($failure_cb);
305              
306             $self->{connection}->_push_write_and_read(
307             'Queue::Declare',
308             {
309             queue => '',
310             passive => 0,
311             durable => 0,
312             exclusive => 0,
313             auto_delete => 0,
314             no_ack => 1,
315             %args,
316             ticket => 0,
317             nowait => 0, # FIXME
318             },
319             'Queue::DeclareOk',
320             $cb,
321             $failure_cb,
322             $self->{id},
323 0           );
324             }
325              
326             sub bind_queue {
327 0     0 1   my $self = shift;
328 0           my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
329              
330 0 0         return $self if !$self->_check_open($failure_cb);
331              
332             $self->{connection}->_push_write_and_read(
333             'Queue::Bind',
334             {
335             %args, # queue, exchange, routing_key
336             ticket => 0,
337             nowait => 0, # FIXME
338             },
339             'Queue::BindOk',
340             $cb,
341             $failure_cb,
342             $self->{id},
343 0           );
344              
345 0           return $self;
346             }
347              
348             sub unbind_queue {
349 0     0 1   my $self = shift;
350 0           my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
351              
352 0 0         return $self if !$self->_check_open($failure_cb);
353              
354             $self->{connection}->_push_write_and_read(
355             'Queue::Unbind',
356             {
357             %args, # queue, exchange, routing_key
358             ticket => 0,
359             },
360             'Queue::UnbindOk',
361             $cb,
362             $failure_cb,
363             $self->{id},
364 0           );
365              
366 0           return $self;
367             }
368              
369             sub purge_queue {
370 0     0 1   my $self = shift;
371 0           my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
372              
373 0 0         return $self if !$self->_check_open($failure_cb);
374              
375             $self->{connection}->_push_write_and_read(
376             'Queue::Purge',
377             {
378             %args, # queue
379             ticket => 0,
380             nowait => 0, # FIXME
381             },
382             'Queue::PurgeOk',
383             $cb,
384             $failure_cb,
385             $self->{id},
386 0           );
387              
388 0           return $self;
389             }
390              
391             sub delete_queue {
392 0     0 1   my $self = shift;
393 0           my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
394              
395 0 0         return $self if !$self->_check_open($failure_cb);
396              
397             $self->{connection}->_push_write_and_read(
398             'Queue::Delete',
399             {
400             if_unused => 0,
401             if_empty => 0,
402             %args, # queue
403             ticket => 0,
404             nowait => 0, # FIXME
405             },
406             'Queue::DeleteOk',
407             $cb,
408             $failure_cb,
409             $self->{id},
410 0           );
411              
412 0           return $self;
413             }
414              
415             sub publish {
416 0     0 1   my $self = shift;
417 0           my %args = @_;
418              
419             # Docs should advise channel-level callback over this, but still, better to give user an out
420 0 0         unless ($self->{_is_active}) {
421 0 0         if (defined $args{on_inactive}) {
422 0           $args{on_inactive}->();
423 0           return $self;
424             }
425 0           croak "Can't publish on inactive channel (server flow control); provide on_inactive callback";
426             }
427              
428 0           my $header_args = delete $args{header};
429 0           my $body = delete $args{body};
430 0           my $ack_cb = delete $args{on_ack};
431 0           my $nack_cb = delete $args{on_nack};
432 0           my $return_cb = delete $args{on_return};
433              
434 0 0         defined($header_args) or $header_args = {};
435 0 0         defined($body) or $body = '';
436 0 0 0       if ( defined($ack_cb) or defined($nack_cb) or defined($return_cb) ) {
      0        
437             cluck "Can't set on_ack/on_nack/on_return callback when not in confirm mode"
438 0 0         unless $self->{_is_confirm};
439             }
440              
441 0           my $tag;
442 0 0         if ($self->{_is_confirm}) {
443             # yeah, delivery tags in acks are sequential. see Java client
444 0           $tag = ++$self->{_publish_tag};
445 0 0         if ($return_cb) {
446 0           $header_args = { %$header_args };
447 0           $header_args->{headers}->{_ar_return} = $tag; # just reuse the same value, why not
448             }
449 0           $self->{_publish_cbs}->{$tag} = [$ack_cb, $nack_cb, $return_cb];
450             }
451              
452             $self->_publish(
453 0           %args,
454             )->_header(
455             $header_args, $body,
456             )->_body(
457             $body,
458             );
459              
460 0           return $self;
461             }
462              
463             sub _publish {
464 0     0     my $self = shift;
465 0           my %args = @_;
466              
467             $self->{connection}->_push_write(
468             Net::AMQP::Protocol::Basic::Publish->new(
469             exchange => '',
470             mandatory => 0,
471             immediate => 0,
472             %args, # routing_key
473             ticket => 0,
474             ),
475             $self->{id},
476 0           );
477              
478 0           return $self;
479             }
480              
481             sub _header {
482 0     0     my ($self, $args, $body) = @_;
483              
484 0   0       my $weight = delete $args->{weight} || 0;
485              
486             # user-provided message headers must be strings. protect values that look like numbers.
487 0   0       my $headers = $args->{headers} || {};
488 0 0         my @prot = grep { my $v = $headers->{$_}; !ref($v) && looks_like_number($v) } keys %$headers;
  0            
  0            
489 0 0         if (@prot) {
490             $headers = {
491             %$headers,
492 0           map { $_ => Net::AMQP::Value::String->new($headers->{$_}) } @prot
  0            
493             };
494             }
495              
496             $self->{connection}->_push_write(
497             Net::AMQP::Frame::Header->new(
498             weight => $weight,
499             body_size => length($body),
500             header_frame => Net::AMQP::Protocol::Basic::ContentHeader->new(
501             content_type => 'application/octet-stream',
502             content_encoding => undef,
503             delivery_mode => 1,
504             priority => 1,
505             correlation_id => undef,
506             expiration => undef,
507             message_id => undef,
508             timestamp => time,
509             type => undef,
510             user_id => $self->{connection}->login_user,
511             app_id => undef,
512             cluster_id => undef,
513             %$args,
514             headers => $headers,
515             ),
516             ),
517             $self->{id},
518 0           );
519              
520 0           return $self;
521             }
522              
523             sub _body {
524 0     0     my ($self, $body,) = @_;
525              
526 0   0       my $body_max = $self->{connection}->{_body_max} || length $body;
527              
528             # chunk up body into segments measured by $frame_max
529 0           while (length $body) {
530             $self->{connection}->_push_write(
531             Net::AMQP::Frame::Body->new(
532             payload => substr($body, 0, $body_max, '')),
533             $self->{id}
534 0           );
535             }
536              
537 0           return $self;
538             }
539              
540             sub consume {
541 0     0 1   my $self = shift;
542 0           my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
543              
544 0 0         return $self if !$self->_check_open($failure_cb);
545              
546 0   0 0     my $consumer_cb = delete $args{on_consume} || sub {};
547 0   0 0     my $cancel_cb = delete $args{on_cancel} || sub {};
548 0   0       my $no_ack = delete $args{no_ack} // 1;
549              
550             $self->{connection}->_push_write_and_read(
551             'Basic::Consume',
552             {
553             consumer_tag => '',
554             no_local => 0,
555             no_ack => $no_ack,
556             exclusive => 0,
557              
558             %args, # queue
559             ticket => 0,
560             nowait => 0, # FIXME
561             },
562             'Basic::ConsumeOk',
563             sub {
564 0     0     my $frame = shift;
565 0           my $tag = $frame->method_frame->consumer_tag;
566 0           $self->{_consumer_cbs}->{$tag} = [ $consumer_cb, $cancel_cb ];
567 0           $cb->($frame);
568             },
569             $failure_cb,
570             $self->{id},
571 0           );
572              
573 0           return $self;
574             }
575              
576             sub cancel {
577 0     0 1   my $self = shift;
578 0           my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
579              
580 0 0         return $self if !$self->_check_open($failure_cb);
581              
582 0 0         if (!defined $args{consumer_tag}) {
583 0           $failure_cb->('consumer_tag is not set');
584 0           return $self;
585             }
586              
587 0           my $cons_cbs = $self->{_consumer_cbs}->{$args{consumer_tag}};
588 0 0         unless ($cons_cbs) {
589 0           $failure_cb->('Unknown consumer_tag');
590 0           return $self;
591             }
592 0           push @$cons_cbs, $cb;
593              
594             $self->{connection}->_push_write(
595             Net::AMQP::Protocol::Basic::Cancel->new(
596             %args, # consumer_tag
597             nowait => 0,
598             ),
599             $self->{id},
600 0           );
601              
602 0           return $self;
603             }
604              
605             sub _canceled {
606 0     0     my $self = shift;
607 0           my ($tag, $frame,) = @_;
608              
609 0 0         my $cons_cbs = delete $self->{_consumer_cbs}->{$tag}
610             or return 0;
611              
612 0           shift @$cons_cbs; # no more deliveries
613 0           for my $cb (reverse @$cons_cbs) {
614 0           $cb->($frame);
615             }
616 0           return 1;
617             }
618              
619             sub get {
620 0     0 1   my $self = shift;
621 0           my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
622              
623 0   0       my $no_ack = delete $args{no_ack} // 1;
624              
625 0 0         return $self if !$self->_check_open($failure_cb);
626              
627             $self->{connection}->_push_write_and_read(
628             'Basic::Get',
629             {
630             no_ack => $no_ack,
631             %args, # queue
632             ticket => 0,
633             },
634             [qw(Basic::GetOk Basic::GetEmpty)],
635             sub {
636 0     0     my $frame = shift;
637 0 0         return $cb->({empty => $frame})
638             if $frame->method_frame->isa('Net::AMQP::Protocol::Basic::GetEmpty');
639 0           $self->_push_read_header_and_body('ok', $frame, $cb, $failure_cb);
640             },
641             $failure_cb,
642             $self->{id},
643 0           );
644              
645 0           return $self;
646             }
647              
648             sub ack {
649 0     0 1   my $self = shift;
650 0           my %args = @_;
651              
652 0 0   0     return $self if !$self->_check_open(sub {});
653              
654             $self->{connection}->_push_write(
655             Net::AMQP::Protocol::Basic::Ack->new(
656             delivery_tag => 0,
657             multiple => (
658             defined $args{delivery_tag} && $args{delivery_tag} != 0 ? 0 : 1
659             ),
660             %args,
661             ),
662             $self->{id},
663 0 0 0       );
664              
665 0           return $self;
666             }
667              
668             sub qos {
669 0     0 1   my $self = shift;
670 0           my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
671              
672 0 0         return $self if !$self->_check_open($failure_cb);
673              
674             $self->{connection}->_push_write_and_read(
675             'Basic::Qos',
676             {
677             prefetch_count => 1,
678             prefetch_size => 0,
679             global => 0,
680             %args,
681             },
682             'Basic::QosOk',
683             $cb,
684             $failure_cb,
685             $self->{id},
686 0           );
687              
688 0           return $self;
689             }
690              
691             sub confirm {
692 0     0 1   my $self = shift;
693 0           my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
694              
695 0 0         return $self if !$self->_check_open($failure_cb);
696 0 0         return $self if !$self->_check_version(0, 9, $failure_cb);
697              
698 0           weaken(my $wself = $self);
699              
700             $self->{connection}->_push_write_and_read(
701             'Confirm::Select',
702             {
703             %args,
704             nowait => 0, # FIXME
705             },
706             'Confirm::SelectOk',
707             sub {
708 0 0   0     my $me = $wself or return;
709 0           $me->{_is_confirm} = 1;
710 0           $cb->();
711             },
712             $failure_cb,
713             $self->{id},
714 0           );
715              
716 0           return $self;
717             }
718              
719             sub recover {
720 0     0 1   my $self = shift;
721 0           my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
722              
723 0 0   0     return $self if !$self->_check_open(sub {});
724              
725             $self->{connection}->_push_write(
726             Net::AMQP::Protocol::Basic::Recover->new(
727             requeue => 1,
728             %args,
729             ),
730             $self->{id},
731 0           );
732              
733 0 0 0       if (!$args{nowait} && $self->_check_version(0, 9)) {
734             $self->{connection}->_push_read_and_valid(
735             'Basic::RecoverOk',
736             $cb,
737             $failure_cb,
738             $self->{id},
739 0           );
740             }
741             else {
742 0           $cb->();
743             }
744              
745 0           return $self;
746             }
747              
748             sub reject {
749 0     0 0   my $self = shift;
750 0           my %args = @_;
751              
752 0 0   0     return $self if !$self->_check_open( sub { } );
753              
754             $self->{connection}->_push_write(
755             Net::AMQP::Protocol::Basic::Reject->new(
756             delivery_tag => 0,
757             requeue => 0,
758             %args,
759             ),
760             $self->{id},
761 0           );
762              
763 0           return $self;
764             }
765              
766             sub select_tx {
767 0     0 1   my $self = shift;
768 0           my ($cb, $failure_cb,) = $self->_delete_cbs(@_);
769              
770 0 0         return $self if !$self->_check_open($failure_cb);
771              
772             $self->{connection}->_push_write_and_read(
773             'Tx::Select', {}, 'Tx::SelectOk',
774             $cb,
775             $failure_cb,
776             $self->{id},
777 0           );
778              
779 0           return $self;
780             }
781              
782             sub commit_tx {
783 0     0 1   my $self = shift;
784 0           my ($cb, $failure_cb,) = $self->_delete_cbs(@_);
785              
786 0 0         return $self if !$self->_check_open($failure_cb);
787              
788             $self->{connection}->_push_write_and_read(
789             'Tx::Commit', {}, 'Tx::CommitOk',
790             $cb,
791             $failure_cb,
792             $self->{id},
793 0           );
794              
795 0           return $self;
796             }
797              
798             sub rollback_tx {
799 0     0 1   my $self = shift;
800 0           my ($cb, $failure_cb,) = $self->_delete_cbs(@_);
801              
802 0 0         return $self if !$self->_check_open($failure_cb);
803              
804             $self->{connection}->_push_write_and_read(
805             'Tx::Rollback', {}, 'Tx::RollbackOk',
806             $cb,
807             $failure_cb,
808             $self->{id},
809 0           );
810              
811 0           return $self;
812             }
813              
814             sub push_queue_or_consume {
815 0     0 0   my $self = shift;
816 0           my ($frame, $failure_cb,) = @_;
817              
818             # Note: the spec says that after a party sends Channel::Close, it MUST
819             # discard all frames for that channel other than Close and CloseOk.
820              
821 0 0         if ($frame->isa('Net::AMQP::Frame::Method')) {
822 0           my $method_frame = $frame->method_frame;
823 0 0 0       if ($method_frame->isa('Net::AMQP::Protocol::Channel::Close')) {
    0 0        
    0          
    0          
    0          
    0          
    0          
824             $self->{connection}->_push_write(
825             Net::AMQP::Protocol::Channel::CloseOk->new(),
826             $self->{id},
827 0           );
828 0           $self->_closed($frame);
829 0           $self->_orphan();
830 0           return $self;
831             } elsif ($self->{_state} != _ST_OPEN) {
832 0 0 0       if ($method_frame->isa('Net::AMQP::Protocol::Channel::OpenOk') ||
833             $method_frame->isa('Net::AMQP::Protocol::Channel::CloseOk')) {
834 0           $self->{_queue}->push($frame);
835             }
836 0           return $self;
837             } elsif ($method_frame->isa('Net::AMQP::Protocol::Basic::Deliver')) {
838 0           my $cons_cbs = $self->{_consumer_cbs}->{$method_frame->consumer_tag};
839 0   0 0     my $cb = ($cons_cbs && $cons_cbs->[0]) || sub {};
840 0           $self->_push_read_header_and_body('deliver', $frame, $cb, $failure_cb);
841 0           return $self;
842             } elsif ($method_frame->isa('Net::AMQP::Protocol::Basic::CancelOk') ||
843             $method_frame->isa('Net::AMQP::Protocol::Basic::Cancel')) {
844             # CancelOk means we asked for a cancel.
845             # Cancel means queue was deleted; it is not AMQP, but RMQ supports it.
846 0 0 0       if (!$self->_canceled($method_frame->consumer_tag, $frame)
847             && $method_frame->isa('Net::AMQP::Protocol::Basic::CancelOk')) {
848 0           $failure_cb->("Received CancelOk for unknown consumer tag " . $method_frame->consumer_tag);
849             }
850 0           return $self;
851             } elsif ($method_frame->isa('Net::AMQP::Protocol::Basic::Return')) {
852 0           weaken(my $wself = $self);
853             my $cb = sub {
854 0     0     my $ret = shift;
855 0 0         my $me = $wself or return;
856 0   0       my $headers = $ret->{header}->headers || {};
857 0           my $onret_cb;
858 0 0         if (defined(my $tag = $headers->{_ar_return})) {
859 0           my $cbs = $me->{_publish_cbs}->{$tag};
860 0 0         $onret_cb = $cbs->[2] if $cbs;
861             }
862 0   0       $onret_cb ||= $me->{on_return} || $me->{connection}->{on_return} || sub {}; # oh well
      0        
863 0           $onret_cb->($frame);
864 0           };
865 0           $self->_push_read_header_and_body('return', $frame, $cb, $failure_cb);
866 0           return $self;
867             } elsif ($method_frame->isa('Net::AMQP::Protocol::Basic::Ack') ||
868             $method_frame->isa('Net::AMQP::Protocol::Basic::Nack')) {
869 0           (my $resp = ref($method_frame)) =~ s/.*:://;
870 0           my $cbs;
871 0 0         if (!$self->{_is_confirm}) {
872 0           $failure_cb->("Received $resp when not in confirm mode");
873             }
874             else {
875 0           my @tags;
876 0 0         if ($method_frame->{multiple}) {
877 0           @tags = sort { $a <=> $b }
878 0           grep { $_ <= $method_frame->{delivery_tag} }
879 0           keys %{$self->{_publish_cbs}};
  0            
880             }
881             else {
882 0           @tags = ($method_frame->{delivery_tag});
883             }
884 0 0         my $cbi = ($resp eq 'Ack') ? 0 : 1;
885 0           for my $tag (@tags) {
886 0           my $cbs;
887 0 0         if (not $cbs = delete $self->{_publish_cbs}->{$tag}) {
    0          
888 0           $failure_cb->("Received $resp of unknown delivery tag $tag");
889             }
890             elsif ($cbs->[$cbi]) {
891 0           $cbs->[$cbi]->($frame);
892             }
893             }
894             }
895 0           return $self;
896             } elsif ($method_frame->isa('Net::AMQP::Protocol::Channel::Flow')) {
897 0           $self->{_is_active} = $method_frame->active;
898             $self->{connection}->_push_write(
899             Net::AMQP::Protocol::Channel::FlowOk->new(
900             active => $method_frame->active,
901             ),
902             $self->{id},
903 0           );
904 0 0         my $cbname = $self->{_is_active} ? 'on_active' : 'on_inactive';
905 0   0 0     my $cb = $self->{$cbname} || $self->{connection}->{$cbname} || sub {};
906 0           $cb->($frame);
907 0           return $self;
908             }
909 0           $self->{_queue}->push($frame);
910             } else {
911 0           $self->{_content_queue}->push($frame);
912             }
913              
914 0           return $self;
915             }
916              
917             sub _push_read_header_and_body {
918 0     0     my $self = shift;
919 0           my ($type, $frame, $cb, $failure_cb,) = @_;
920 0           my $response = {$type => $frame};
921 0           my $body_size = 0;
922 0           my $body_payload = "";
923              
924 0           weaken(my $wcontq = $self->{_content_queue});
925 0           my $w_body_frame;
926             my $body_frame = sub {
927 0     0     my $frame = shift;
928              
929 0 0         return $failure_cb->('Received data is not body frame')
930             if !$frame->isa('Net::AMQP::Frame::Body');
931              
932 0           $body_payload .= $frame->payload;
933              
934 0 0         if (length($body_payload) < $body_size) {
935             # More to come
936 0 0         my $contq = $wcontq or return;
937 0           $contq->get($w_body_frame);
938             }
939             else {
940 0           $frame->payload($body_payload);
941 0           $response->{body} = $frame;
942 0           $cb->($response);
943             }
944 0           };
945 0           $w_body_frame = $body_frame;
946 0           weaken($w_body_frame);
947              
948             $self->{_content_queue}->get(sub{
949 0     0     my $frame = shift;
950              
951 0 0         return $failure_cb->('Received data is not header frame')
952             if !$frame->isa('Net::AMQP::Frame::Header');
953              
954 0           my $header_frame = $frame->header_frame;
955 0 0         return $failure_cb->(
956             'Header is not Protocol::Basic::ContentHeader'
957             . 'Header was ' . ref $header_frame
958             ) if !$header_frame->isa('Net::AMQP::Protocol::Basic::ContentHeader');
959              
960 0           $response->{header} = $header_frame;
961              
962 0           $body_size = $frame->body_size;
963 0 0         if ( $body_size ) {
964 0 0         my $contq = $wcontq or return;
965 0           $contq->get($body_frame);
966             } else {
967 0           $response->{body} = undef;
968 0           $cb->($response);
969             }
970 0           });
971              
972 0           return $self;
973             }
974              
975             sub _delete_cbs {
976 0     0     my $self = shift;
977 0           my %args = @_;
978              
979 0   0 0     my $cb = delete $args{on_success} || sub {};
980 0   0 0     my $failure_cb = delete $args{on_failure} || sub {die @_};
  0            
981              
982 0           return $cb, $failure_cb, %args;
983             }
984              
985             sub _check_open {
986 0     0     my $self = shift;
987 0           my ($failure_cb) = @_;
988              
989 0 0         return 1 if $self->is_open();
990              
991 0           $failure_cb->('Channel has already been closed');
992 0           return 0;
993             }
994              
995             sub _check_version {
996 0     0     my $self = shift;
997 0           my ($major, $minor, $failure_cb) = @_;
998              
999 0           my $amaj = $Net::AMQP::Protocol::VERSION_MAJOR;
1000 0           my $amin = $Net::AMQP::Protocol::VERSION_MINOR;
1001              
1002 0 0 0       return 1 if $amaj >= $major || $amaj == $major && $amin >= $minor;
      0        
1003              
1004 0 0         $failure_cb->("Not supported in AMQP $amaj-$amin") if $failure_cb;
1005 0           return 0;
1006             }
1007              
1008             sub DESTROY {
1009 0     0     my $self = shift;
1010 0 0 0       $self->close() if !in_global_destruction && $self->is_open();
1011 0           return;
1012             }
1013              
1014             1;
1015             __END__