| line | stmt | bran | cond | sub | pod | time | code | 
| 1 |  |  |  |  |  |  | package AnyEvent::RabbitMQ::Channel; | 
| 2 |  |  |  |  |  |  |  | 
| 3 | 1 |  |  | 1 |  | 8 | use strict; | 
|  | 1 |  |  |  |  | 2 |  | 
|  | 1 |  |  |  |  | 32 |  | 
| 4 | 1 |  |  | 1 |  | 5 | use warnings; | 
|  | 1 |  |  |  |  | 9 |  | 
|  | 1 |  |  |  |  | 26 |  | 
| 5 |  |  |  |  |  |  |  | 
| 6 | 1 |  |  | 1 |  | 420 | use AnyEvent::RabbitMQ::LocalQueue; | 
|  | 1 |  |  |  |  | 2 |  | 
|  | 1 |  |  |  |  | 31 |  | 
| 7 | 1 |  |  | 1 |  | 7 | use AnyEvent; | 
|  | 1 |  |  |  |  | 2 |  | 
|  | 1 |  |  |  |  | 27 |  | 
| 8 | 1 |  |  | 1 |  | 6 | use Scalar::Util qw( looks_like_number weaken ); | 
|  | 1 |  |  |  |  | 2 |  | 
|  | 1 |  |  |  |  | 57 |  | 
| 9 | 1 |  |  | 1 |  | 7 | use Devel::GlobalDestruction; | 
|  | 1 |  |  |  |  | 2 |  | 
|  | 1 |  |  |  |  | 11 |  | 
| 10 | 1 |  |  | 1 |  | 72 | use Carp qw(croak cluck); | 
|  | 1 |  |  |  |  | 17 |  | 
|  | 1 |  |  |  |  | 66 |  | 
| 11 | 1 |  |  | 1 |  | 7 | use POSIX qw(ceil); | 
|  | 1 |  |  |  |  | 2 |  | 
|  | 1 |  |  |  |  | 10 |  | 
| 12 | 1 |  |  | 1 |  | 1924 | BEGIN { *Dumper = \&AnyEvent::RabbitMQ::Dumper } | 
| 13 |  |  |  |  |  |  |  | 
| 14 | 1 |  |  | 1 |  | 508 | use namespace::clean; | 
|  | 1 |  |  |  |  | 16339 |  | 
|  | 1 |  |  |  |  | 7 |  | 
| 15 |  |  |  |  |  |  |  | 
| 16 |  |  |  |  |  |  | use constant { | 
| 17 | 1 |  |  |  |  | 5506 | _ST_CLOSED => 0, | 
| 18 |  |  |  |  |  |  | _ST_OPENING => 1, | 
| 19 |  |  |  |  |  |  | _ST_OPEN => 2, | 
| 20 | 1 |  |  | 1 |  | 287 | }; | 
|  | 1 |  |  |  |  | 2 |  | 
| 21 |  |  |  |  |  |  |  | 
| 22 |  |  |  |  |  |  | sub new { | 
| 23 | 0 |  |  | 0 | 0 |  | my $class = shift; | 
| 24 |  |  |  |  |  |  |  | 
| 25 |  |  |  |  |  |  | my $self = bless { | 
| 26 |  |  |  | 0 |  |  | on_close       => sub {}, | 
| 27 | 0 |  |  |  |  |  | @_,    # id, connection, on_return, on_close, on_inactive, on_active | 
| 28 |  |  |  |  |  |  | _queue         => AnyEvent::RabbitMQ::LocalQueue->new, | 
| 29 |  |  |  |  |  |  | _content_queue => AnyEvent::RabbitMQ::LocalQueue->new, | 
| 30 |  |  |  |  |  |  | }, $class; | 
| 31 | 0 |  |  |  |  |  | weaken($self->{connection}); | 
| 32 | 0 |  |  |  |  |  | return $self->_reset; | 
| 33 |  |  |  |  |  |  | } | 
| 34 |  |  |  |  |  |  |  | 
| 35 |  |  |  |  |  |  | sub _reset { | 
| 36 | 0 |  |  | 0 |  |  | my $self = shift; | 
| 37 |  |  |  |  |  |  |  | 
| 38 | 0 |  |  |  |  |  | my %a = ( | 
| 39 |  |  |  |  |  |  | _state         => _ST_CLOSED, | 
| 40 |  |  |  |  |  |  | _is_active     => 0, | 
| 41 |  |  |  |  |  |  | _is_confirm    => 0, | 
| 42 |  |  |  |  |  |  | _publish_tag   => 0, | 
| 43 |  |  |  |  |  |  | _publish_cbs   => {},  # values: [on_ack, on_nack, on_return] | 
| 44 |  |  |  |  |  |  | _consumer_cbs  => {},  # values: [on_consume, on_cancel...] | 
| 45 |  |  |  |  |  |  | ); | 
| 46 | 0 |  |  |  |  |  | @$self{keys %a} = values %a; | 
| 47 |  |  |  |  |  |  |  | 
| 48 | 0 |  |  |  |  |  | return $self; | 
| 49 |  |  |  |  |  |  | } | 
| 50 |  |  |  |  |  |  |  | 
| 51 |  |  |  |  |  |  | sub id { | 
| 52 | 0 |  |  | 0 | 0 |  | my $self = shift; | 
| 53 | 0 |  |  |  |  |  | return $self->{id}; | 
| 54 |  |  |  |  |  |  | } | 
| 55 |  |  |  |  |  |  |  | 
| 56 |  |  |  |  |  |  | sub is_open { | 
| 57 | 0 |  |  | 0 | 0 |  | my $self = shift; | 
| 58 | 0 |  |  |  |  |  | return $self->{_state} == _ST_OPEN; | 
| 59 |  |  |  |  |  |  | } | 
| 60 |  |  |  |  |  |  |  | 
| 61 |  |  |  |  |  |  | sub is_active { | 
| 62 | 0 |  |  | 0 | 0 |  | my $self = shift; | 
| 63 | 0 |  |  |  |  |  | return $self->{_is_active}; | 
| 64 |  |  |  |  |  |  | } | 
| 65 |  |  |  |  |  |  |  | 
| 66 |  |  |  |  |  |  | sub is_confirm { | 
| 67 | 0 |  |  | 0 | 0 |  | my $self = shift; | 
| 68 | 0 |  |  |  |  |  | return $self->{_is_confirm}; | 
| 69 |  |  |  |  |  |  | } | 
| 70 |  |  |  |  |  |  |  | 
| 71 |  |  |  |  |  |  | sub queue { | 
| 72 | 0 |  |  | 0 | 1 |  | my $self = shift; | 
| 73 | 0 |  |  |  |  |  | return $self->{_queue}; | 
| 74 |  |  |  |  |  |  | } | 
| 75 |  |  |  |  |  |  |  | 
| 76 |  |  |  |  |  |  | sub open { | 
| 77 | 0 |  |  | 0 | 0 |  | my $self = shift; | 
| 78 | 0 |  |  |  |  |  | my %args = @_; | 
| 79 |  |  |  |  |  |  |  | 
| 80 | 0 | 0 |  |  |  |  | if ($self->{_state} != _ST_CLOSED) { | 
| 81 | 0 |  |  |  |  |  | $args{on_failure}->('Channel has already been opened'); | 
| 82 | 0 |  |  |  |  |  | return $self; | 
| 83 |  |  |  |  |  |  | } | 
| 84 |  |  |  |  |  |  |  | 
| 85 | 0 |  |  |  |  |  | $self->{_state} = _ST_OPENING; | 
| 86 |  |  |  |  |  |  |  | 
| 87 |  |  |  |  |  |  | $self->{connection}->_push_write_and_read( | 
| 88 |  |  |  |  |  |  | 'Channel::Open', {}, 'Channel::OpenOk', | 
| 89 |  |  |  |  |  |  | sub { | 
| 90 | 0 |  |  | 0 |  |  | $self->{_state} = _ST_OPEN; | 
| 91 | 0 |  |  |  |  |  | $self->{_is_active} = 1; | 
| 92 | 0 |  |  |  |  |  | $args{on_success}->($self); | 
| 93 |  |  |  |  |  |  | }, | 
| 94 |  |  |  |  |  |  | sub { | 
| 95 | 0 |  |  | 0 |  |  | $self->{_state} = _ST_CLOSED; | 
| 96 | 0 |  |  |  |  |  | $args{on_failure}->($self); | 
| 97 |  |  |  |  |  |  | }, | 
| 98 |  |  |  |  |  |  | $self->{id}, | 
| 99 | 0 |  |  |  |  |  | ); | 
| 100 |  |  |  |  |  |  |  | 
| 101 | 0 |  |  |  |  |  | return $self; | 
| 102 |  |  |  |  |  |  | } | 
| 103 |  |  |  |  |  |  |  | 
| 104 |  |  |  |  |  |  | sub close { | 
| 105 | 0 |  |  | 0 | 0 |  | my $self = shift; | 
| 106 |  |  |  |  |  |  | my $connection = $self->{connection} | 
| 107 | 0 | 0 |  |  |  |  | or return; | 
| 108 | 0 |  |  |  |  |  | my %args = $connection->_set_cbs(@_); | 
| 109 |  |  |  |  |  |  |  | 
| 110 |  |  |  |  |  |  | # If open in in progess, wait for it; 1s arbitrary timing. | 
| 111 |  |  |  |  |  |  |  | 
| 112 | 0 |  |  |  |  |  | weaken(my $wself = $self); | 
| 113 | 0 |  |  |  |  |  | my $t; $t = AE::timer 0, 1, sub { | 
| 114 | 0 | 0 |  | 0 |  |  | (my $self = $wself) or undef $t, return; | 
| 115 | 0 | 0 |  |  |  |  | return if $self->{_state} == _ST_OPENING; | 
| 116 |  |  |  |  |  |  |  | 
| 117 |  |  |  |  |  |  | # No more tests are required | 
| 118 | 0 |  |  |  |  |  | undef $t; | 
| 119 |  |  |  |  |  |  |  | 
| 120 |  |  |  |  |  |  | # Double close is OK | 
| 121 | 0 | 0 |  |  |  |  | if ($self->{_state} == _ST_CLOSED) { | 
| 122 | 0 |  |  |  |  |  | $args{on_success}->($self); | 
| 123 | 0 |  |  |  |  |  | return; | 
| 124 |  |  |  |  |  |  | } | 
| 125 |  |  |  |  |  |  |  | 
| 126 |  |  |  |  |  |  | $connection->_push_write( | 
| 127 |  |  |  |  |  |  | $self->_close_frame, | 
| 128 |  |  |  |  |  |  | $self->{id}, | 
| 129 | 0 |  |  |  |  |  | ); | 
| 130 |  |  |  |  |  |  |  | 
| 131 |  |  |  |  |  |  | # The spec says that after a party sends Channel::Close, it MUST | 
| 132 |  |  |  |  |  |  | # discard all frames for that channel.  So this channel is dead | 
| 133 |  |  |  |  |  |  | # immediately. | 
| 134 | 0 |  |  |  |  |  | $self->_closed(); | 
| 135 |  |  |  |  |  |  |  | 
| 136 |  |  |  |  |  |  | $connection->_push_read_and_valid( | 
| 137 |  |  |  |  |  |  | 'Channel::CloseOk', | 
| 138 |  |  |  |  |  |  | sub { | 
| 139 | 0 |  |  |  |  |  | $args{on_success}->($self); | 
| 140 | 0 |  |  |  |  |  | $self->_orphan(); | 
| 141 |  |  |  |  |  |  | }, | 
| 142 |  |  |  |  |  |  | sub { | 
| 143 | 0 |  |  |  |  |  | $args{on_failure}->(@_); | 
| 144 | 0 |  |  |  |  |  | $self->_orphan(); | 
| 145 |  |  |  |  |  |  | }, | 
| 146 |  |  |  |  |  |  | $self->{id}, | 
| 147 | 0 |  |  |  |  |  | ); | 
| 148 | 0 |  |  |  |  |  | }; | 
| 149 |  |  |  |  |  |  |  | 
| 150 | 0 |  |  |  |  |  | return $self; | 
| 151 |  |  |  |  |  |  | } | 
| 152 |  |  |  |  |  |  |  | 
| 153 |  |  |  |  |  |  | sub _closed { | 
| 154 | 0 |  |  | 0 |  |  | my $self = shift; | 
| 155 | 0 |  |  |  |  |  | my ($frame,) = @_; | 
| 156 | 0 |  | 0 |  |  |  | $frame ||= $self->_close_frame(); | 
| 157 |  |  |  |  |  |  |  | 
| 158 | 0 | 0 |  |  |  |  | return if $self->{_state} == _ST_CLOSED; | 
| 159 | 0 |  |  |  |  |  | $self->{_state} = _ST_CLOSED; | 
| 160 |  |  |  |  |  |  |  | 
| 161 |  |  |  |  |  |  | # Perform callbacks for all outstanding commands | 
| 162 | 0 |  |  |  |  |  | $self->{_queue}->_flush($frame); | 
| 163 | 0 |  |  |  |  |  | $self->{_content_queue}->_flush($frame); | 
| 164 |  |  |  |  |  |  |  | 
| 165 |  |  |  |  |  |  | # Fake nacks of all outstanding publishes | 
| 166 | 0 |  |  |  |  |  | $_->($frame) for grep { defined } map { $_->[1] } values %{ $self->{_publish_cbs} }; | 
|  | 0 |  |  |  |  |  |  | 
|  | 0 |  |  |  |  |  |  | 
|  | 0 |  |  |  |  |  |  | 
| 167 |  |  |  |  |  |  |  | 
| 168 |  |  |  |  |  |  | # Report cancelation of all outstanding consumes | 
| 169 | 0 |  |  |  |  |  | my @tags = keys %{ $self->{_consumer_cbs} }; | 
|  | 0 |  |  |  |  |  |  | 
| 170 | 0 |  |  |  |  |  | $self->_canceled($_, $frame) for @tags; | 
| 171 |  |  |  |  |  |  |  | 
| 172 |  |  |  |  |  |  | # Report close to on_close callback | 
| 173 | 0 |  |  |  |  |  | { local $@; | 
|  | 0 |  |  |  |  |  |  | 
| 174 | 0 |  |  |  |  |  | eval { $self->{on_close}->($frame) }; | 
|  | 0 |  |  |  |  |  |  | 
| 175 | 0 | 0 |  |  |  |  | warn "Error in channel on_close callback, ignored:\n  $@  " if $@; } | 
| 176 |  |  |  |  |  |  |  | 
| 177 |  |  |  |  |  |  | # Reset state (partly redundant) | 
| 178 | 0 |  |  |  |  |  | $self->_reset; | 
| 179 |  |  |  |  |  |  |  | 
| 180 | 0 |  |  |  |  |  | return $self; | 
| 181 |  |  |  |  |  |  | } | 
| 182 |  |  |  |  |  |  |  | 
| 183 |  |  |  |  |  |  | sub _close_frame { | 
| 184 | 0 |  |  | 0 |  |  | my $self = shift; | 
| 185 | 0 |  |  |  |  |  | my ($text,) = @_; | 
| 186 |  |  |  |  |  |  |  | 
| 187 | 0 |  |  |  |  |  | Net::AMQP::Frame::Method->new( | 
| 188 |  |  |  |  |  |  | method_frame => Net::AMQP::Protocol::Channel::Close->new( | 
| 189 |  |  |  |  |  |  | reply_text => $text, | 
| 190 |  |  |  |  |  |  | ), | 
| 191 |  |  |  |  |  |  | ); | 
| 192 |  |  |  |  |  |  | } | 
| 193 |  |  |  |  |  |  |  | 
| 194 |  |  |  |  |  |  | sub _orphan { | 
| 195 | 0 |  |  | 0 |  |  | my $self = shift; | 
| 196 |  |  |  |  |  |  |  | 
| 197 | 0 | 0 |  |  |  |  | if (my $connection = $self->{connection}) { | 
| 198 | 0 |  |  |  |  |  | $connection->_delete_channel($self); | 
| 199 |  |  |  |  |  |  | } | 
| 200 | 0 |  |  |  |  |  | return $self; | 
| 201 |  |  |  |  |  |  | } | 
| 202 |  |  |  |  |  |  |  | 
| 203 |  |  |  |  |  |  | sub declare_exchange { | 
| 204 | 0 |  |  | 0 | 1 |  | my $self = shift; | 
| 205 | 0 |  |  |  |  |  | my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_); | 
| 206 |  |  |  |  |  |  |  | 
| 207 | 0 | 0 |  |  |  |  | return $self if !$self->_check_open($failure_cb); | 
| 208 |  |  |  |  |  |  |  | 
| 209 |  |  |  |  |  |  | $self->{connection}->_push_write_and_read( | 
| 210 |  |  |  |  |  |  | 'Exchange::Declare', | 
| 211 |  |  |  |  |  |  | { | 
| 212 |  |  |  |  |  |  | type        => 'direct', | 
| 213 |  |  |  |  |  |  | passive     => 0, | 
| 214 |  |  |  |  |  |  | durable     => 0, | 
| 215 |  |  |  |  |  |  | auto_delete => 0, | 
| 216 |  |  |  |  |  |  | internal    => 0, | 
| 217 |  |  |  |  |  |  | %args, # exchange | 
| 218 |  |  |  |  |  |  | ticket      => 0, | 
| 219 |  |  |  |  |  |  | nowait      => 0, # FIXME | 
| 220 |  |  |  |  |  |  | }, | 
| 221 |  |  |  |  |  |  | 'Exchange::DeclareOk', | 
| 222 |  |  |  |  |  |  | $cb, | 
| 223 |  |  |  |  |  |  | $failure_cb, | 
| 224 |  |  |  |  |  |  | $self->{id}, | 
| 225 | 0 |  |  |  |  |  | ); | 
| 226 |  |  |  |  |  |  |  | 
| 227 | 0 |  |  |  |  |  | return $self; | 
| 228 |  |  |  |  |  |  | } | 
| 229 |  |  |  |  |  |  |  | 
| 230 |  |  |  |  |  |  | sub bind_exchange { | 
| 231 | 0 |  |  | 0 | 1 |  | my $self = shift; | 
| 232 | 0 |  |  |  |  |  | my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_); | 
| 233 |  |  |  |  |  |  |  | 
| 234 | 0 | 0 |  |  |  |  | return $self if !$self->_check_open($failure_cb); | 
| 235 |  |  |  |  |  |  |  | 
| 236 |  |  |  |  |  |  | $self->{connection}->_push_write_and_read( | 
| 237 |  |  |  |  |  |  | 'Exchange::Bind', | 
| 238 |  |  |  |  |  |  | { | 
| 239 |  |  |  |  |  |  | %args, # source, destination, routing_key | 
| 240 |  |  |  |  |  |  | ticket      => 0, | 
| 241 |  |  |  |  |  |  | nowait      => 0, # FIXME | 
| 242 |  |  |  |  |  |  | }, | 
| 243 |  |  |  |  |  |  | 'Exchange::BindOk', | 
| 244 |  |  |  |  |  |  | $cb, | 
| 245 |  |  |  |  |  |  | $failure_cb, | 
| 246 |  |  |  |  |  |  | $self->{id}, | 
| 247 | 0 |  |  |  |  |  | ); | 
| 248 |  |  |  |  |  |  |  | 
| 249 | 0 |  |  |  |  |  | return $self; | 
| 250 |  |  |  |  |  |  | } | 
| 251 |  |  |  |  |  |  |  | 
| 252 |  |  |  |  |  |  | sub unbind_exchange { | 
| 253 | 0 |  |  | 0 | 1 |  | my $self = shift; | 
| 254 | 0 |  |  |  |  |  | my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_); | 
| 255 |  |  |  |  |  |  |  | 
| 256 | 0 | 0 |  |  |  |  | return $self if !$self->_check_open($failure_cb); | 
| 257 |  |  |  |  |  |  |  | 
| 258 |  |  |  |  |  |  | $self->{connection}->_push_write_and_read( | 
| 259 |  |  |  |  |  |  | 'Exchange::Unbind', | 
| 260 |  |  |  |  |  |  | { | 
| 261 |  |  |  |  |  |  | %args, # source, destination, routing_key | 
| 262 |  |  |  |  |  |  | ticket      => 0, | 
| 263 |  |  |  |  |  |  | nowait      => 0, # FIXME | 
| 264 |  |  |  |  |  |  | }, | 
| 265 |  |  |  |  |  |  | 'Exchange::UnbindOk', | 
| 266 |  |  |  |  |  |  | $cb, | 
| 267 |  |  |  |  |  |  | $failure_cb, | 
| 268 |  |  |  |  |  |  | $self->{id}, | 
| 269 | 0 |  |  |  |  |  | ); | 
| 270 |  |  |  |  |  |  |  | 
| 271 | 0 |  |  |  |  |  | return $self; | 
| 272 |  |  |  |  |  |  | } | 
| 273 |  |  |  |  |  |  |  | 
| 274 |  |  |  |  |  |  | sub delete_exchange { | 
| 275 | 0 |  |  | 0 | 1 |  | my $self = shift; | 
| 276 | 0 |  |  |  |  |  | my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_); | 
| 277 |  |  |  |  |  |  |  | 
| 278 | 0 | 0 |  |  |  |  | return $self if !$self->_check_open($failure_cb); | 
| 279 |  |  |  |  |  |  |  | 
| 280 |  |  |  |  |  |  | $self->{connection}->_push_write_and_read( | 
| 281 |  |  |  |  |  |  | 'Exchange::Delete', | 
| 282 |  |  |  |  |  |  | { | 
| 283 |  |  |  |  |  |  | if_unused => 0, | 
| 284 |  |  |  |  |  |  | %args, # exchange | 
| 285 |  |  |  |  |  |  | ticket    => 0, | 
| 286 |  |  |  |  |  |  | nowait    => 0, # FIXME | 
| 287 |  |  |  |  |  |  | }, | 
| 288 |  |  |  |  |  |  | 'Exchange::DeleteOk', | 
| 289 |  |  |  |  |  |  | $cb, | 
| 290 |  |  |  |  |  |  | $failure_cb, | 
| 291 |  |  |  |  |  |  | $self->{id}, | 
| 292 | 0 |  |  |  |  |  | ); | 
| 293 |  |  |  |  |  |  |  | 
| 294 | 0 |  |  |  |  |  | return $self; | 
| 295 |  |  |  |  |  |  | } | 
| 296 |  |  |  |  |  |  |  | 
| 297 |  |  |  |  |  |  | sub declare_queue { | 
| 298 | 0 |  |  | 0 | 1 |  | my $self = shift; | 
| 299 | 0 |  |  |  |  |  | my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_); | 
| 300 |  |  |  |  |  |  |  | 
| 301 | 0 | 0 |  |  |  |  | return $self if !$self->_check_open($failure_cb); | 
| 302 |  |  |  |  |  |  |  | 
| 303 |  |  |  |  |  |  | $self->{connection}->_push_write_and_read( | 
| 304 |  |  |  |  |  |  | 'Queue::Declare', | 
| 305 |  |  |  |  |  |  | { | 
| 306 |  |  |  |  |  |  | queue       => '', | 
| 307 |  |  |  |  |  |  | passive     => 0, | 
| 308 |  |  |  |  |  |  | durable     => 0, | 
| 309 |  |  |  |  |  |  | exclusive   => 0, | 
| 310 |  |  |  |  |  |  | auto_delete => 0, | 
| 311 |  |  |  |  |  |  | no_ack      => 1, | 
| 312 |  |  |  |  |  |  | %args, | 
| 313 |  |  |  |  |  |  | ticket      => 0, | 
| 314 |  |  |  |  |  |  | nowait      => 0, # FIXME | 
| 315 |  |  |  |  |  |  | }, | 
| 316 |  |  |  |  |  |  | 'Queue::DeclareOk', | 
| 317 |  |  |  |  |  |  | $cb, | 
| 318 |  |  |  |  |  |  | $failure_cb, | 
| 319 |  |  |  |  |  |  | $self->{id}, | 
| 320 | 0 |  |  |  |  |  | ); | 
| 321 |  |  |  |  |  |  | } | 
| 322 |  |  |  |  |  |  |  | 
| 323 |  |  |  |  |  |  | sub bind_queue { | 
| 324 | 0 |  |  | 0 | 1 |  | my $self = shift; | 
| 325 | 0 |  |  |  |  |  | my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_); | 
| 326 |  |  |  |  |  |  |  | 
| 327 | 0 | 0 |  |  |  |  | return $self if !$self->_check_open($failure_cb); | 
| 328 |  |  |  |  |  |  |  | 
| 329 |  |  |  |  |  |  | $self->{connection}->_push_write_and_read( | 
| 330 |  |  |  |  |  |  | 'Queue::Bind', | 
| 331 |  |  |  |  |  |  | { | 
| 332 |  |  |  |  |  |  | %args, # queue, exchange, routing_key | 
| 333 |  |  |  |  |  |  | ticket => 0, | 
| 334 |  |  |  |  |  |  | nowait => 0, # FIXME | 
| 335 |  |  |  |  |  |  | }, | 
| 336 |  |  |  |  |  |  | 'Queue::BindOk', | 
| 337 |  |  |  |  |  |  | $cb, | 
| 338 |  |  |  |  |  |  | $failure_cb, | 
| 339 |  |  |  |  |  |  | $self->{id}, | 
| 340 | 0 |  |  |  |  |  | ); | 
| 341 |  |  |  |  |  |  |  | 
| 342 | 0 |  |  |  |  |  | return $self; | 
| 343 |  |  |  |  |  |  | } | 
| 344 |  |  |  |  |  |  |  | 
| 345 |  |  |  |  |  |  | sub unbind_queue { | 
| 346 | 0 |  |  | 0 | 1 |  | my $self = shift; | 
| 347 | 0 |  |  |  |  |  | my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_); | 
| 348 |  |  |  |  |  |  |  | 
| 349 | 0 | 0 |  |  |  |  | return $self if !$self->_check_open($failure_cb); | 
| 350 |  |  |  |  |  |  |  | 
| 351 |  |  |  |  |  |  | $self->{connection}->_push_write_and_read( | 
| 352 |  |  |  |  |  |  | 'Queue::Unbind', | 
| 353 |  |  |  |  |  |  | { | 
| 354 |  |  |  |  |  |  | %args, # queue, exchange, routing_key | 
| 355 |  |  |  |  |  |  | ticket => 0, | 
| 356 |  |  |  |  |  |  | }, | 
| 357 |  |  |  |  |  |  | 'Queue::UnbindOk', | 
| 358 |  |  |  |  |  |  | $cb, | 
| 359 |  |  |  |  |  |  | $failure_cb, | 
| 360 |  |  |  |  |  |  | $self->{id}, | 
| 361 | 0 |  |  |  |  |  | ); | 
| 362 |  |  |  |  |  |  |  | 
| 363 | 0 |  |  |  |  |  | return $self; | 
| 364 |  |  |  |  |  |  | } | 
| 365 |  |  |  |  |  |  |  | 
| 366 |  |  |  |  |  |  | sub purge_queue { | 
| 367 | 0 |  |  | 0 | 1 |  | my $self = shift; | 
| 368 | 0 |  |  |  |  |  | my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_); | 
| 369 |  |  |  |  |  |  |  | 
| 370 | 0 | 0 |  |  |  |  | return $self if !$self->_check_open($failure_cb); | 
| 371 |  |  |  |  |  |  |  | 
| 372 |  |  |  |  |  |  | $self->{connection}->_push_write_and_read( | 
| 373 |  |  |  |  |  |  | 'Queue::Purge', | 
| 374 |  |  |  |  |  |  | { | 
| 375 |  |  |  |  |  |  | %args, # queue | 
| 376 |  |  |  |  |  |  | ticket => 0, | 
| 377 |  |  |  |  |  |  | nowait => 0, # FIXME | 
| 378 |  |  |  |  |  |  | }, | 
| 379 |  |  |  |  |  |  | 'Queue::PurgeOk', | 
| 380 |  |  |  |  |  |  | $cb, | 
| 381 |  |  |  |  |  |  | $failure_cb, | 
| 382 |  |  |  |  |  |  | $self->{id}, | 
| 383 | 0 |  |  |  |  |  | ); | 
| 384 |  |  |  |  |  |  |  | 
| 385 | 0 |  |  |  |  |  | return $self; | 
| 386 |  |  |  |  |  |  | } | 
| 387 |  |  |  |  |  |  |  | 
| 388 |  |  |  |  |  |  | sub delete_queue { | 
| 389 | 0 |  |  | 0 | 1 |  | my $self = shift; | 
| 390 | 0 |  |  |  |  |  | my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_); | 
| 391 |  |  |  |  |  |  |  | 
| 392 | 0 | 0 |  |  |  |  | return $self if !$self->_check_open($failure_cb); | 
| 393 |  |  |  |  |  |  |  | 
| 394 |  |  |  |  |  |  | $self->{connection}->_push_write_and_read( | 
| 395 |  |  |  |  |  |  | 'Queue::Delete', | 
| 396 |  |  |  |  |  |  | { | 
| 397 |  |  |  |  |  |  | if_unused => 0, | 
| 398 |  |  |  |  |  |  | if_empty  => 0, | 
| 399 |  |  |  |  |  |  | %args, # queue | 
| 400 |  |  |  |  |  |  | ticket    => 0, | 
| 401 |  |  |  |  |  |  | nowait    => 0, # FIXME | 
| 402 |  |  |  |  |  |  | }, | 
| 403 |  |  |  |  |  |  | 'Queue::DeleteOk', | 
| 404 |  |  |  |  |  |  | $cb, | 
| 405 |  |  |  |  |  |  | $failure_cb, | 
| 406 |  |  |  |  |  |  | $self->{id}, | 
| 407 | 0 |  |  |  |  |  | ); | 
| 408 |  |  |  |  |  |  |  | 
| 409 | 0 |  |  |  |  |  | return $self; | 
| 410 |  |  |  |  |  |  | } | 
| 411 |  |  |  |  |  |  |  | 
| 412 |  |  |  |  |  |  | sub publish { | 
| 413 | 0 |  |  | 0 | 1 |  | my $self = shift; | 
| 414 | 0 |  |  |  |  |  | my %args = @_; | 
| 415 |  |  |  |  |  |  |  | 
| 416 |  |  |  |  |  |  | # Docs should advise channel-level callback over this, but still, better to give user an out | 
| 417 | 0 | 0 |  |  |  |  | unless ($self->{_is_active}) { | 
| 418 | 0 | 0 |  |  |  |  | if (defined $args{on_inactive}) { | 
| 419 | 0 |  |  |  |  |  | $args{on_inactive}->(); | 
| 420 | 0 |  |  |  |  |  | return $self; | 
| 421 |  |  |  |  |  |  | } | 
| 422 | 0 |  |  |  |  |  | croak "Can't publish on inactive channel (server flow control); provide on_inactive callback"; | 
| 423 |  |  |  |  |  |  | } | 
| 424 |  |  |  |  |  |  |  | 
| 425 | 0 |  |  |  |  |  | my $header_args = delete $args{header}; | 
| 426 | 0 |  |  |  |  |  | my $body        = delete $args{body}; | 
| 427 | 0 |  |  |  |  |  | my $ack_cb      = delete $args{on_ack}; | 
| 428 | 0 |  |  |  |  |  | my $nack_cb     = delete $args{on_nack}; | 
| 429 | 0 |  |  |  |  |  | my $return_cb   = delete $args{on_return}; | 
| 430 |  |  |  |  |  |  |  | 
| 431 | 0 | 0 |  |  |  |  | defined($header_args) or $header_args = {}; | 
| 432 | 0 | 0 |  |  |  |  | defined($body) or $body = ''; | 
| 433 | 0 | 0 | 0 |  |  |  | if ( defined($ack_cb) or defined($nack_cb) or defined($return_cb) ) { | 
|  |  |  | 0 |  |  |  |  | 
| 434 |  |  |  |  |  |  | cluck "Can't set on_ack/on_nack/on_return callback when not in confirm mode" | 
| 435 | 0 | 0 |  |  |  |  | unless $self->{_is_confirm}; | 
| 436 |  |  |  |  |  |  | } | 
| 437 |  |  |  |  |  |  |  | 
| 438 | 0 |  |  |  |  |  | my $tag; | 
| 439 | 0 | 0 |  |  |  |  | if ($self->{_is_confirm}) { | 
| 440 |  |  |  |  |  |  | # yeah, delivery tags in acks are sequential.  see Java client | 
| 441 | 0 |  |  |  |  |  | $tag = ++$self->{_publish_tag}; | 
| 442 | 0 | 0 |  |  |  |  | if ($return_cb) { | 
| 443 | 0 |  |  |  |  |  | $header_args = { %$header_args }; | 
| 444 | 0 |  |  |  |  |  | $header_args->{headers}->{_ar_return} = $tag;  # just reuse the same value, why not | 
| 445 |  |  |  |  |  |  | } | 
| 446 | 0 |  |  |  |  |  | $self->{_publish_cbs}->{$tag} = [$ack_cb, $nack_cb, $return_cb]; | 
| 447 |  |  |  |  |  |  | } | 
| 448 |  |  |  |  |  |  |  | 
| 449 |  |  |  |  |  |  | $self->_publish( | 
| 450 | 0 |  |  |  |  |  | %args, | 
| 451 |  |  |  |  |  |  | )->_header( | 
| 452 |  |  |  |  |  |  | $header_args, $body, | 
| 453 |  |  |  |  |  |  | )->_body( | 
| 454 |  |  |  |  |  |  | $body, | 
| 455 |  |  |  |  |  |  | ); | 
| 456 |  |  |  |  |  |  |  | 
| 457 | 0 |  |  |  |  |  | return $self; | 
| 458 |  |  |  |  |  |  | } | 
| 459 |  |  |  |  |  |  |  | 
| 460 |  |  |  |  |  |  | sub _publish { | 
| 461 | 0 |  |  | 0 |  |  | my $self = shift; | 
| 462 | 0 |  |  |  |  |  | my %args = @_; | 
| 463 |  |  |  |  |  |  |  | 
| 464 |  |  |  |  |  |  | $self->{connection}->_push_write( | 
| 465 |  |  |  |  |  |  | Net::AMQP::Protocol::Basic::Publish->new( | 
| 466 |  |  |  |  |  |  | exchange  => '', | 
| 467 |  |  |  |  |  |  | mandatory => 0, | 
| 468 |  |  |  |  |  |  | immediate => 0, | 
| 469 |  |  |  |  |  |  | %args, # routing_key | 
| 470 |  |  |  |  |  |  | ticket    => 0, | 
| 471 |  |  |  |  |  |  | ), | 
| 472 |  |  |  |  |  |  | $self->{id}, | 
| 473 | 0 |  |  |  |  |  | ); | 
| 474 |  |  |  |  |  |  |  | 
| 475 | 0 |  |  |  |  |  | return $self; | 
| 476 |  |  |  |  |  |  | } | 
| 477 |  |  |  |  |  |  |  | 
| 478 |  |  |  |  |  |  | sub _header { | 
| 479 | 0 |  |  | 0 |  |  | my ($self, $args, $body) = @_; | 
| 480 |  |  |  |  |  |  |  | 
| 481 | 0 |  | 0 |  |  |  | my $weight = delete $args->{weight} || 0; | 
| 482 |  |  |  |  |  |  |  | 
| 483 |  |  |  |  |  |  | # user-provided message headers must be strings.  protect values that look like numbers. | 
| 484 | 0 |  | 0 |  |  |  | my $headers = $args->{headers} || {}; | 
| 485 | 0 | 0 |  |  |  |  | my @prot = grep { my $v = $headers->{$_}; !ref($v) && looks_like_number($v) } keys %$headers; | 
|  | 0 |  |  |  |  |  |  | 
|  | 0 |  |  |  |  |  |  | 
| 486 | 0 | 0 |  |  |  |  | if (@prot) { | 
| 487 |  |  |  |  |  |  | $headers = { | 
| 488 |  |  |  |  |  |  | %$headers, | 
| 489 | 0 |  |  |  |  |  | map { $_ => Net::AMQP::Value::String->new($headers->{$_}) } @prot | 
|  | 0 |  |  |  |  |  |  | 
| 490 |  |  |  |  |  |  | }; | 
| 491 |  |  |  |  |  |  | } | 
| 492 |  |  |  |  |  |  |  | 
| 493 |  |  |  |  |  |  | $self->{connection}->_push_write( | 
| 494 |  |  |  |  |  |  | Net::AMQP::Frame::Header->new( | 
| 495 |  |  |  |  |  |  | weight       => $weight, | 
| 496 |  |  |  |  |  |  | body_size    => length($body), | 
| 497 |  |  |  |  |  |  | header_frame => Net::AMQP::Protocol::Basic::ContentHeader->new( | 
| 498 |  |  |  |  |  |  | content_type     => 'application/octet-stream', | 
| 499 |  |  |  |  |  |  | content_encoding => undef, | 
| 500 |  |  |  |  |  |  | delivery_mode    => 1, | 
| 501 |  |  |  |  |  |  | priority         => 1, | 
| 502 |  |  |  |  |  |  | correlation_id   => undef, | 
| 503 |  |  |  |  |  |  | expiration       => undef, | 
| 504 |  |  |  |  |  |  | message_id       => undef, | 
| 505 |  |  |  |  |  |  | timestamp        => time, | 
| 506 |  |  |  |  |  |  | type             => undef, | 
| 507 |  |  |  |  |  |  | user_id          => $self->{connection}->login_user, | 
| 508 |  |  |  |  |  |  | app_id           => undef, | 
| 509 |  |  |  |  |  |  | cluster_id       => undef, | 
| 510 |  |  |  |  |  |  | %$args, | 
| 511 |  |  |  |  |  |  | headers          => $headers, | 
| 512 |  |  |  |  |  |  | ), | 
| 513 |  |  |  |  |  |  | ), | 
| 514 |  |  |  |  |  |  | $self->{id}, | 
| 515 | 0 |  |  |  |  |  | ); | 
| 516 |  |  |  |  |  |  |  | 
| 517 | 0 |  |  |  |  |  | return $self; | 
| 518 |  |  |  |  |  |  | } | 
| 519 |  |  |  |  |  |  |  | 
| 520 |  |  |  |  |  |  | sub _body { | 
| 521 | 0 |  |  | 0 |  |  | my ($self, $body,) = @_; | 
| 522 |  |  |  |  |  |  |  | 
| 523 | 0 |  | 0 |  |  |  | my $body_max = $self->{connection}->{_body_max} || length $body; | 
| 524 |  |  |  |  |  |  |  | 
| 525 |  |  |  |  |  |  | # chunk up body into segments measured by $frame_max | 
| 526 | 0 |  |  |  |  |  | while (length $body) { | 
| 527 |  |  |  |  |  |  | $self->{connection}->_push_write( | 
| 528 |  |  |  |  |  |  | Net::AMQP::Frame::Body->new( | 
| 529 |  |  |  |  |  |  | payload => substr($body, 0, $body_max, '')), | 
| 530 |  |  |  |  |  |  | $self->{id} | 
| 531 | 0 |  |  |  |  |  | ); | 
| 532 |  |  |  |  |  |  | } | 
| 533 |  |  |  |  |  |  |  | 
| 534 | 0 |  |  |  |  |  | return $self; | 
| 535 |  |  |  |  |  |  | } | 
| 536 |  |  |  |  |  |  |  | 
| 537 |  |  |  |  |  |  | sub consume { | 
| 538 | 0 |  |  | 0 | 1 |  | my $self = shift; | 
| 539 | 0 |  |  |  |  |  | my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_); | 
| 540 |  |  |  |  |  |  |  | 
| 541 | 0 | 0 |  |  |  |  | return $self if !$self->_check_open($failure_cb); | 
| 542 |  |  |  |  |  |  |  | 
| 543 | 0 |  | 0 | 0 |  |  | my $consumer_cb = delete $args{on_consume}  || sub {}; | 
| 544 | 0 |  | 0 | 0 |  |  | my $cancel_cb   = delete $args{on_cancel}   || sub {}; | 
| 545 | 0 |  | 0 |  |  |  | my $no_ack      = delete $args{no_ack}      // 1; | 
| 546 |  |  |  |  |  |  |  | 
| 547 |  |  |  |  |  |  | $self->{connection}->_push_write_and_read( | 
| 548 |  |  |  |  |  |  | 'Basic::Consume', | 
| 549 |  |  |  |  |  |  | { | 
| 550 |  |  |  |  |  |  | consumer_tag => '', | 
| 551 |  |  |  |  |  |  | no_local     => 0, | 
| 552 |  |  |  |  |  |  | no_ack       => $no_ack, | 
| 553 |  |  |  |  |  |  | exclusive    => 0, | 
| 554 |  |  |  |  |  |  |  | 
| 555 |  |  |  |  |  |  | %args, # queue | 
| 556 |  |  |  |  |  |  | ticket       => 0, | 
| 557 |  |  |  |  |  |  | nowait       => 0, # FIXME | 
| 558 |  |  |  |  |  |  | }, | 
| 559 |  |  |  |  |  |  | 'Basic::ConsumeOk', | 
| 560 |  |  |  |  |  |  | sub { | 
| 561 | 0 |  |  | 0 |  |  | my $frame = shift; | 
| 562 | 0 |  |  |  |  |  | my $tag = $frame->method_frame->consumer_tag; | 
| 563 | 0 |  |  |  |  |  | $self->{_consumer_cbs}->{$tag} = [ $consumer_cb, $cancel_cb ]; | 
| 564 | 0 |  |  |  |  |  | $cb->($frame); | 
| 565 |  |  |  |  |  |  | }, | 
| 566 |  |  |  |  |  |  | $failure_cb, | 
| 567 |  |  |  |  |  |  | $self->{id}, | 
| 568 | 0 |  |  |  |  |  | ); | 
| 569 |  |  |  |  |  |  |  | 
| 570 | 0 |  |  |  |  |  | return $self; | 
| 571 |  |  |  |  |  |  | } | 
| 572 |  |  |  |  |  |  |  | 
| 573 |  |  |  |  |  |  | sub cancel { | 
| 574 | 0 |  |  | 0 | 1 |  | my $self = shift; | 
| 575 | 0 |  |  |  |  |  | my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_); | 
| 576 |  |  |  |  |  |  |  | 
| 577 | 0 | 0 |  |  |  |  | return $self if !$self->_check_open($failure_cb); | 
| 578 |  |  |  |  |  |  |  | 
| 579 | 0 | 0 |  |  |  |  | if (!defined $args{consumer_tag}) { | 
| 580 | 0 |  |  |  |  |  | $failure_cb->('consumer_tag is not set'); | 
| 581 | 0 |  |  |  |  |  | return $self; | 
| 582 |  |  |  |  |  |  | } | 
| 583 |  |  |  |  |  |  |  | 
| 584 | 0 |  |  |  |  |  | my $cons_cbs = $self->{_consumer_cbs}->{$args{consumer_tag}}; | 
| 585 | 0 | 0 |  |  |  |  | unless ($cons_cbs) { | 
| 586 | 0 |  |  |  |  |  | $failure_cb->('Unknown consumer_tag'); | 
| 587 | 0 |  |  |  |  |  | return $self; | 
| 588 |  |  |  |  |  |  | } | 
| 589 | 0 |  |  |  |  |  | push @$cons_cbs, $cb; | 
| 590 |  |  |  |  |  |  |  | 
| 591 |  |  |  |  |  |  | $self->{connection}->_push_write( | 
| 592 |  |  |  |  |  |  | Net::AMQP::Protocol::Basic::Cancel->new( | 
| 593 |  |  |  |  |  |  | %args, # consumer_tag | 
| 594 |  |  |  |  |  |  | nowait => 0, | 
| 595 |  |  |  |  |  |  | ), | 
| 596 |  |  |  |  |  |  | $self->{id}, | 
| 597 | 0 |  |  |  |  |  | ); | 
| 598 |  |  |  |  |  |  |  | 
| 599 | 0 |  |  |  |  |  | return $self; | 
| 600 |  |  |  |  |  |  | } | 
| 601 |  |  |  |  |  |  |  | 
| 602 |  |  |  |  |  |  | sub _canceled { | 
| 603 | 0 |  |  | 0 |  |  | my $self = shift; | 
| 604 | 0 |  |  |  |  |  | my ($tag, $frame,) = @_; | 
| 605 |  |  |  |  |  |  |  | 
| 606 | 0 | 0 |  |  |  |  | my $cons_cbs = delete $self->{_consumer_cbs}->{$tag} | 
| 607 |  |  |  |  |  |  | or return 0; | 
| 608 |  |  |  |  |  |  |  | 
| 609 | 0 |  |  |  |  |  | shift @$cons_cbs; # no more deliveries | 
| 610 | 0 |  |  |  |  |  | for my $cb (reverse @$cons_cbs) { | 
| 611 | 0 |  |  |  |  |  | $cb->($frame); | 
| 612 |  |  |  |  |  |  | } | 
| 613 | 0 |  |  |  |  |  | return 1; | 
| 614 |  |  |  |  |  |  | } | 
| 615 |  |  |  |  |  |  |  | 
| 616 |  |  |  |  |  |  | sub get { | 
| 617 | 0 |  |  | 0 | 1 |  | my $self = shift; | 
| 618 | 0 |  |  |  |  |  | my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_); | 
| 619 |  |  |  |  |  |  |  | 
| 620 | 0 |  | 0 |  |  |  | my $no_ack = delete $args{no_ack} // 1; | 
| 621 |  |  |  |  |  |  |  | 
| 622 | 0 | 0 |  |  |  |  | return $self if !$self->_check_open($failure_cb); | 
| 623 |  |  |  |  |  |  |  | 
| 624 |  |  |  |  |  |  | $self->{connection}->_push_write_and_read( | 
| 625 |  |  |  |  |  |  | 'Basic::Get', | 
| 626 |  |  |  |  |  |  | { | 
| 627 |  |  |  |  |  |  | no_ack => $no_ack, | 
| 628 |  |  |  |  |  |  | %args, # queue | 
| 629 |  |  |  |  |  |  | ticket => 0, | 
| 630 |  |  |  |  |  |  | }, | 
| 631 |  |  |  |  |  |  | [qw(Basic::GetOk Basic::GetEmpty)], | 
| 632 |  |  |  |  |  |  | sub { | 
| 633 | 0 |  |  | 0 |  |  | my $frame = shift; | 
| 634 | 0 | 0 |  |  |  |  | return $cb->({empty => $frame}) | 
| 635 |  |  |  |  |  |  | if $frame->method_frame->isa('Net::AMQP::Protocol::Basic::GetEmpty'); | 
| 636 | 0 |  |  |  |  |  | $self->_push_read_header_and_body('ok', $frame, $cb, $failure_cb); | 
| 637 |  |  |  |  |  |  | }, | 
| 638 |  |  |  |  |  |  | $failure_cb, | 
| 639 |  |  |  |  |  |  | $self->{id}, | 
| 640 | 0 |  |  |  |  |  | ); | 
| 641 |  |  |  |  |  |  |  | 
| 642 | 0 |  |  |  |  |  | return $self; | 
| 643 |  |  |  |  |  |  | } | 
| 644 |  |  |  |  |  |  |  | 
| 645 |  |  |  |  |  |  | sub ack { | 
| 646 | 0 |  |  | 0 | 1 |  | my $self = shift; | 
| 647 | 0 |  |  |  |  |  | my %args = @_; | 
| 648 |  |  |  |  |  |  |  | 
| 649 | 0 | 0 |  | 0 |  |  | return $self if !$self->_check_open(sub {}); | 
| 650 |  |  |  |  |  |  |  | 
| 651 |  |  |  |  |  |  | $self->{connection}->_push_write( | 
| 652 |  |  |  |  |  |  | Net::AMQP::Protocol::Basic::Ack->new( | 
| 653 |  |  |  |  |  |  | delivery_tag => 0, | 
| 654 |  |  |  |  |  |  | multiple     => ( | 
| 655 |  |  |  |  |  |  | defined $args{delivery_tag} && $args{delivery_tag} != 0 ? 0 : 1 | 
| 656 |  |  |  |  |  |  | ), | 
| 657 |  |  |  |  |  |  | %args, | 
| 658 |  |  |  |  |  |  | ), | 
| 659 |  |  |  |  |  |  | $self->{id}, | 
| 660 | 0 | 0 | 0 |  |  |  | ); | 
| 661 |  |  |  |  |  |  |  | 
| 662 | 0 |  |  |  |  |  | return $self; | 
| 663 |  |  |  |  |  |  | } | 
| 664 |  |  |  |  |  |  |  | 
| 665 |  |  |  |  |  |  | sub qos { | 
| 666 | 0 |  |  | 0 | 1 |  | my $self = shift; | 
| 667 | 0 |  |  |  |  |  | my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_); | 
| 668 |  |  |  |  |  |  |  | 
| 669 | 0 | 0 |  |  |  |  | return $self if !$self->_check_open($failure_cb); | 
| 670 |  |  |  |  |  |  |  | 
| 671 |  |  |  |  |  |  | $self->{connection}->_push_write_and_read( | 
| 672 |  |  |  |  |  |  | 'Basic::Qos', | 
| 673 |  |  |  |  |  |  | { | 
| 674 |  |  |  |  |  |  | prefetch_count => 1, | 
| 675 |  |  |  |  |  |  | prefetch_size  => 0, | 
| 676 |  |  |  |  |  |  | global         => 0, | 
| 677 |  |  |  |  |  |  | %args, | 
| 678 |  |  |  |  |  |  | }, | 
| 679 |  |  |  |  |  |  | 'Basic::QosOk', | 
| 680 |  |  |  |  |  |  | $cb, | 
| 681 |  |  |  |  |  |  | $failure_cb, | 
| 682 |  |  |  |  |  |  | $self->{id}, | 
| 683 | 0 |  |  |  |  |  | ); | 
| 684 |  |  |  |  |  |  |  | 
| 685 | 0 |  |  |  |  |  | return $self; | 
| 686 |  |  |  |  |  |  | } | 
| 687 |  |  |  |  |  |  |  | 
| 688 |  |  |  |  |  |  | sub confirm { | 
| 689 | 0 |  |  | 0 | 1 |  | my $self = shift; | 
| 690 | 0 |  |  |  |  |  | my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_); | 
| 691 |  |  |  |  |  |  |  | 
| 692 | 0 | 0 |  |  |  |  | return $self if !$self->_check_open($failure_cb); | 
| 693 | 0 | 0 |  |  |  |  | return $self if !$self->_check_version(0, 9, $failure_cb); | 
| 694 |  |  |  |  |  |  |  | 
| 695 | 0 |  |  |  |  |  | weaken(my $wself = $self); | 
| 696 |  |  |  |  |  |  |  | 
| 697 |  |  |  |  |  |  | $self->{connection}->_push_write_and_read( | 
| 698 |  |  |  |  |  |  | 'Confirm::Select', | 
| 699 |  |  |  |  |  |  | { | 
| 700 |  |  |  |  |  |  | %args, | 
| 701 |  |  |  |  |  |  | nowait       => 0, # FIXME | 
| 702 |  |  |  |  |  |  | }, | 
| 703 |  |  |  |  |  |  | 'Confirm::SelectOk', | 
| 704 |  |  |  |  |  |  | sub { | 
| 705 | 0 | 0 |  | 0 |  |  | my $me = $wself or return; | 
| 706 | 0 |  |  |  |  |  | $me->{_is_confirm} = 1; | 
| 707 | 0 |  |  |  |  |  | $cb->(); | 
| 708 |  |  |  |  |  |  | }, | 
| 709 |  |  |  |  |  |  | $failure_cb, | 
| 710 |  |  |  |  |  |  | $self->{id}, | 
| 711 | 0 |  |  |  |  |  | ); | 
| 712 |  |  |  |  |  |  |  | 
| 713 | 0 |  |  |  |  |  | return $self; | 
| 714 |  |  |  |  |  |  | } | 
| 715 |  |  |  |  |  |  |  | 
| 716 |  |  |  |  |  |  | sub recover { | 
| 717 | 0 |  |  | 0 | 1 |  | my $self = shift; | 
| 718 | 0 |  |  |  |  |  | my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_); | 
| 719 |  |  |  |  |  |  |  | 
| 720 | 0 | 0 |  | 0 |  |  | return $self if !$self->_check_open(sub {}); | 
| 721 |  |  |  |  |  |  |  | 
| 722 |  |  |  |  |  |  | $self->{connection}->_push_write( | 
| 723 |  |  |  |  |  |  | Net::AMQP::Protocol::Basic::Recover->new( | 
| 724 |  |  |  |  |  |  | requeue => 1, | 
| 725 |  |  |  |  |  |  | %args, | 
| 726 |  |  |  |  |  |  | ), | 
| 727 |  |  |  |  |  |  | $self->{id}, | 
| 728 | 0 |  |  |  |  |  | ); | 
| 729 |  |  |  |  |  |  |  | 
| 730 | 0 | 0 | 0 |  |  |  | if (!$args{nowait} && $self->_check_version(0, 9)) { | 
| 731 |  |  |  |  |  |  | $self->{connection}->_push_read_and_valid( | 
| 732 |  |  |  |  |  |  | 'Basic::RecoverOk', | 
| 733 |  |  |  |  |  |  | $cb, | 
| 734 |  |  |  |  |  |  | $failure_cb, | 
| 735 |  |  |  |  |  |  | $self->{id}, | 
| 736 | 0 |  |  |  |  |  | ); | 
| 737 |  |  |  |  |  |  | } | 
| 738 |  |  |  |  |  |  | else { | 
| 739 | 0 |  |  |  |  |  | $cb->(); | 
| 740 |  |  |  |  |  |  | } | 
| 741 |  |  |  |  |  |  |  | 
| 742 | 0 |  |  |  |  |  | return $self; | 
| 743 |  |  |  |  |  |  | } | 
| 744 |  |  |  |  |  |  |  | 
| 745 |  |  |  |  |  |  | sub reject { | 
| 746 | 0 |  |  | 0 | 0 |  | my $self = shift; | 
| 747 | 0 |  |  |  |  |  | my %args = @_; | 
| 748 |  |  |  |  |  |  |  | 
| 749 | 0 | 0 |  | 0 |  |  | return $self if !$self->_check_open( sub { } ); | 
| 750 |  |  |  |  |  |  |  | 
| 751 |  |  |  |  |  |  | $self->{connection}->_push_write( | 
| 752 |  |  |  |  |  |  | Net::AMQP::Protocol::Basic::Reject->new( | 
| 753 |  |  |  |  |  |  | delivery_tag => 0, | 
| 754 |  |  |  |  |  |  | requeue      => 0, | 
| 755 |  |  |  |  |  |  | %args, | 
| 756 |  |  |  |  |  |  | ), | 
| 757 |  |  |  |  |  |  | $self->{id}, | 
| 758 | 0 |  |  |  |  |  | ); | 
| 759 |  |  |  |  |  |  |  | 
| 760 | 0 |  |  |  |  |  | return $self; | 
| 761 |  |  |  |  |  |  | } | 
| 762 |  |  |  |  |  |  |  | 
| 763 |  |  |  |  |  |  | sub select_tx { | 
| 764 | 0 |  |  | 0 | 1 |  | my $self = shift; | 
| 765 | 0 |  |  |  |  |  | my ($cb, $failure_cb,) = $self->_delete_cbs(@_); | 
| 766 |  |  |  |  |  |  |  | 
| 767 | 0 | 0 |  |  |  |  | return $self if !$self->_check_open($failure_cb); | 
| 768 |  |  |  |  |  |  |  | 
| 769 |  |  |  |  |  |  | $self->{connection}->_push_write_and_read( | 
| 770 |  |  |  |  |  |  | 'Tx::Select', {}, 'Tx::SelectOk', | 
| 771 |  |  |  |  |  |  | $cb, | 
| 772 |  |  |  |  |  |  | $failure_cb, | 
| 773 |  |  |  |  |  |  | $self->{id}, | 
| 774 | 0 |  |  |  |  |  | ); | 
| 775 |  |  |  |  |  |  |  | 
| 776 | 0 |  |  |  |  |  | return $self; | 
| 777 |  |  |  |  |  |  | } | 
| 778 |  |  |  |  |  |  |  | 
| 779 |  |  |  |  |  |  | sub commit_tx { | 
| 780 | 0 |  |  | 0 | 1 |  | my $self = shift; | 
| 781 | 0 |  |  |  |  |  | my ($cb, $failure_cb,) = $self->_delete_cbs(@_); | 
| 782 |  |  |  |  |  |  |  | 
| 783 | 0 | 0 |  |  |  |  | return $self if !$self->_check_open($failure_cb); | 
| 784 |  |  |  |  |  |  |  | 
| 785 |  |  |  |  |  |  | $self->{connection}->_push_write_and_read( | 
| 786 |  |  |  |  |  |  | 'Tx::Commit', {}, 'Tx::CommitOk', | 
| 787 |  |  |  |  |  |  | $cb, | 
| 788 |  |  |  |  |  |  | $failure_cb, | 
| 789 |  |  |  |  |  |  | $self->{id}, | 
| 790 | 0 |  |  |  |  |  | ); | 
| 791 |  |  |  |  |  |  |  | 
| 792 | 0 |  |  |  |  |  | return $self; | 
| 793 |  |  |  |  |  |  | } | 
| 794 |  |  |  |  |  |  |  | 
| 795 |  |  |  |  |  |  | sub rollback_tx { | 
| 796 | 0 |  |  | 0 | 1 |  | my $self = shift; | 
| 797 | 0 |  |  |  |  |  | my ($cb, $failure_cb,) = $self->_delete_cbs(@_); | 
| 798 |  |  |  |  |  |  |  | 
| 799 | 0 | 0 |  |  |  |  | return $self if !$self->_check_open($failure_cb); | 
| 800 |  |  |  |  |  |  |  | 
| 801 |  |  |  |  |  |  | $self->{connection}->_push_write_and_read( | 
| 802 |  |  |  |  |  |  | 'Tx::Rollback', {}, 'Tx::RollbackOk', | 
| 803 |  |  |  |  |  |  | $cb, | 
| 804 |  |  |  |  |  |  | $failure_cb, | 
| 805 |  |  |  |  |  |  | $self->{id}, | 
| 806 | 0 |  |  |  |  |  | ); | 
| 807 |  |  |  |  |  |  |  | 
| 808 | 0 |  |  |  |  |  | return $self; | 
| 809 |  |  |  |  |  |  | } | 
| 810 |  |  |  |  |  |  |  | 
| 811 |  |  |  |  |  |  | sub push_queue_or_consume { | 
| 812 | 0 |  |  | 0 | 0 |  | my $self = shift; | 
| 813 | 0 |  |  |  |  |  | my ($frame, $failure_cb,) = @_; | 
| 814 |  |  |  |  |  |  |  | 
| 815 |  |  |  |  |  |  | # Note: the spec says that after a party sends Channel::Close, it MUST | 
| 816 |  |  |  |  |  |  | # discard all frames for that channel other than Close and CloseOk. | 
| 817 |  |  |  |  |  |  |  | 
| 818 | 0 | 0 |  |  |  |  | if ($frame->isa('Net::AMQP::Frame::Method')) { | 
| 819 | 0 |  |  |  |  |  | my $method_frame = $frame->method_frame; | 
| 820 | 0 | 0 | 0 |  |  |  | if ($method_frame->isa('Net::AMQP::Protocol::Channel::Close')) { | 
|  |  | 0 | 0 |  |  |  |  | 
|  |  | 0 |  |  |  |  |  | 
|  |  | 0 |  |  |  |  |  | 
|  |  | 0 |  |  |  |  |  | 
|  |  | 0 |  |  |  |  |  | 
|  |  | 0 |  |  |  |  |  | 
| 821 |  |  |  |  |  |  | $self->{connection}->_push_write( | 
| 822 |  |  |  |  |  |  | Net::AMQP::Protocol::Channel::CloseOk->new(), | 
| 823 |  |  |  |  |  |  | $self->{id}, | 
| 824 | 0 |  |  |  |  |  | ); | 
| 825 | 0 |  |  |  |  |  | $self->_closed($frame); | 
| 826 | 0 |  |  |  |  |  | $self->_orphan(); | 
| 827 | 0 |  |  |  |  |  | return $self; | 
| 828 |  |  |  |  |  |  | } elsif ($self->{_state} != _ST_OPEN) { | 
| 829 | 0 | 0 | 0 |  |  |  | if ($method_frame->isa('Net::AMQP::Protocol::Channel::OpenOk') || | 
| 830 |  |  |  |  |  |  | $method_frame->isa('Net::AMQP::Protocol::Channel::CloseOk')) { | 
| 831 | 0 |  |  |  |  |  | $self->{_queue}->push($frame); | 
| 832 |  |  |  |  |  |  | } | 
| 833 | 0 |  |  |  |  |  | return $self; | 
| 834 |  |  |  |  |  |  | } elsif ($method_frame->isa('Net::AMQP::Protocol::Basic::Deliver')) { | 
| 835 | 0 |  |  |  |  |  | my $cons_cbs = $self->{_consumer_cbs}->{$method_frame->consumer_tag}; | 
| 836 | 0 |  | 0 | 0 |  |  | my $cb = ($cons_cbs && $cons_cbs->[0]) || sub {}; | 
| 837 | 0 |  |  |  |  |  | $self->_push_read_header_and_body('deliver', $frame, $cb, $failure_cb); | 
| 838 | 0 |  |  |  |  |  | return $self; | 
| 839 |  |  |  |  |  |  | } elsif ($method_frame->isa('Net::AMQP::Protocol::Basic::CancelOk') || | 
| 840 |  |  |  |  |  |  | $method_frame->isa('Net::AMQP::Protocol::Basic::Cancel')) { | 
| 841 |  |  |  |  |  |  | # CancelOk means we asked for a cancel. | 
| 842 |  |  |  |  |  |  | # Cancel means queue was deleted; it is not AMQP, but RMQ supports it. | 
| 843 | 0 | 0 | 0 |  |  |  | if (!$self->_canceled($method_frame->consumer_tag, $frame) | 
| 844 |  |  |  |  |  |  | && $method_frame->isa('Net::AMQP::Protocol::Basic::CancelOk')) { | 
| 845 | 0 |  |  |  |  |  | $failure_cb->("Received CancelOk for unknown consumer tag " . $method_frame->consumer_tag); | 
| 846 |  |  |  |  |  |  | } | 
| 847 | 0 |  |  |  |  |  | return $self; | 
| 848 |  |  |  |  |  |  | } elsif ($method_frame->isa('Net::AMQP::Protocol::Basic::Return')) { | 
| 849 | 0 |  |  |  |  |  | weaken(my $wself = $self); | 
| 850 |  |  |  |  |  |  | my $cb = sub { | 
| 851 | 0 |  |  | 0 |  |  | my $ret = shift; | 
| 852 | 0 | 0 |  |  |  |  | my $me = $wself or return; | 
| 853 | 0 |  | 0 |  |  |  | my $headers = $ret->{header}->headers || {}; | 
| 854 | 0 |  |  |  |  |  | my $onret_cb; | 
| 855 | 0 | 0 |  |  |  |  | if (defined(my $tag = $headers->{_ar_return})) { | 
| 856 | 0 |  |  |  |  |  | my $cbs = $me->{_publish_cbs}->{$tag}; | 
| 857 | 0 | 0 |  |  |  |  | $onret_cb = $cbs->[2] if $cbs; | 
| 858 |  |  |  |  |  |  | } | 
| 859 | 0 |  | 0 |  |  |  | $onret_cb ||= $me->{on_return} || $me->{connection}->{on_return} || sub {};  # oh well | 
|  |  |  | 0 |  |  |  |  | 
| 860 | 0 |  |  |  |  |  | $onret_cb->($frame); | 
| 861 | 0 |  |  |  |  |  | }; | 
| 862 | 0 |  |  |  |  |  | $self->_push_read_header_and_body('return', $frame, $cb, $failure_cb); | 
| 863 | 0 |  |  |  |  |  | return $self; | 
| 864 |  |  |  |  |  |  | } elsif ($method_frame->isa('Net::AMQP::Protocol::Basic::Ack') || | 
| 865 |  |  |  |  |  |  | $method_frame->isa('Net::AMQP::Protocol::Basic::Nack')) { | 
| 866 | 0 |  |  |  |  |  | (my $resp = ref($method_frame)) =~ s/.*:://; | 
| 867 | 0 |  |  |  |  |  | my $cbs; | 
| 868 | 0 | 0 |  |  |  |  | if (!$self->{_is_confirm}) { | 
| 869 | 0 |  |  |  |  |  | $failure_cb->("Received $resp when not in confirm mode"); | 
| 870 |  |  |  |  |  |  | } | 
| 871 |  |  |  |  |  |  | else { | 
| 872 | 0 |  |  |  |  |  | my @tags; | 
| 873 | 0 | 0 |  |  |  |  | if ($method_frame->{multiple}) { | 
| 874 | 0 |  |  |  |  |  | @tags = sort { $a <=> $b } | 
| 875 | 0 |  |  |  |  |  | grep { $_ <= $method_frame->{delivery_tag} } | 
| 876 | 0 |  |  |  |  |  | keys %{$self->{_publish_cbs}}; | 
|  | 0 |  |  |  |  |  |  | 
| 877 |  |  |  |  |  |  | } | 
| 878 |  |  |  |  |  |  | else { | 
| 879 | 0 |  |  |  |  |  | @tags = ($method_frame->{delivery_tag}); | 
| 880 |  |  |  |  |  |  | } | 
| 881 | 0 | 0 |  |  |  |  | my $cbi = ($resp eq 'Ack') ? 0 : 1; | 
| 882 | 0 |  |  |  |  |  | for my $tag (@tags) { | 
| 883 | 0 |  |  |  |  |  | my $cbs; | 
| 884 | 0 | 0 |  |  |  |  | if (not $cbs = delete $self->{_publish_cbs}->{$tag}) { | 
|  |  | 0 |  |  |  |  |  | 
| 885 | 0 |  |  |  |  |  | $failure_cb->("Received $resp of unknown delivery tag $tag"); | 
| 886 |  |  |  |  |  |  | } | 
| 887 |  |  |  |  |  |  | elsif ($cbs->[$cbi]) { | 
| 888 | 0 |  |  |  |  |  | $cbs->[$cbi]->($frame); | 
| 889 |  |  |  |  |  |  | } | 
| 890 |  |  |  |  |  |  | } | 
| 891 |  |  |  |  |  |  | } | 
| 892 | 0 |  |  |  |  |  | return $self; | 
| 893 |  |  |  |  |  |  | } elsif ($method_frame->isa('Net::AMQP::Protocol::Channel::Flow')) { | 
| 894 | 0 |  |  |  |  |  | $self->{_is_active} = $method_frame->active; | 
| 895 |  |  |  |  |  |  | $self->{connection}->_push_write( | 
| 896 |  |  |  |  |  |  | Net::AMQP::Protocol::Channel::FlowOk->new( | 
| 897 |  |  |  |  |  |  | active => $method_frame->active, | 
| 898 |  |  |  |  |  |  | ), | 
| 899 |  |  |  |  |  |  | $self->{id}, | 
| 900 | 0 |  |  |  |  |  | ); | 
| 901 | 0 | 0 |  |  |  |  | my $cbname = $self->{_is_active} ? 'on_active' : 'on_inactive'; | 
| 902 | 0 |  | 0 | 0 |  |  | my $cb = $self->{$cbname} || $self->{connection}->{$cbname} || sub {}; | 
| 903 | 0 |  |  |  |  |  | $cb->($frame); | 
| 904 | 0 |  |  |  |  |  | return $self; | 
| 905 |  |  |  |  |  |  | } | 
| 906 | 0 |  |  |  |  |  | $self->{_queue}->push($frame); | 
| 907 |  |  |  |  |  |  | } else { | 
| 908 | 0 |  |  |  |  |  | $self->{_content_queue}->push($frame); | 
| 909 |  |  |  |  |  |  | } | 
| 910 |  |  |  |  |  |  |  | 
| 911 | 0 |  |  |  |  |  | return $self; | 
| 912 |  |  |  |  |  |  | } | 
| 913 |  |  |  |  |  |  |  | 
| 914 |  |  |  |  |  |  | sub _push_read_header_and_body { | 
| 915 | 0 |  |  | 0 |  |  | my $self = shift; | 
| 916 | 0 |  |  |  |  |  | my ($type, $frame, $cb, $failure_cb,) = @_; | 
| 917 | 0 |  |  |  |  |  | my $response = {$type => $frame}; | 
| 918 | 0 |  |  |  |  |  | my $body_size = 0; | 
| 919 | 0 |  |  |  |  |  | my $body_payload = ""; | 
| 920 |  |  |  |  |  |  |  | 
| 921 | 0 |  |  |  |  |  | weaken(my $wcontq = $self->{_content_queue}); | 
| 922 | 0 |  |  |  |  |  | my $w_body_frame; | 
| 923 |  |  |  |  |  |  | my $body_frame = sub { | 
| 924 | 0 |  |  | 0 |  |  | my $frame = shift; | 
| 925 |  |  |  |  |  |  |  | 
| 926 | 0 | 0 |  |  |  |  | return $failure_cb->('Received data is not body frame') | 
| 927 |  |  |  |  |  |  | if !$frame->isa('Net::AMQP::Frame::Body'); | 
| 928 |  |  |  |  |  |  |  | 
| 929 | 0 |  |  |  |  |  | $body_payload .= $frame->payload; | 
| 930 |  |  |  |  |  |  |  | 
| 931 | 0 | 0 |  |  |  |  | if (length($body_payload) < $body_size) { | 
| 932 |  |  |  |  |  |  | # More to come | 
| 933 | 0 | 0 |  |  |  |  | my $contq = $wcontq or return; | 
| 934 | 0 |  |  |  |  |  | $contq->get($w_body_frame); | 
| 935 |  |  |  |  |  |  | } | 
| 936 |  |  |  |  |  |  | else { | 
| 937 | 0 |  |  |  |  |  | $frame->payload($body_payload); | 
| 938 | 0 |  |  |  |  |  | $response->{body} = $frame; | 
| 939 | 0 |  |  |  |  |  | $cb->($response); | 
| 940 |  |  |  |  |  |  | } | 
| 941 | 0 |  |  |  |  |  | }; | 
| 942 | 0 |  |  |  |  |  | $w_body_frame = $body_frame; | 
| 943 | 0 |  |  |  |  |  | weaken($w_body_frame); | 
| 944 |  |  |  |  |  |  |  | 
| 945 |  |  |  |  |  |  | $self->{_content_queue}->get(sub{ | 
| 946 | 0 |  |  | 0 |  |  | my $frame = shift; | 
| 947 |  |  |  |  |  |  |  | 
| 948 | 0 | 0 |  |  |  |  | return $failure_cb->('Received data is not header frame') | 
| 949 |  |  |  |  |  |  | if !$frame->isa('Net::AMQP::Frame::Header'); | 
| 950 |  |  |  |  |  |  |  | 
| 951 | 0 |  |  |  |  |  | my $header_frame = $frame->header_frame; | 
| 952 | 0 | 0 |  |  |  |  | return $failure_cb->( | 
| 953 |  |  |  |  |  |  | 'Header is not Protocol::Basic::ContentHeader' | 
| 954 |  |  |  |  |  |  | . 'Header was ' . ref $header_frame | 
| 955 |  |  |  |  |  |  | ) if !$header_frame->isa('Net::AMQP::Protocol::Basic::ContentHeader'); | 
| 956 |  |  |  |  |  |  |  | 
| 957 | 0 |  |  |  |  |  | $response->{header} = $header_frame; | 
| 958 |  |  |  |  |  |  |  | 
| 959 | 0 |  |  |  |  |  | $body_size = $frame->body_size; | 
| 960 | 0 | 0 |  |  |  |  | if ( $body_size ) { | 
| 961 | 0 | 0 |  |  |  |  | my $contq = $wcontq or return; | 
| 962 | 0 |  |  |  |  |  | $contq->get($body_frame); | 
| 963 |  |  |  |  |  |  | } else { | 
| 964 | 0 |  |  |  |  |  | $response->{body} = undef; | 
| 965 | 0 |  |  |  |  |  | $cb->($response); | 
| 966 |  |  |  |  |  |  | } | 
| 967 | 0 |  |  |  |  |  | }); | 
| 968 |  |  |  |  |  |  |  | 
| 969 | 0 |  |  |  |  |  | return $self; | 
| 970 |  |  |  |  |  |  | } | 
| 971 |  |  |  |  |  |  |  | 
| 972 |  |  |  |  |  |  | sub _delete_cbs { | 
| 973 | 0 |  |  | 0 |  |  | my $self = shift; | 
| 974 | 0 |  |  |  |  |  | my %args = @_; | 
| 975 |  |  |  |  |  |  |  | 
| 976 | 0 |  | 0 | 0 |  |  | my $cb         = delete $args{on_success} || sub {}; | 
| 977 | 0 |  | 0 | 0 |  |  | my $failure_cb = delete $args{on_failure} || sub {die @_}; | 
|  | 0 |  |  |  |  |  |  | 
| 978 |  |  |  |  |  |  |  | 
| 979 | 0 |  |  |  |  |  | return $cb, $failure_cb, %args; | 
| 980 |  |  |  |  |  |  | } | 
| 981 |  |  |  |  |  |  |  | 
| 982 |  |  |  |  |  |  | sub _check_open { | 
| 983 | 0 |  |  | 0 |  |  | my $self = shift; | 
| 984 | 0 |  |  |  |  |  | my ($failure_cb) = @_; | 
| 985 |  |  |  |  |  |  |  | 
| 986 | 0 | 0 |  |  |  |  | return 1 if $self->is_open(); | 
| 987 |  |  |  |  |  |  |  | 
| 988 | 0 |  |  |  |  |  | $failure_cb->('Channel has already been closed'); | 
| 989 | 0 |  |  |  |  |  | return 0; | 
| 990 |  |  |  |  |  |  | } | 
| 991 |  |  |  |  |  |  |  | 
| 992 |  |  |  |  |  |  | sub _check_version { | 
| 993 | 0 |  |  | 0 |  |  | my $self = shift; | 
| 994 | 0 |  |  |  |  |  | my ($major, $minor, $failure_cb) = @_; | 
| 995 |  |  |  |  |  |  |  | 
| 996 | 0 |  |  |  |  |  | my $amaj = $Net::AMQP::Protocol::VERSION_MAJOR; | 
| 997 | 0 |  |  |  |  |  | my $amin = $Net::AMQP::Protocol::VERSION_MINOR; | 
| 998 |  |  |  |  |  |  |  | 
| 999 | 0 | 0 | 0 |  |  |  | return 1 if $amaj >= $major || $amaj == $major && $amin >= $minor; | 
|  |  |  | 0 |  |  |  |  | 
| 1000 |  |  |  |  |  |  |  | 
| 1001 | 0 | 0 |  |  |  |  | $failure_cb->("Not supported in AMQP $amaj-$amin") if $failure_cb; | 
| 1002 | 0 |  |  |  |  |  | return 0; | 
| 1003 |  |  |  |  |  |  | } | 
| 1004 |  |  |  |  |  |  |  | 
| 1005 |  |  |  |  |  |  | sub DESTROY { | 
| 1006 | 0 |  |  | 0 |  |  | my $self = shift; | 
| 1007 | 0 | 0 | 0 |  |  |  | $self->close() if !in_global_destruction && $self->is_open(); | 
| 1008 | 0 |  |  |  |  |  | return; | 
| 1009 |  |  |  |  |  |  | } | 
| 1010 |  |  |  |  |  |  |  | 
| 1011 |  |  |  |  |  |  | 1; | 
| 1012 |  |  |  |  |  |  | __END__ |