File Coverage

blib/lib/AnyEvent/RabbitMQ/Channel.pm
Criterion Covered Total %
statement 31 400 7.7
branch 0 152 0.0
condition 0 66 0.0
subroutine 11 73 15.0
pod 21 30 70.0
total 63 721 8.7


line stmt bran cond sub pod time code
1             package AnyEvent::RabbitMQ::Channel;
2              
3 1     1   8 use strict;
  1         2  
  1         32  
4 1     1   5 use warnings;
  1         9  
  1         26  
5              
6 1     1   420 use AnyEvent::RabbitMQ::LocalQueue;
  1         2  
  1         31  
7 1     1   7 use AnyEvent;
  1         2  
  1         27  
8 1     1   6 use Scalar::Util qw( looks_like_number weaken );
  1         2  
  1         57  
9 1     1   7 use Devel::GlobalDestruction;
  1         2  
  1         11  
10 1     1   72 use Carp qw(croak cluck);
  1         17  
  1         66  
11 1     1   7 use POSIX qw(ceil);
  1         2  
  1         10  
12 1     1   1924 BEGIN { *Dumper = \&AnyEvent::RabbitMQ::Dumper }
13              
14 1     1   508 use namespace::clean;
  1         16339  
  1         7  
15              
16             use constant {
17 1         5506 _ST_CLOSED => 0,
18             _ST_OPENING => 1,
19             _ST_OPEN => 2,
20 1     1   287 };
  1         2  
21              
22             sub new {
23 0     0 0   my $class = shift;
24              
25             my $self = bless {
26       0     on_close => sub {},
27 0           @_, # id, connection, on_return, on_close, on_inactive, on_active
28             _queue => AnyEvent::RabbitMQ::LocalQueue->new,
29             _content_queue => AnyEvent::RabbitMQ::LocalQueue->new,
30             }, $class;
31 0           weaken($self->{connection});
32 0           return $self->_reset;
33             }
34              
35             sub _reset {
36 0     0     my $self = shift;
37              
38 0           my %a = (
39             _state => _ST_CLOSED,
40             _is_active => 0,
41             _is_confirm => 0,
42             _publish_tag => 0,
43             _publish_cbs => {}, # values: [on_ack, on_nack, on_return]
44             _consumer_cbs => {}, # values: [on_consume, on_cancel...]
45             );
46 0           @$self{keys %a} = values %a;
47              
48 0           return $self;
49             }
50              
51             sub id {
52 0     0 0   my $self = shift;
53 0           return $self->{id};
54             }
55              
56             sub is_open {
57 0     0 0   my $self = shift;
58 0           return $self->{_state} == _ST_OPEN;
59             }
60              
61             sub is_active {
62 0     0 0   my $self = shift;
63 0           return $self->{_is_active};
64             }
65              
66             sub is_confirm {
67 0     0 0   my $self = shift;
68 0           return $self->{_is_confirm};
69             }
70              
71             sub queue {
72 0     0 1   my $self = shift;
73 0           return $self->{_queue};
74             }
75              
76             sub open {
77 0     0 0   my $self = shift;
78 0           my %args = @_;
79              
80 0 0         if ($self->{_state} != _ST_CLOSED) {
81 0           $args{on_failure}->('Channel has already been opened');
82 0           return $self;
83             }
84              
85 0           $self->{_state} = _ST_OPENING;
86              
87             $self->{connection}->_push_write_and_read(
88             'Channel::Open', {}, 'Channel::OpenOk',
89             sub {
90 0     0     $self->{_state} = _ST_OPEN;
91 0           $self->{_is_active} = 1;
92 0           $args{on_success}->($self);
93             },
94             sub {
95 0     0     $self->{_state} = _ST_CLOSED;
96 0           $args{on_failure}->($self);
97             },
98             $self->{id},
99 0           );
100              
101 0           return $self;
102             }
103              
104             sub close {
105 0     0 0   my $self = shift;
106             my $connection = $self->{connection}
107 0 0         or return;
108 0           my %args = $connection->_set_cbs(@_);
109              
110             # If open in in progess, wait for it; 1s arbitrary timing.
111              
112 0           weaken(my $wself = $self);
113 0           my $t; $t = AE::timer 0, 1, sub {
114 0 0   0     (my $self = $wself) or undef $t, return;
115 0 0         return if $self->{_state} == _ST_OPENING;
116              
117             # No more tests are required
118 0           undef $t;
119              
120             # Double close is OK
121 0 0         if ($self->{_state} == _ST_CLOSED) {
122 0           $args{on_success}->($self);
123 0           return;
124             }
125              
126             $connection->_push_write(
127             $self->_close_frame,
128             $self->{id},
129 0           );
130              
131             # The spec says that after a party sends Channel::Close, it MUST
132             # discard all frames for that channel. So this channel is dead
133             # immediately.
134 0           $self->_closed();
135              
136             $connection->_push_read_and_valid(
137             'Channel::CloseOk',
138             sub {
139 0           $args{on_success}->($self);
140 0           $self->_orphan();
141             },
142             sub {
143 0           $args{on_failure}->(@_);
144 0           $self->_orphan();
145             },
146             $self->{id},
147 0           );
148 0           };
149              
150 0           return $self;
151             }
152              
153             sub _closed {
154 0     0     my $self = shift;
155 0           my ($frame,) = @_;
156 0   0       $frame ||= $self->_close_frame();
157              
158 0 0         return if $self->{_state} == _ST_CLOSED;
159 0           $self->{_state} = _ST_CLOSED;
160              
161             # Perform callbacks for all outstanding commands
162 0           $self->{_queue}->_flush($frame);
163 0           $self->{_content_queue}->_flush($frame);
164              
165             # Fake nacks of all outstanding publishes
166 0           $_->($frame) for grep { defined } map { $_->[1] } values %{ $self->{_publish_cbs} };
  0            
  0            
  0            
167              
168             # Report cancelation of all outstanding consumes
169 0           my @tags = keys %{ $self->{_consumer_cbs} };
  0            
170 0           $self->_canceled($_, $frame) for @tags;
171              
172             # Report close to on_close callback
173 0           { local $@;
  0            
174 0           eval { $self->{on_close}->($frame) };
  0            
175 0 0         warn "Error in channel on_close callback, ignored:\n $@ " if $@; }
176              
177             # Reset state (partly redundant)
178 0           $self->_reset;
179              
180 0           return $self;
181             }
182              
183             sub _close_frame {
184 0     0     my $self = shift;
185 0           my ($text,) = @_;
186              
187 0           Net::AMQP::Frame::Method->new(
188             method_frame => Net::AMQP::Protocol::Channel::Close->new(
189             reply_text => $text,
190             ),
191             );
192             }
193              
194             sub _orphan {
195 0     0     my $self = shift;
196              
197 0 0         if (my $connection = $self->{connection}) {
198 0           $connection->_delete_channel($self);
199             }
200 0           return $self;
201             }
202              
203             sub declare_exchange {
204 0     0 1   my $self = shift;
205 0           my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
206              
207 0 0         return $self if !$self->_check_open($failure_cb);
208              
209             $self->{connection}->_push_write_and_read(
210             'Exchange::Declare',
211             {
212             type => 'direct',
213             passive => 0,
214             durable => 0,
215             auto_delete => 0,
216             internal => 0,
217             %args, # exchange
218             ticket => 0,
219             nowait => 0, # FIXME
220             },
221             'Exchange::DeclareOk',
222             $cb,
223             $failure_cb,
224             $self->{id},
225 0           );
226              
227 0           return $self;
228             }
229              
230             sub bind_exchange {
231 0     0 1   my $self = shift;
232 0           my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
233              
234 0 0         return $self if !$self->_check_open($failure_cb);
235              
236             $self->{connection}->_push_write_and_read(
237             'Exchange::Bind',
238             {
239             %args, # source, destination, routing_key
240             ticket => 0,
241             nowait => 0, # FIXME
242             },
243             'Exchange::BindOk',
244             $cb,
245             $failure_cb,
246             $self->{id},
247 0           );
248              
249 0           return $self;
250             }
251              
252             sub unbind_exchange {
253 0     0 1   my $self = shift;
254 0           my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
255              
256 0 0         return $self if !$self->_check_open($failure_cb);
257              
258             $self->{connection}->_push_write_and_read(
259             'Exchange::Unbind',
260             {
261             %args, # source, destination, routing_key
262             ticket => 0,
263             nowait => 0, # FIXME
264             },
265             'Exchange::UnbindOk',
266             $cb,
267             $failure_cb,
268             $self->{id},
269 0           );
270              
271 0           return $self;
272             }
273              
274             sub delete_exchange {
275 0     0 1   my $self = shift;
276 0           my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
277              
278 0 0         return $self if !$self->_check_open($failure_cb);
279              
280             $self->{connection}->_push_write_and_read(
281             'Exchange::Delete',
282             {
283             if_unused => 0,
284             %args, # exchange
285             ticket => 0,
286             nowait => 0, # FIXME
287             },
288             'Exchange::DeleteOk',
289             $cb,
290             $failure_cb,
291             $self->{id},
292 0           );
293              
294 0           return $self;
295             }
296              
297             sub declare_queue {
298 0     0 1   my $self = shift;
299 0           my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
300              
301 0 0         return $self if !$self->_check_open($failure_cb);
302              
303             $self->{connection}->_push_write_and_read(
304             'Queue::Declare',
305             {
306             queue => '',
307             passive => 0,
308             durable => 0,
309             exclusive => 0,
310             auto_delete => 0,
311             no_ack => 1,
312             %args,
313             ticket => 0,
314             nowait => 0, # FIXME
315             },
316             'Queue::DeclareOk',
317             $cb,
318             $failure_cb,
319             $self->{id},
320 0           );
321             }
322              
323             sub bind_queue {
324 0     0 1   my $self = shift;
325 0           my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
326              
327 0 0         return $self if !$self->_check_open($failure_cb);
328              
329             $self->{connection}->_push_write_and_read(
330             'Queue::Bind',
331             {
332             %args, # queue, exchange, routing_key
333             ticket => 0,
334             nowait => 0, # FIXME
335             },
336             'Queue::BindOk',
337             $cb,
338             $failure_cb,
339             $self->{id},
340 0           );
341              
342 0           return $self;
343             }
344              
345             sub unbind_queue {
346 0     0 1   my $self = shift;
347 0           my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
348              
349 0 0         return $self if !$self->_check_open($failure_cb);
350              
351             $self->{connection}->_push_write_and_read(
352             'Queue::Unbind',
353             {
354             %args, # queue, exchange, routing_key
355             ticket => 0,
356             },
357             'Queue::UnbindOk',
358             $cb,
359             $failure_cb,
360             $self->{id},
361 0           );
362              
363 0           return $self;
364             }
365              
366             sub purge_queue {
367 0     0 1   my $self = shift;
368 0           my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
369              
370 0 0         return $self if !$self->_check_open($failure_cb);
371              
372             $self->{connection}->_push_write_and_read(
373             'Queue::Purge',
374             {
375             %args, # queue
376             ticket => 0,
377             nowait => 0, # FIXME
378             },
379             'Queue::PurgeOk',
380             $cb,
381             $failure_cb,
382             $self->{id},
383 0           );
384              
385 0           return $self;
386             }
387              
388             sub delete_queue {
389 0     0 1   my $self = shift;
390 0           my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
391              
392 0 0         return $self if !$self->_check_open($failure_cb);
393              
394             $self->{connection}->_push_write_and_read(
395             'Queue::Delete',
396             {
397             if_unused => 0,
398             if_empty => 0,
399             %args, # queue
400             ticket => 0,
401             nowait => 0, # FIXME
402             },
403             'Queue::DeleteOk',
404             $cb,
405             $failure_cb,
406             $self->{id},
407 0           );
408              
409 0           return $self;
410             }
411              
412             sub publish {
413 0     0 1   my $self = shift;
414 0           my %args = @_;
415              
416             # Docs should advise channel-level callback over this, but still, better to give user an out
417 0 0         unless ($self->{_is_active}) {
418 0 0         if (defined $args{on_inactive}) {
419 0           $args{on_inactive}->();
420 0           return $self;
421             }
422 0           croak "Can't publish on inactive channel (server flow control); provide on_inactive callback";
423             }
424              
425 0           my $header_args = delete $args{header};
426 0           my $body = delete $args{body};
427 0           my $ack_cb = delete $args{on_ack};
428 0           my $nack_cb = delete $args{on_nack};
429 0           my $return_cb = delete $args{on_return};
430              
431 0 0         defined($header_args) or $header_args = {};
432 0 0         defined($body) or $body = '';
433 0 0 0       if ( defined($ack_cb) or defined($nack_cb) or defined($return_cb) ) {
      0        
434             cluck "Can't set on_ack/on_nack/on_return callback when not in confirm mode"
435 0 0         unless $self->{_is_confirm};
436             }
437              
438 0           my $tag;
439 0 0         if ($self->{_is_confirm}) {
440             # yeah, delivery tags in acks are sequential. see Java client
441 0           $tag = ++$self->{_publish_tag};
442 0 0         if ($return_cb) {
443 0           $header_args = { %$header_args };
444 0           $header_args->{headers}->{_ar_return} = $tag; # just reuse the same value, why not
445             }
446 0           $self->{_publish_cbs}->{$tag} = [$ack_cb, $nack_cb, $return_cb];
447             }
448              
449             $self->_publish(
450 0           %args,
451             )->_header(
452             $header_args, $body,
453             )->_body(
454             $body,
455             );
456              
457 0           return $self;
458             }
459              
460             sub _publish {
461 0     0     my $self = shift;
462 0           my %args = @_;
463              
464             $self->{connection}->_push_write(
465             Net::AMQP::Protocol::Basic::Publish->new(
466             exchange => '',
467             mandatory => 0,
468             immediate => 0,
469             %args, # routing_key
470             ticket => 0,
471             ),
472             $self->{id},
473 0           );
474              
475 0           return $self;
476             }
477              
478             sub _header {
479 0     0     my ($self, $args, $body) = @_;
480              
481 0   0       my $weight = delete $args->{weight} || 0;
482              
483             # user-provided message headers must be strings. protect values that look like numbers.
484 0   0       my $headers = $args->{headers} || {};
485 0 0         my @prot = grep { my $v = $headers->{$_}; !ref($v) && looks_like_number($v) } keys %$headers;
  0            
  0            
486 0 0         if (@prot) {
487             $headers = {
488             %$headers,
489 0           map { $_ => Net::AMQP::Value::String->new($headers->{$_}) } @prot
  0            
490             };
491             }
492              
493             $self->{connection}->_push_write(
494             Net::AMQP::Frame::Header->new(
495             weight => $weight,
496             body_size => length($body),
497             header_frame => Net::AMQP::Protocol::Basic::ContentHeader->new(
498             content_type => 'application/octet-stream',
499             content_encoding => undef,
500             delivery_mode => 1,
501             priority => 1,
502             correlation_id => undef,
503             expiration => undef,
504             message_id => undef,
505             timestamp => time,
506             type => undef,
507             user_id => $self->{connection}->login_user,
508             app_id => undef,
509             cluster_id => undef,
510             %$args,
511             headers => $headers,
512             ),
513             ),
514             $self->{id},
515 0           );
516              
517 0           return $self;
518             }
519              
520             sub _body {
521 0     0     my ($self, $body,) = @_;
522              
523 0   0       my $body_max = $self->{connection}->{_body_max} || length $body;
524              
525             # chunk up body into segments measured by $frame_max
526 0           while (length $body) {
527             $self->{connection}->_push_write(
528             Net::AMQP::Frame::Body->new(
529             payload => substr($body, 0, $body_max, '')),
530             $self->{id}
531 0           );
532             }
533              
534 0           return $self;
535             }
536              
537             sub consume {
538 0     0 1   my $self = shift;
539 0           my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
540              
541 0 0         return $self if !$self->_check_open($failure_cb);
542              
543 0   0 0     my $consumer_cb = delete $args{on_consume} || sub {};
544 0   0 0     my $cancel_cb = delete $args{on_cancel} || sub {};
545 0   0       my $no_ack = delete $args{no_ack} // 1;
546              
547             $self->{connection}->_push_write_and_read(
548             'Basic::Consume',
549             {
550             consumer_tag => '',
551             no_local => 0,
552             no_ack => $no_ack,
553             exclusive => 0,
554              
555             %args, # queue
556             ticket => 0,
557             nowait => 0, # FIXME
558             },
559             'Basic::ConsumeOk',
560             sub {
561 0     0     my $frame = shift;
562 0           my $tag = $frame->method_frame->consumer_tag;
563 0           $self->{_consumer_cbs}->{$tag} = [ $consumer_cb, $cancel_cb ];
564 0           $cb->($frame);
565             },
566             $failure_cb,
567             $self->{id},
568 0           );
569              
570 0           return $self;
571             }
572              
573             sub cancel {
574 0     0 1   my $self = shift;
575 0           my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
576              
577 0 0         return $self if !$self->_check_open($failure_cb);
578              
579 0 0         if (!defined $args{consumer_tag}) {
580 0           $failure_cb->('consumer_tag is not set');
581 0           return $self;
582             }
583              
584 0           my $cons_cbs = $self->{_consumer_cbs}->{$args{consumer_tag}};
585 0 0         unless ($cons_cbs) {
586 0           $failure_cb->('Unknown consumer_tag');
587 0           return $self;
588             }
589 0           push @$cons_cbs, $cb;
590              
591             $self->{connection}->_push_write(
592             Net::AMQP::Protocol::Basic::Cancel->new(
593             %args, # consumer_tag
594             nowait => 0,
595             ),
596             $self->{id},
597 0           );
598              
599 0           return $self;
600             }
601              
602             sub _canceled {
603 0     0     my $self = shift;
604 0           my ($tag, $frame,) = @_;
605              
606 0 0         my $cons_cbs = delete $self->{_consumer_cbs}->{$tag}
607             or return 0;
608              
609 0           shift @$cons_cbs; # no more deliveries
610 0           for my $cb (reverse @$cons_cbs) {
611 0           $cb->($frame);
612             }
613 0           return 1;
614             }
615              
616             sub get {
617 0     0 1   my $self = shift;
618 0           my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
619              
620 0   0       my $no_ack = delete $args{no_ack} // 1;
621              
622 0 0         return $self if !$self->_check_open($failure_cb);
623              
624             $self->{connection}->_push_write_and_read(
625             'Basic::Get',
626             {
627             no_ack => $no_ack,
628             %args, # queue
629             ticket => 0,
630             },
631             [qw(Basic::GetOk Basic::GetEmpty)],
632             sub {
633 0     0     my $frame = shift;
634 0 0         return $cb->({empty => $frame})
635             if $frame->method_frame->isa('Net::AMQP::Protocol::Basic::GetEmpty');
636 0           $self->_push_read_header_and_body('ok', $frame, $cb, $failure_cb);
637             },
638             $failure_cb,
639             $self->{id},
640 0           );
641              
642 0           return $self;
643             }
644              
645             sub ack {
646 0     0 1   my $self = shift;
647 0           my %args = @_;
648              
649 0 0   0     return $self if !$self->_check_open(sub {});
650              
651             $self->{connection}->_push_write(
652             Net::AMQP::Protocol::Basic::Ack->new(
653             delivery_tag => 0,
654             multiple => (
655             defined $args{delivery_tag} && $args{delivery_tag} != 0 ? 0 : 1
656             ),
657             %args,
658             ),
659             $self->{id},
660 0 0 0       );
661              
662 0           return $self;
663             }
664              
665             sub qos {
666 0     0 1   my $self = shift;
667 0           my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
668              
669 0 0         return $self if !$self->_check_open($failure_cb);
670              
671             $self->{connection}->_push_write_and_read(
672             'Basic::Qos',
673             {
674             prefetch_count => 1,
675             prefetch_size => 0,
676             global => 0,
677             %args,
678             },
679             'Basic::QosOk',
680             $cb,
681             $failure_cb,
682             $self->{id},
683 0           );
684              
685 0           return $self;
686             }
687              
688             sub confirm {
689 0     0 1   my $self = shift;
690 0           my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
691              
692 0 0         return $self if !$self->_check_open($failure_cb);
693 0 0         return $self if !$self->_check_version(0, 9, $failure_cb);
694              
695 0           weaken(my $wself = $self);
696              
697             $self->{connection}->_push_write_and_read(
698             'Confirm::Select',
699             {
700             %args,
701             nowait => 0, # FIXME
702             },
703             'Confirm::SelectOk',
704             sub {
705 0 0   0     my $me = $wself or return;
706 0           $me->{_is_confirm} = 1;
707 0           $cb->();
708             },
709             $failure_cb,
710             $self->{id},
711 0           );
712              
713 0           return $self;
714             }
715              
716             sub recover {
717 0     0 1   my $self = shift;
718 0           my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
719              
720 0 0   0     return $self if !$self->_check_open(sub {});
721              
722             $self->{connection}->_push_write(
723             Net::AMQP::Protocol::Basic::Recover->new(
724             requeue => 1,
725             %args,
726             ),
727             $self->{id},
728 0           );
729              
730 0 0 0       if (!$args{nowait} && $self->_check_version(0, 9)) {
731             $self->{connection}->_push_read_and_valid(
732             'Basic::RecoverOk',
733             $cb,
734             $failure_cb,
735             $self->{id},
736 0           );
737             }
738             else {
739 0           $cb->();
740             }
741              
742 0           return $self;
743             }
744              
745             sub reject {
746 0     0 0   my $self = shift;
747 0           my %args = @_;
748              
749 0 0   0     return $self if !$self->_check_open( sub { } );
750              
751             $self->{connection}->_push_write(
752             Net::AMQP::Protocol::Basic::Reject->new(
753             delivery_tag => 0,
754             requeue => 0,
755             %args,
756             ),
757             $self->{id},
758 0           );
759              
760 0           return $self;
761             }
762              
763             sub select_tx {
764 0     0 1   my $self = shift;
765 0           my ($cb, $failure_cb,) = $self->_delete_cbs(@_);
766              
767 0 0         return $self if !$self->_check_open($failure_cb);
768              
769             $self->{connection}->_push_write_and_read(
770             'Tx::Select', {}, 'Tx::SelectOk',
771             $cb,
772             $failure_cb,
773             $self->{id},
774 0           );
775              
776 0           return $self;
777             }
778              
779             sub commit_tx {
780 0     0 1   my $self = shift;
781 0           my ($cb, $failure_cb,) = $self->_delete_cbs(@_);
782              
783 0 0         return $self if !$self->_check_open($failure_cb);
784              
785             $self->{connection}->_push_write_and_read(
786             'Tx::Commit', {}, 'Tx::CommitOk',
787             $cb,
788             $failure_cb,
789             $self->{id},
790 0           );
791              
792 0           return $self;
793             }
794              
795             sub rollback_tx {
796 0     0 1   my $self = shift;
797 0           my ($cb, $failure_cb,) = $self->_delete_cbs(@_);
798              
799 0 0         return $self if !$self->_check_open($failure_cb);
800              
801             $self->{connection}->_push_write_and_read(
802             'Tx::Rollback', {}, 'Tx::RollbackOk',
803             $cb,
804             $failure_cb,
805             $self->{id},
806 0           );
807              
808 0           return $self;
809             }
810              
811             sub push_queue_or_consume {
812 0     0 0   my $self = shift;
813 0           my ($frame, $failure_cb,) = @_;
814              
815             # Note: the spec says that after a party sends Channel::Close, it MUST
816             # discard all frames for that channel other than Close and CloseOk.
817              
818 0 0         if ($frame->isa('Net::AMQP::Frame::Method')) {
819 0           my $method_frame = $frame->method_frame;
820 0 0 0       if ($method_frame->isa('Net::AMQP::Protocol::Channel::Close')) {
    0 0        
    0          
    0          
    0          
    0          
    0          
821             $self->{connection}->_push_write(
822             Net::AMQP::Protocol::Channel::CloseOk->new(),
823             $self->{id},
824 0           );
825 0           $self->_closed($frame);
826 0           $self->_orphan();
827 0           return $self;
828             } elsif ($self->{_state} != _ST_OPEN) {
829 0 0 0       if ($method_frame->isa('Net::AMQP::Protocol::Channel::OpenOk') ||
830             $method_frame->isa('Net::AMQP::Protocol::Channel::CloseOk')) {
831 0           $self->{_queue}->push($frame);
832             }
833 0           return $self;
834             } elsif ($method_frame->isa('Net::AMQP::Protocol::Basic::Deliver')) {
835 0           my $cons_cbs = $self->{_consumer_cbs}->{$method_frame->consumer_tag};
836 0   0 0     my $cb = ($cons_cbs && $cons_cbs->[0]) || sub {};
837 0           $self->_push_read_header_and_body('deliver', $frame, $cb, $failure_cb);
838 0           return $self;
839             } elsif ($method_frame->isa('Net::AMQP::Protocol::Basic::CancelOk') ||
840             $method_frame->isa('Net::AMQP::Protocol::Basic::Cancel')) {
841             # CancelOk means we asked for a cancel.
842             # Cancel means queue was deleted; it is not AMQP, but RMQ supports it.
843 0 0 0       if (!$self->_canceled($method_frame->consumer_tag, $frame)
844             && $method_frame->isa('Net::AMQP::Protocol::Basic::CancelOk')) {
845 0           $failure_cb->("Received CancelOk for unknown consumer tag " . $method_frame->consumer_tag);
846             }
847 0           return $self;
848             } elsif ($method_frame->isa('Net::AMQP::Protocol::Basic::Return')) {
849 0           weaken(my $wself = $self);
850             my $cb = sub {
851 0     0     my $ret = shift;
852 0 0         my $me = $wself or return;
853 0   0       my $headers = $ret->{header}->headers || {};
854 0           my $onret_cb;
855 0 0         if (defined(my $tag = $headers->{_ar_return})) {
856 0           my $cbs = $me->{_publish_cbs}->{$tag};
857 0 0         $onret_cb = $cbs->[2] if $cbs;
858             }
859 0   0       $onret_cb ||= $me->{on_return} || $me->{connection}->{on_return} || sub {}; # oh well
      0        
860 0           $onret_cb->($frame);
861 0           };
862 0           $self->_push_read_header_and_body('return', $frame, $cb, $failure_cb);
863 0           return $self;
864             } elsif ($method_frame->isa('Net::AMQP::Protocol::Basic::Ack') ||
865             $method_frame->isa('Net::AMQP::Protocol::Basic::Nack')) {
866 0           (my $resp = ref($method_frame)) =~ s/.*:://;
867 0           my $cbs;
868 0 0         if (!$self->{_is_confirm}) {
869 0           $failure_cb->("Received $resp when not in confirm mode");
870             }
871             else {
872 0           my @tags;
873 0 0         if ($method_frame->{multiple}) {
874 0           @tags = sort { $a <=> $b }
875 0           grep { $_ <= $method_frame->{delivery_tag} }
876 0           keys %{$self->{_publish_cbs}};
  0            
877             }
878             else {
879 0           @tags = ($method_frame->{delivery_tag});
880             }
881 0 0         my $cbi = ($resp eq 'Ack') ? 0 : 1;
882 0           for my $tag (@tags) {
883 0           my $cbs;
884 0 0         if (not $cbs = delete $self->{_publish_cbs}->{$tag}) {
    0          
885 0           $failure_cb->("Received $resp of unknown delivery tag $tag");
886             }
887             elsif ($cbs->[$cbi]) {
888 0           $cbs->[$cbi]->($frame);
889             }
890             }
891             }
892 0           return $self;
893             } elsif ($method_frame->isa('Net::AMQP::Protocol::Channel::Flow')) {
894 0           $self->{_is_active} = $method_frame->active;
895             $self->{connection}->_push_write(
896             Net::AMQP::Protocol::Channel::FlowOk->new(
897             active => $method_frame->active,
898             ),
899             $self->{id},
900 0           );
901 0 0         my $cbname = $self->{_is_active} ? 'on_active' : 'on_inactive';
902 0   0 0     my $cb = $self->{$cbname} || $self->{connection}->{$cbname} || sub {};
903 0           $cb->($frame);
904 0           return $self;
905             }
906 0           $self->{_queue}->push($frame);
907             } else {
908 0           $self->{_content_queue}->push($frame);
909             }
910              
911 0           return $self;
912             }
913              
914             sub _push_read_header_and_body {
915 0     0     my $self = shift;
916 0           my ($type, $frame, $cb, $failure_cb,) = @_;
917 0           my $response = {$type => $frame};
918 0           my $body_size = 0;
919 0           my $body_payload = "";
920              
921 0           weaken(my $wcontq = $self->{_content_queue});
922 0           my $w_body_frame;
923             my $body_frame = sub {
924 0     0     my $frame = shift;
925              
926 0 0         return $failure_cb->('Received data is not body frame')
927             if !$frame->isa('Net::AMQP::Frame::Body');
928              
929 0           $body_payload .= $frame->payload;
930              
931 0 0         if (length($body_payload) < $body_size) {
932             # More to come
933 0 0         my $contq = $wcontq or return;
934 0           $contq->get($w_body_frame);
935             }
936             else {
937 0           $frame->payload($body_payload);
938 0           $response->{body} = $frame;
939 0           $cb->($response);
940             }
941 0           };
942 0           $w_body_frame = $body_frame;
943 0           weaken($w_body_frame);
944              
945             $self->{_content_queue}->get(sub{
946 0     0     my $frame = shift;
947              
948 0 0         return $failure_cb->('Received data is not header frame')
949             if !$frame->isa('Net::AMQP::Frame::Header');
950              
951 0           my $header_frame = $frame->header_frame;
952 0 0         return $failure_cb->(
953             'Header is not Protocol::Basic::ContentHeader'
954             . 'Header was ' . ref $header_frame
955             ) if !$header_frame->isa('Net::AMQP::Protocol::Basic::ContentHeader');
956              
957 0           $response->{header} = $header_frame;
958              
959 0           $body_size = $frame->body_size;
960 0 0         if ( $body_size ) {
961 0 0         my $contq = $wcontq or return;
962 0           $contq->get($body_frame);
963             } else {
964 0           $response->{body} = undef;
965 0           $cb->($response);
966             }
967 0           });
968              
969 0           return $self;
970             }
971              
972             sub _delete_cbs {
973 0     0     my $self = shift;
974 0           my %args = @_;
975              
976 0   0 0     my $cb = delete $args{on_success} || sub {};
977 0   0 0     my $failure_cb = delete $args{on_failure} || sub {die @_};
  0            
978              
979 0           return $cb, $failure_cb, %args;
980             }
981              
982             sub _check_open {
983 0     0     my $self = shift;
984 0           my ($failure_cb) = @_;
985              
986 0 0         return 1 if $self->is_open();
987              
988 0           $failure_cb->('Channel has already been closed');
989 0           return 0;
990             }
991              
992             sub _check_version {
993 0     0     my $self = shift;
994 0           my ($major, $minor, $failure_cb) = @_;
995              
996 0           my $amaj = $Net::AMQP::Protocol::VERSION_MAJOR;
997 0           my $amin = $Net::AMQP::Protocol::VERSION_MINOR;
998              
999 0 0 0       return 1 if $amaj >= $major || $amaj == $major && $amin >= $minor;
      0        
1000              
1001 0 0         $failure_cb->("Not supported in AMQP $amaj-$amin") if $failure_cb;
1002 0           return 0;
1003             }
1004              
1005             sub DESTROY {
1006 0     0     my $self = shift;
1007 0 0 0       $self->close() if !in_global_destruction && $self->is_open();
1008 0           return;
1009             }
1010              
1011             1;
1012             __END__