line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
1
|
|
|
|
|
|
|
package AnyEvent::RabbitMQ::Channel; |
2
|
|
|
|
|
|
|
|
3
|
1
|
|
|
1
|
|
8
|
use strict; |
|
1
|
|
|
|
|
2
|
|
|
1
|
|
|
|
|
33
|
|
4
|
1
|
|
|
1
|
|
5
|
use warnings; |
|
1
|
|
|
|
|
2
|
|
|
1
|
|
|
|
|
25
|
|
5
|
|
|
|
|
|
|
|
6
|
1
|
|
|
1
|
|
460
|
use AnyEvent::RabbitMQ::LocalQueue; |
|
1
|
|
|
|
|
3
|
|
|
1
|
|
|
|
|
33
|
|
7
|
1
|
|
|
1
|
|
7
|
use AnyEvent; |
|
1
|
|
|
|
|
2
|
|
|
1
|
|
|
|
|
28
|
|
8
|
1
|
|
|
1
|
|
6
|
use Scalar::Util qw( looks_like_number weaken ); |
|
1
|
|
|
|
|
6
|
|
|
1
|
|
|
|
|
108
|
|
9
|
1
|
|
|
1
|
|
7
|
use Devel::GlobalDestruction; |
|
1
|
|
|
|
|
2
|
|
|
1
|
|
|
|
|
18
|
|
10
|
1
|
|
|
1
|
|
109
|
use Carp qw(croak cluck); |
|
1
|
|
|
|
|
2
|
|
|
1
|
|
|
|
|
93
|
|
11
|
1
|
|
|
1
|
|
8
|
use POSIX qw(ceil); |
|
1
|
|
|
|
|
2
|
|
|
1
|
|
|
|
|
10
|
|
12
|
1
|
|
|
1
|
|
2028
|
BEGIN { *Dumper = \&AnyEvent::RabbitMQ::Dumper } |
13
|
|
|
|
|
|
|
|
14
|
|
|
|
|
|
|
our $VERSION = '1.21_01'; # TRIAL VERSION |
15
|
|
|
|
|
|
|
$VERSION = eval $VERSION; |
16
|
|
|
|
|
|
|
|
17
|
1
|
|
|
1
|
|
564
|
use namespace::clean; |
|
1
|
|
|
|
|
16756
|
|
|
1
|
|
|
|
|
8
|
|
18
|
|
|
|
|
|
|
|
19
|
|
|
|
|
|
|
use constant { |
20
|
1
|
|
|
|
|
5531
|
_ST_CLOSED => 0, |
21
|
|
|
|
|
|
|
_ST_OPENING => 1, |
22
|
|
|
|
|
|
|
_ST_OPEN => 2, |
23
|
1
|
|
|
1
|
|
294
|
}; |
|
1
|
|
|
|
|
2
|
|
24
|
|
|
|
|
|
|
|
25
|
|
|
|
|
|
|
sub new { |
26
|
0
|
|
|
0
|
0
|
|
my $class = shift; |
27
|
|
|
|
|
|
|
|
28
|
|
|
|
|
|
|
my $self = bless { |
29
|
|
|
|
0
|
|
|
on_close => sub {}, |
30
|
0
|
|
|
|
|
|
@_, # id, connection, on_return, on_close, on_inactive, on_active |
31
|
|
|
|
|
|
|
_queue => AnyEvent::RabbitMQ::LocalQueue->new, |
32
|
|
|
|
|
|
|
_content_queue => AnyEvent::RabbitMQ::LocalQueue->new, |
33
|
|
|
|
|
|
|
}, $class; |
34
|
0
|
|
|
|
|
|
weaken($self->{connection}); |
35
|
0
|
|
|
|
|
|
return $self->_reset; |
36
|
|
|
|
|
|
|
} |
37
|
|
|
|
|
|
|
|
38
|
|
|
|
|
|
|
sub _reset { |
39
|
0
|
|
|
0
|
|
|
my $self = shift; |
40
|
|
|
|
|
|
|
|
41
|
0
|
|
|
|
|
|
my %a = ( |
42
|
|
|
|
|
|
|
_state => _ST_CLOSED, |
43
|
|
|
|
|
|
|
_is_active => 0, |
44
|
|
|
|
|
|
|
_is_confirm => 0, |
45
|
|
|
|
|
|
|
_publish_tag => 0, |
46
|
|
|
|
|
|
|
_publish_cbs => {}, # values: [on_ack, on_nack, on_return] |
47
|
|
|
|
|
|
|
_consumer_cbs => {}, # values: [on_consume, on_cancel...] |
48
|
|
|
|
|
|
|
); |
49
|
0
|
|
|
|
|
|
@$self{keys %a} = values %a; |
50
|
|
|
|
|
|
|
|
51
|
0
|
|
|
|
|
|
return $self; |
52
|
|
|
|
|
|
|
} |
53
|
|
|
|
|
|
|
|
54
|
|
|
|
|
|
|
sub id { |
55
|
0
|
|
|
0
|
0
|
|
my $self = shift; |
56
|
0
|
|
|
|
|
|
return $self->{id}; |
57
|
|
|
|
|
|
|
} |
58
|
|
|
|
|
|
|
|
59
|
|
|
|
|
|
|
sub is_open { |
60
|
0
|
|
|
0
|
0
|
|
my $self = shift; |
61
|
0
|
|
|
|
|
|
return $self->{_state} == _ST_OPEN; |
62
|
|
|
|
|
|
|
} |
63
|
|
|
|
|
|
|
|
64
|
|
|
|
|
|
|
sub is_active { |
65
|
0
|
|
|
0
|
0
|
|
my $self = shift; |
66
|
0
|
|
|
|
|
|
return $self->{_is_active}; |
67
|
|
|
|
|
|
|
} |
68
|
|
|
|
|
|
|
|
69
|
|
|
|
|
|
|
sub is_confirm { |
70
|
0
|
|
|
0
|
0
|
|
my $self = shift; |
71
|
0
|
|
|
|
|
|
return $self->{_is_confirm}; |
72
|
|
|
|
|
|
|
} |
73
|
|
|
|
|
|
|
|
74
|
|
|
|
|
|
|
sub queue { |
75
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
76
|
0
|
|
|
|
|
|
return $self->{_queue}; |
77
|
|
|
|
|
|
|
} |
78
|
|
|
|
|
|
|
|
79
|
|
|
|
|
|
|
sub open { |
80
|
0
|
|
|
0
|
0
|
|
my $self = shift; |
81
|
0
|
|
|
|
|
|
my %args = @_; |
82
|
|
|
|
|
|
|
|
83
|
0
|
0
|
|
|
|
|
if ($self->{_state} != _ST_CLOSED) { |
84
|
0
|
|
|
|
|
|
$args{on_failure}->('Channel has already been opened'); |
85
|
0
|
|
|
|
|
|
return $self; |
86
|
|
|
|
|
|
|
} |
87
|
|
|
|
|
|
|
|
88
|
0
|
|
|
|
|
|
$self->{_state} = _ST_OPENING; |
89
|
|
|
|
|
|
|
|
90
|
|
|
|
|
|
|
$self->{connection}->_push_write_and_read( |
91
|
|
|
|
|
|
|
'Channel::Open', {}, 'Channel::OpenOk', |
92
|
|
|
|
|
|
|
sub { |
93
|
0
|
|
|
0
|
|
|
$self->{_state} = _ST_OPEN; |
94
|
0
|
|
|
|
|
|
$self->{_is_active} = 1; |
95
|
0
|
|
|
|
|
|
$args{on_success}->($self); |
96
|
|
|
|
|
|
|
}, |
97
|
|
|
|
|
|
|
sub { |
98
|
0
|
|
|
0
|
|
|
$self->{_state} = _ST_CLOSED; |
99
|
0
|
|
|
|
|
|
$args{on_failure}->($self); |
100
|
|
|
|
|
|
|
}, |
101
|
|
|
|
|
|
|
$self->{id}, |
102
|
0
|
|
|
|
|
|
); |
103
|
|
|
|
|
|
|
|
104
|
0
|
|
|
|
|
|
return $self; |
105
|
|
|
|
|
|
|
} |
106
|
|
|
|
|
|
|
|
107
|
|
|
|
|
|
|
sub close { |
108
|
0
|
|
|
0
|
0
|
|
my $self = shift; |
109
|
|
|
|
|
|
|
my $connection = $self->{connection} |
110
|
0
|
0
|
|
|
|
|
or return; |
111
|
0
|
|
|
|
|
|
my %args = $connection->_set_cbs(@_); |
112
|
|
|
|
|
|
|
|
113
|
|
|
|
|
|
|
# If open in in progess, wait for it; 1s arbitrary timing. |
114
|
|
|
|
|
|
|
|
115
|
0
|
|
|
|
|
|
weaken(my $wself = $self); |
116
|
0
|
|
|
|
|
|
my $t; $t = AE::timer 0, 1, sub { |
117
|
0
|
0
|
|
0
|
|
|
(my $self = $wself) or undef $t, return; |
118
|
0
|
0
|
|
|
|
|
return if $self->{_state} == _ST_OPENING; |
119
|
|
|
|
|
|
|
|
120
|
|
|
|
|
|
|
# No more tests are required |
121
|
0
|
|
|
|
|
|
undef $t; |
122
|
|
|
|
|
|
|
|
123
|
|
|
|
|
|
|
# Double close is OK |
124
|
0
|
0
|
|
|
|
|
if ($self->{_state} == _ST_CLOSED) { |
125
|
0
|
|
|
|
|
|
$args{on_success}->($self); |
126
|
0
|
|
|
|
|
|
return; |
127
|
|
|
|
|
|
|
} |
128
|
|
|
|
|
|
|
|
129
|
|
|
|
|
|
|
$connection->_push_write( |
130
|
|
|
|
|
|
|
$self->_close_frame, |
131
|
|
|
|
|
|
|
$self->{id}, |
132
|
0
|
|
|
|
|
|
); |
133
|
|
|
|
|
|
|
|
134
|
|
|
|
|
|
|
# The spec says that after a party sends Channel::Close, it MUST |
135
|
|
|
|
|
|
|
# discard all frames for that channel. So this channel is dead |
136
|
|
|
|
|
|
|
# immediately. |
137
|
0
|
|
|
|
|
|
$self->_closed(); |
138
|
|
|
|
|
|
|
|
139
|
|
|
|
|
|
|
$connection->_push_read_and_valid( |
140
|
|
|
|
|
|
|
'Channel::CloseOk', |
141
|
|
|
|
|
|
|
sub { |
142
|
0
|
|
|
|
|
|
$args{on_success}->($self); |
143
|
0
|
|
|
|
|
|
$self->_orphan(); |
144
|
|
|
|
|
|
|
}, |
145
|
|
|
|
|
|
|
sub { |
146
|
0
|
|
|
|
|
|
$args{on_failure}->(@_); |
147
|
0
|
|
|
|
|
|
$self->_orphan(); |
148
|
|
|
|
|
|
|
}, |
149
|
|
|
|
|
|
|
$self->{id}, |
150
|
0
|
|
|
|
|
|
); |
151
|
0
|
|
|
|
|
|
}; |
152
|
|
|
|
|
|
|
|
153
|
0
|
|
|
|
|
|
return $self; |
154
|
|
|
|
|
|
|
} |
155
|
|
|
|
|
|
|
|
156
|
|
|
|
|
|
|
sub _closed { |
157
|
0
|
|
|
0
|
|
|
my $self = shift; |
158
|
0
|
|
|
|
|
|
my ($frame,) = @_; |
159
|
0
|
|
0
|
|
|
|
$frame ||= $self->_close_frame(); |
160
|
|
|
|
|
|
|
|
161
|
0
|
0
|
|
|
|
|
return if $self->{_state} == _ST_CLOSED; |
162
|
0
|
|
|
|
|
|
$self->{_state} = _ST_CLOSED; |
163
|
|
|
|
|
|
|
|
164
|
|
|
|
|
|
|
# Perform callbacks for all outstanding commands |
165
|
0
|
|
|
|
|
|
$self->{_queue}->_flush($frame); |
166
|
0
|
|
|
|
|
|
$self->{_content_queue}->_flush($frame); |
167
|
|
|
|
|
|
|
|
168
|
|
|
|
|
|
|
# Fake nacks of all outstanding publishes |
169
|
0
|
|
|
|
|
|
$_->($frame) for grep { defined } map { $_->[1] } values %{ $self->{_publish_cbs} }; |
|
0
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
170
|
|
|
|
|
|
|
|
171
|
|
|
|
|
|
|
# Report cancelation of all outstanding consumes |
172
|
0
|
|
|
|
|
|
my @tags = keys %{ $self->{_consumer_cbs} }; |
|
0
|
|
|
|
|
|
|
173
|
0
|
|
|
|
|
|
$self->_canceled($_, $frame) for @tags; |
174
|
|
|
|
|
|
|
|
175
|
|
|
|
|
|
|
# Report close to on_close callback |
176
|
0
|
|
|
|
|
|
{ local $@; |
|
0
|
|
|
|
|
|
|
177
|
0
|
|
|
|
|
|
eval { $self->{on_close}->($frame) }; |
|
0
|
|
|
|
|
|
|
178
|
0
|
0
|
|
|
|
|
warn "Error in channel on_close callback, ignored:\n $@ " if $@; } |
179
|
|
|
|
|
|
|
|
180
|
|
|
|
|
|
|
# Reset state (partly redundant) |
181
|
0
|
|
|
|
|
|
$self->_reset; |
182
|
|
|
|
|
|
|
|
183
|
0
|
|
|
|
|
|
return $self; |
184
|
|
|
|
|
|
|
} |
185
|
|
|
|
|
|
|
|
186
|
|
|
|
|
|
|
sub _close_frame { |
187
|
0
|
|
|
0
|
|
|
my $self = shift; |
188
|
0
|
|
|
|
|
|
my ($text,) = @_; |
189
|
|
|
|
|
|
|
|
190
|
0
|
|
|
|
|
|
Net::AMQP::Frame::Method->new( |
191
|
|
|
|
|
|
|
method_frame => Net::AMQP::Protocol::Channel::Close->new( |
192
|
|
|
|
|
|
|
reply_text => $text, |
193
|
|
|
|
|
|
|
), |
194
|
|
|
|
|
|
|
); |
195
|
|
|
|
|
|
|
} |
196
|
|
|
|
|
|
|
|
197
|
|
|
|
|
|
|
sub _orphan { |
198
|
0
|
|
|
0
|
|
|
my $self = shift; |
199
|
|
|
|
|
|
|
|
200
|
0
|
0
|
|
|
|
|
if (my $connection = $self->{connection}) { |
201
|
0
|
|
|
|
|
|
$connection->_delete_channel($self); |
202
|
|
|
|
|
|
|
} |
203
|
0
|
|
|
|
|
|
return $self; |
204
|
|
|
|
|
|
|
} |
205
|
|
|
|
|
|
|
|
206
|
|
|
|
|
|
|
sub declare_exchange { |
207
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
208
|
0
|
|
|
|
|
|
my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_); |
209
|
|
|
|
|
|
|
|
210
|
0
|
0
|
|
|
|
|
return $self if !$self->_check_open($failure_cb); |
211
|
|
|
|
|
|
|
|
212
|
|
|
|
|
|
|
$self->{connection}->_push_write_and_read( |
213
|
|
|
|
|
|
|
'Exchange::Declare', |
214
|
|
|
|
|
|
|
{ |
215
|
|
|
|
|
|
|
type => 'direct', |
216
|
|
|
|
|
|
|
passive => 0, |
217
|
|
|
|
|
|
|
durable => 0, |
218
|
|
|
|
|
|
|
auto_delete => 0, |
219
|
|
|
|
|
|
|
internal => 0, |
220
|
|
|
|
|
|
|
%args, # exchange |
221
|
|
|
|
|
|
|
ticket => 0, |
222
|
|
|
|
|
|
|
nowait => 0, # FIXME |
223
|
|
|
|
|
|
|
}, |
224
|
|
|
|
|
|
|
'Exchange::DeclareOk', |
225
|
|
|
|
|
|
|
$cb, |
226
|
|
|
|
|
|
|
$failure_cb, |
227
|
|
|
|
|
|
|
$self->{id}, |
228
|
0
|
|
|
|
|
|
); |
229
|
|
|
|
|
|
|
|
230
|
0
|
|
|
|
|
|
return $self; |
231
|
|
|
|
|
|
|
} |
232
|
|
|
|
|
|
|
|
233
|
|
|
|
|
|
|
sub bind_exchange { |
234
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
235
|
0
|
|
|
|
|
|
my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_); |
236
|
|
|
|
|
|
|
|
237
|
0
|
0
|
|
|
|
|
return $self if !$self->_check_open($failure_cb); |
238
|
|
|
|
|
|
|
|
239
|
|
|
|
|
|
|
$self->{connection}->_push_write_and_read( |
240
|
|
|
|
|
|
|
'Exchange::Bind', |
241
|
|
|
|
|
|
|
{ |
242
|
|
|
|
|
|
|
%args, # source, destination, routing_key |
243
|
|
|
|
|
|
|
ticket => 0, |
244
|
|
|
|
|
|
|
nowait => 0, # FIXME |
245
|
|
|
|
|
|
|
}, |
246
|
|
|
|
|
|
|
'Exchange::BindOk', |
247
|
|
|
|
|
|
|
$cb, |
248
|
|
|
|
|
|
|
$failure_cb, |
249
|
|
|
|
|
|
|
$self->{id}, |
250
|
0
|
|
|
|
|
|
); |
251
|
|
|
|
|
|
|
|
252
|
0
|
|
|
|
|
|
return $self; |
253
|
|
|
|
|
|
|
} |
254
|
|
|
|
|
|
|
|
255
|
|
|
|
|
|
|
sub unbind_exchange { |
256
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
257
|
0
|
|
|
|
|
|
my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_); |
258
|
|
|
|
|
|
|
|
259
|
0
|
0
|
|
|
|
|
return $self if !$self->_check_open($failure_cb); |
260
|
|
|
|
|
|
|
|
261
|
|
|
|
|
|
|
$self->{connection}->_push_write_and_read( |
262
|
|
|
|
|
|
|
'Exchange::Unbind', |
263
|
|
|
|
|
|
|
{ |
264
|
|
|
|
|
|
|
%args, # source, destination, routing_key |
265
|
|
|
|
|
|
|
ticket => 0, |
266
|
|
|
|
|
|
|
nowait => 0, # FIXME |
267
|
|
|
|
|
|
|
}, |
268
|
|
|
|
|
|
|
'Exchange::UnbindOk', |
269
|
|
|
|
|
|
|
$cb, |
270
|
|
|
|
|
|
|
$failure_cb, |
271
|
|
|
|
|
|
|
$self->{id}, |
272
|
0
|
|
|
|
|
|
); |
273
|
|
|
|
|
|
|
|
274
|
0
|
|
|
|
|
|
return $self; |
275
|
|
|
|
|
|
|
} |
276
|
|
|
|
|
|
|
|
277
|
|
|
|
|
|
|
sub delete_exchange { |
278
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
279
|
0
|
|
|
|
|
|
my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_); |
280
|
|
|
|
|
|
|
|
281
|
0
|
0
|
|
|
|
|
return $self if !$self->_check_open($failure_cb); |
282
|
|
|
|
|
|
|
|
283
|
|
|
|
|
|
|
$self->{connection}->_push_write_and_read( |
284
|
|
|
|
|
|
|
'Exchange::Delete', |
285
|
|
|
|
|
|
|
{ |
286
|
|
|
|
|
|
|
if_unused => 0, |
287
|
|
|
|
|
|
|
%args, # exchange |
288
|
|
|
|
|
|
|
ticket => 0, |
289
|
|
|
|
|
|
|
nowait => 0, # FIXME |
290
|
|
|
|
|
|
|
}, |
291
|
|
|
|
|
|
|
'Exchange::DeleteOk', |
292
|
|
|
|
|
|
|
$cb, |
293
|
|
|
|
|
|
|
$failure_cb, |
294
|
|
|
|
|
|
|
$self->{id}, |
295
|
0
|
|
|
|
|
|
); |
296
|
|
|
|
|
|
|
|
297
|
0
|
|
|
|
|
|
return $self; |
298
|
|
|
|
|
|
|
} |
299
|
|
|
|
|
|
|
|
300
|
|
|
|
|
|
|
sub declare_queue { |
301
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
302
|
0
|
|
|
|
|
|
my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_); |
303
|
|
|
|
|
|
|
|
304
|
0
|
0
|
|
|
|
|
return $self if !$self->_check_open($failure_cb); |
305
|
|
|
|
|
|
|
|
306
|
|
|
|
|
|
|
$self->{connection}->_push_write_and_read( |
307
|
|
|
|
|
|
|
'Queue::Declare', |
308
|
|
|
|
|
|
|
{ |
309
|
|
|
|
|
|
|
queue => '', |
310
|
|
|
|
|
|
|
passive => 0, |
311
|
|
|
|
|
|
|
durable => 0, |
312
|
|
|
|
|
|
|
exclusive => 0, |
313
|
|
|
|
|
|
|
auto_delete => 0, |
314
|
|
|
|
|
|
|
no_ack => 1, |
315
|
|
|
|
|
|
|
%args, |
316
|
|
|
|
|
|
|
ticket => 0, |
317
|
|
|
|
|
|
|
nowait => 0, # FIXME |
318
|
|
|
|
|
|
|
}, |
319
|
|
|
|
|
|
|
'Queue::DeclareOk', |
320
|
|
|
|
|
|
|
$cb, |
321
|
|
|
|
|
|
|
$failure_cb, |
322
|
|
|
|
|
|
|
$self->{id}, |
323
|
0
|
|
|
|
|
|
); |
324
|
|
|
|
|
|
|
} |
325
|
|
|
|
|
|
|
|
326
|
|
|
|
|
|
|
sub bind_queue { |
327
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
328
|
0
|
|
|
|
|
|
my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_); |
329
|
|
|
|
|
|
|
|
330
|
0
|
0
|
|
|
|
|
return $self if !$self->_check_open($failure_cb); |
331
|
|
|
|
|
|
|
|
332
|
|
|
|
|
|
|
$self->{connection}->_push_write_and_read( |
333
|
|
|
|
|
|
|
'Queue::Bind', |
334
|
|
|
|
|
|
|
{ |
335
|
|
|
|
|
|
|
%args, # queue, exchange, routing_key |
336
|
|
|
|
|
|
|
ticket => 0, |
337
|
|
|
|
|
|
|
nowait => 0, # FIXME |
338
|
|
|
|
|
|
|
}, |
339
|
|
|
|
|
|
|
'Queue::BindOk', |
340
|
|
|
|
|
|
|
$cb, |
341
|
|
|
|
|
|
|
$failure_cb, |
342
|
|
|
|
|
|
|
$self->{id}, |
343
|
0
|
|
|
|
|
|
); |
344
|
|
|
|
|
|
|
|
345
|
0
|
|
|
|
|
|
return $self; |
346
|
|
|
|
|
|
|
} |
347
|
|
|
|
|
|
|
|
348
|
|
|
|
|
|
|
sub unbind_queue { |
349
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
350
|
0
|
|
|
|
|
|
my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_); |
351
|
|
|
|
|
|
|
|
352
|
0
|
0
|
|
|
|
|
return $self if !$self->_check_open($failure_cb); |
353
|
|
|
|
|
|
|
|
354
|
|
|
|
|
|
|
$self->{connection}->_push_write_and_read( |
355
|
|
|
|
|
|
|
'Queue::Unbind', |
356
|
|
|
|
|
|
|
{ |
357
|
|
|
|
|
|
|
%args, # queue, exchange, routing_key |
358
|
|
|
|
|
|
|
ticket => 0, |
359
|
|
|
|
|
|
|
}, |
360
|
|
|
|
|
|
|
'Queue::UnbindOk', |
361
|
|
|
|
|
|
|
$cb, |
362
|
|
|
|
|
|
|
$failure_cb, |
363
|
|
|
|
|
|
|
$self->{id}, |
364
|
0
|
|
|
|
|
|
); |
365
|
|
|
|
|
|
|
|
366
|
0
|
|
|
|
|
|
return $self; |
367
|
|
|
|
|
|
|
} |
368
|
|
|
|
|
|
|
|
369
|
|
|
|
|
|
|
sub purge_queue { |
370
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
371
|
0
|
|
|
|
|
|
my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_); |
372
|
|
|
|
|
|
|
|
373
|
0
|
0
|
|
|
|
|
return $self if !$self->_check_open($failure_cb); |
374
|
|
|
|
|
|
|
|
375
|
|
|
|
|
|
|
$self->{connection}->_push_write_and_read( |
376
|
|
|
|
|
|
|
'Queue::Purge', |
377
|
|
|
|
|
|
|
{ |
378
|
|
|
|
|
|
|
%args, # queue |
379
|
|
|
|
|
|
|
ticket => 0, |
380
|
|
|
|
|
|
|
nowait => 0, # FIXME |
381
|
|
|
|
|
|
|
}, |
382
|
|
|
|
|
|
|
'Queue::PurgeOk', |
383
|
|
|
|
|
|
|
$cb, |
384
|
|
|
|
|
|
|
$failure_cb, |
385
|
|
|
|
|
|
|
$self->{id}, |
386
|
0
|
|
|
|
|
|
); |
387
|
|
|
|
|
|
|
|
388
|
0
|
|
|
|
|
|
return $self; |
389
|
|
|
|
|
|
|
} |
390
|
|
|
|
|
|
|
|
391
|
|
|
|
|
|
|
sub delete_queue { |
392
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
393
|
0
|
|
|
|
|
|
my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_); |
394
|
|
|
|
|
|
|
|
395
|
0
|
0
|
|
|
|
|
return $self if !$self->_check_open($failure_cb); |
396
|
|
|
|
|
|
|
|
397
|
|
|
|
|
|
|
$self->{connection}->_push_write_and_read( |
398
|
|
|
|
|
|
|
'Queue::Delete', |
399
|
|
|
|
|
|
|
{ |
400
|
|
|
|
|
|
|
if_unused => 0, |
401
|
|
|
|
|
|
|
if_empty => 0, |
402
|
|
|
|
|
|
|
%args, # queue |
403
|
|
|
|
|
|
|
ticket => 0, |
404
|
|
|
|
|
|
|
nowait => 0, # FIXME |
405
|
|
|
|
|
|
|
}, |
406
|
|
|
|
|
|
|
'Queue::DeleteOk', |
407
|
|
|
|
|
|
|
$cb, |
408
|
|
|
|
|
|
|
$failure_cb, |
409
|
|
|
|
|
|
|
$self->{id}, |
410
|
0
|
|
|
|
|
|
); |
411
|
|
|
|
|
|
|
|
412
|
0
|
|
|
|
|
|
return $self; |
413
|
|
|
|
|
|
|
} |
414
|
|
|
|
|
|
|
|
415
|
|
|
|
|
|
|
sub publish { |
416
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
417
|
0
|
|
|
|
|
|
my %args = @_; |
418
|
|
|
|
|
|
|
|
419
|
|
|
|
|
|
|
# Docs should advise channel-level callback over this, but still, better to give user an out |
420
|
0
|
0
|
|
|
|
|
unless ($self->{_is_active}) { |
421
|
0
|
0
|
|
|
|
|
if (defined $args{on_inactive}) { |
422
|
0
|
|
|
|
|
|
$args{on_inactive}->(); |
423
|
0
|
|
|
|
|
|
return $self; |
424
|
|
|
|
|
|
|
} |
425
|
0
|
|
|
|
|
|
croak "Can't publish on inactive channel (server flow control); provide on_inactive callback"; |
426
|
|
|
|
|
|
|
} |
427
|
|
|
|
|
|
|
|
428
|
0
|
|
|
|
|
|
my $header_args = delete $args{header}; |
429
|
0
|
|
|
|
|
|
my $body = delete $args{body}; |
430
|
0
|
|
|
|
|
|
my $ack_cb = delete $args{on_ack}; |
431
|
0
|
|
|
|
|
|
my $nack_cb = delete $args{on_nack}; |
432
|
0
|
|
|
|
|
|
my $return_cb = delete $args{on_return}; |
433
|
|
|
|
|
|
|
|
434
|
0
|
0
|
|
|
|
|
defined($header_args) or $header_args = {}; |
435
|
0
|
0
|
|
|
|
|
defined($body) or $body = ''; |
436
|
0
|
0
|
0
|
|
|
|
if ( defined($ack_cb) or defined($nack_cb) or defined($return_cb) ) { |
|
|
|
0
|
|
|
|
|
437
|
|
|
|
|
|
|
cluck "Can't set on_ack/on_nack/on_return callback when not in confirm mode" |
438
|
0
|
0
|
|
|
|
|
unless $self->{_is_confirm}; |
439
|
|
|
|
|
|
|
} |
440
|
|
|
|
|
|
|
|
441
|
0
|
|
|
|
|
|
my $tag; |
442
|
0
|
0
|
|
|
|
|
if ($self->{_is_confirm}) { |
443
|
|
|
|
|
|
|
# yeah, delivery tags in acks are sequential. see Java client |
444
|
0
|
|
|
|
|
|
$tag = ++$self->{_publish_tag}; |
445
|
0
|
0
|
|
|
|
|
if ($return_cb) { |
446
|
0
|
|
|
|
|
|
$header_args = { %$header_args }; |
447
|
0
|
|
|
|
|
|
$header_args->{headers}->{_ar_return} = $tag; # just reuse the same value, why not |
448
|
|
|
|
|
|
|
} |
449
|
0
|
|
|
|
|
|
$self->{_publish_cbs}->{$tag} = [$ack_cb, $nack_cb, $return_cb]; |
450
|
|
|
|
|
|
|
} |
451
|
|
|
|
|
|
|
|
452
|
|
|
|
|
|
|
$self->_publish( |
453
|
0
|
|
|
|
|
|
%args, |
454
|
|
|
|
|
|
|
)->_header( |
455
|
|
|
|
|
|
|
$header_args, $body, |
456
|
|
|
|
|
|
|
)->_body( |
457
|
|
|
|
|
|
|
$body, |
458
|
|
|
|
|
|
|
); |
459
|
|
|
|
|
|
|
|
460
|
0
|
|
|
|
|
|
return $self; |
461
|
|
|
|
|
|
|
} |
462
|
|
|
|
|
|
|
|
463
|
|
|
|
|
|
|
sub _publish { |
464
|
0
|
|
|
0
|
|
|
my $self = shift; |
465
|
0
|
|
|
|
|
|
my %args = @_; |
466
|
|
|
|
|
|
|
|
467
|
|
|
|
|
|
|
$self->{connection}->_push_write( |
468
|
|
|
|
|
|
|
Net::AMQP::Protocol::Basic::Publish->new( |
469
|
|
|
|
|
|
|
exchange => '', |
470
|
|
|
|
|
|
|
mandatory => 0, |
471
|
|
|
|
|
|
|
immediate => 0, |
472
|
|
|
|
|
|
|
%args, # routing_key |
473
|
|
|
|
|
|
|
ticket => 0, |
474
|
|
|
|
|
|
|
), |
475
|
|
|
|
|
|
|
$self->{id}, |
476
|
0
|
|
|
|
|
|
); |
477
|
|
|
|
|
|
|
|
478
|
0
|
|
|
|
|
|
return $self; |
479
|
|
|
|
|
|
|
} |
480
|
|
|
|
|
|
|
|
481
|
|
|
|
|
|
|
sub _header { |
482
|
0
|
|
|
0
|
|
|
my ($self, $args, $body) = @_; |
483
|
|
|
|
|
|
|
|
484
|
0
|
|
0
|
|
|
|
my $weight = delete $args->{weight} || 0; |
485
|
|
|
|
|
|
|
|
486
|
|
|
|
|
|
|
# user-provided message headers must be strings. protect values that look like numbers. |
487
|
0
|
|
0
|
|
|
|
my $headers = $args->{headers} || {}; |
488
|
0
|
0
|
|
|
|
|
my @prot = grep { my $v = $headers->{$_}; !ref($v) && looks_like_number($v) } keys %$headers; |
|
0
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
489
|
0
|
0
|
|
|
|
|
if (@prot) { |
490
|
|
|
|
|
|
|
$headers = { |
491
|
|
|
|
|
|
|
%$headers, |
492
|
0
|
|
|
|
|
|
map { $_ => Net::AMQP::Value::String->new($headers->{$_}) } @prot |
|
0
|
|
|
|
|
|
|
493
|
|
|
|
|
|
|
}; |
494
|
|
|
|
|
|
|
} |
495
|
|
|
|
|
|
|
|
496
|
|
|
|
|
|
|
$self->{connection}->_push_write( |
497
|
|
|
|
|
|
|
Net::AMQP::Frame::Header->new( |
498
|
|
|
|
|
|
|
weight => $weight, |
499
|
|
|
|
|
|
|
body_size => length($body), |
500
|
|
|
|
|
|
|
header_frame => Net::AMQP::Protocol::Basic::ContentHeader->new( |
501
|
|
|
|
|
|
|
content_type => 'application/octet-stream', |
502
|
|
|
|
|
|
|
content_encoding => undef, |
503
|
|
|
|
|
|
|
delivery_mode => 1, |
504
|
|
|
|
|
|
|
priority => 1, |
505
|
|
|
|
|
|
|
correlation_id => undef, |
506
|
|
|
|
|
|
|
expiration => undef, |
507
|
|
|
|
|
|
|
message_id => undef, |
508
|
|
|
|
|
|
|
timestamp => time, |
509
|
|
|
|
|
|
|
type => undef, |
510
|
|
|
|
|
|
|
user_id => $self->{connection}->login_user, |
511
|
|
|
|
|
|
|
app_id => undef, |
512
|
|
|
|
|
|
|
cluster_id => undef, |
513
|
|
|
|
|
|
|
%$args, |
514
|
|
|
|
|
|
|
headers => $headers, |
515
|
|
|
|
|
|
|
), |
516
|
|
|
|
|
|
|
), |
517
|
|
|
|
|
|
|
$self->{id}, |
518
|
0
|
|
|
|
|
|
); |
519
|
|
|
|
|
|
|
|
520
|
0
|
|
|
|
|
|
return $self; |
521
|
|
|
|
|
|
|
} |
522
|
|
|
|
|
|
|
|
523
|
|
|
|
|
|
|
sub _body { |
524
|
0
|
|
|
0
|
|
|
my ($self, $body,) = @_; |
525
|
|
|
|
|
|
|
|
526
|
0
|
|
0
|
|
|
|
my $body_max = $self->{connection}->{_body_max} || length $body; |
527
|
|
|
|
|
|
|
|
528
|
|
|
|
|
|
|
# chunk up body into segments measured by $frame_max |
529
|
0
|
|
|
|
|
|
while (length $body) { |
530
|
|
|
|
|
|
|
$self->{connection}->_push_write( |
531
|
|
|
|
|
|
|
Net::AMQP::Frame::Body->new( |
532
|
|
|
|
|
|
|
payload => substr($body, 0, $body_max, '')), |
533
|
|
|
|
|
|
|
$self->{id} |
534
|
0
|
|
|
|
|
|
); |
535
|
|
|
|
|
|
|
} |
536
|
|
|
|
|
|
|
|
537
|
0
|
|
|
|
|
|
return $self; |
538
|
|
|
|
|
|
|
} |
539
|
|
|
|
|
|
|
|
540
|
|
|
|
|
|
|
sub consume { |
541
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
542
|
0
|
|
|
|
|
|
my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_); |
543
|
|
|
|
|
|
|
|
544
|
0
|
0
|
|
|
|
|
return $self if !$self->_check_open($failure_cb); |
545
|
|
|
|
|
|
|
|
546
|
0
|
|
0
|
0
|
|
|
my $consumer_cb = delete $args{on_consume} || sub {}; |
547
|
0
|
|
0
|
0
|
|
|
my $cancel_cb = delete $args{on_cancel} || sub {}; |
548
|
0
|
|
0
|
|
|
|
my $no_ack = delete $args{no_ack} // 1; |
549
|
|
|
|
|
|
|
|
550
|
|
|
|
|
|
|
$self->{connection}->_push_write_and_read( |
551
|
|
|
|
|
|
|
'Basic::Consume', |
552
|
|
|
|
|
|
|
{ |
553
|
|
|
|
|
|
|
consumer_tag => '', |
554
|
|
|
|
|
|
|
no_local => 0, |
555
|
|
|
|
|
|
|
no_ack => $no_ack, |
556
|
|
|
|
|
|
|
exclusive => 0, |
557
|
|
|
|
|
|
|
|
558
|
|
|
|
|
|
|
%args, # queue |
559
|
|
|
|
|
|
|
ticket => 0, |
560
|
|
|
|
|
|
|
nowait => 0, # FIXME |
561
|
|
|
|
|
|
|
}, |
562
|
|
|
|
|
|
|
'Basic::ConsumeOk', |
563
|
|
|
|
|
|
|
sub { |
564
|
0
|
|
|
0
|
|
|
my $frame = shift; |
565
|
0
|
|
|
|
|
|
my $tag = $frame->method_frame->consumer_tag; |
566
|
0
|
|
|
|
|
|
$self->{_consumer_cbs}->{$tag} = [ $consumer_cb, $cancel_cb ]; |
567
|
0
|
|
|
|
|
|
$cb->($frame); |
568
|
|
|
|
|
|
|
}, |
569
|
|
|
|
|
|
|
$failure_cb, |
570
|
|
|
|
|
|
|
$self->{id}, |
571
|
0
|
|
|
|
|
|
); |
572
|
|
|
|
|
|
|
|
573
|
0
|
|
|
|
|
|
return $self; |
574
|
|
|
|
|
|
|
} |
575
|
|
|
|
|
|
|
|
576
|
|
|
|
|
|
|
sub cancel { |
577
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
578
|
0
|
|
|
|
|
|
my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_); |
579
|
|
|
|
|
|
|
|
580
|
0
|
0
|
|
|
|
|
return $self if !$self->_check_open($failure_cb); |
581
|
|
|
|
|
|
|
|
582
|
0
|
0
|
|
|
|
|
if (!defined $args{consumer_tag}) { |
583
|
0
|
|
|
|
|
|
$failure_cb->('consumer_tag is not set'); |
584
|
0
|
|
|
|
|
|
return $self; |
585
|
|
|
|
|
|
|
} |
586
|
|
|
|
|
|
|
|
587
|
0
|
|
|
|
|
|
my $cons_cbs = $self->{_consumer_cbs}->{$args{consumer_tag}}; |
588
|
0
|
0
|
|
|
|
|
unless ($cons_cbs) { |
589
|
0
|
|
|
|
|
|
$failure_cb->('Unknown consumer_tag'); |
590
|
0
|
|
|
|
|
|
return $self; |
591
|
|
|
|
|
|
|
} |
592
|
0
|
|
|
|
|
|
push @$cons_cbs, $cb; |
593
|
|
|
|
|
|
|
|
594
|
|
|
|
|
|
|
$self->{connection}->_push_write( |
595
|
|
|
|
|
|
|
Net::AMQP::Protocol::Basic::Cancel->new( |
596
|
|
|
|
|
|
|
%args, # consumer_tag |
597
|
|
|
|
|
|
|
nowait => 0, |
598
|
|
|
|
|
|
|
), |
599
|
|
|
|
|
|
|
$self->{id}, |
600
|
0
|
|
|
|
|
|
); |
601
|
|
|
|
|
|
|
|
602
|
0
|
|
|
|
|
|
return $self; |
603
|
|
|
|
|
|
|
} |
604
|
|
|
|
|
|
|
|
605
|
|
|
|
|
|
|
sub _canceled { |
606
|
0
|
|
|
0
|
|
|
my $self = shift; |
607
|
0
|
|
|
|
|
|
my ($tag, $frame,) = @_; |
608
|
|
|
|
|
|
|
|
609
|
0
|
0
|
|
|
|
|
my $cons_cbs = delete $self->{_consumer_cbs}->{$tag} |
610
|
|
|
|
|
|
|
or return 0; |
611
|
|
|
|
|
|
|
|
612
|
0
|
|
|
|
|
|
shift @$cons_cbs; # no more deliveries |
613
|
0
|
|
|
|
|
|
for my $cb (reverse @$cons_cbs) { |
614
|
0
|
|
|
|
|
|
$cb->($frame); |
615
|
|
|
|
|
|
|
} |
616
|
0
|
|
|
|
|
|
return 1; |
617
|
|
|
|
|
|
|
} |
618
|
|
|
|
|
|
|
|
619
|
|
|
|
|
|
|
sub get { |
620
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
621
|
0
|
|
|
|
|
|
my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_); |
622
|
|
|
|
|
|
|
|
623
|
0
|
|
0
|
|
|
|
my $no_ack = delete $args{no_ack} // 1; |
624
|
|
|
|
|
|
|
|
625
|
0
|
0
|
|
|
|
|
return $self if !$self->_check_open($failure_cb); |
626
|
|
|
|
|
|
|
|
627
|
|
|
|
|
|
|
$self->{connection}->_push_write_and_read( |
628
|
|
|
|
|
|
|
'Basic::Get', |
629
|
|
|
|
|
|
|
{ |
630
|
|
|
|
|
|
|
no_ack => $no_ack, |
631
|
|
|
|
|
|
|
%args, # queue |
632
|
|
|
|
|
|
|
ticket => 0, |
633
|
|
|
|
|
|
|
}, |
634
|
|
|
|
|
|
|
[qw(Basic::GetOk Basic::GetEmpty)], |
635
|
|
|
|
|
|
|
sub { |
636
|
0
|
|
|
0
|
|
|
my $frame = shift; |
637
|
0
|
0
|
|
|
|
|
return $cb->({empty => $frame}) |
638
|
|
|
|
|
|
|
if $frame->method_frame->isa('Net::AMQP::Protocol::Basic::GetEmpty'); |
639
|
0
|
|
|
|
|
|
$self->_push_read_header_and_body('ok', $frame, $cb, $failure_cb); |
640
|
|
|
|
|
|
|
}, |
641
|
|
|
|
|
|
|
$failure_cb, |
642
|
|
|
|
|
|
|
$self->{id}, |
643
|
0
|
|
|
|
|
|
); |
644
|
|
|
|
|
|
|
|
645
|
0
|
|
|
|
|
|
return $self; |
646
|
|
|
|
|
|
|
} |
647
|
|
|
|
|
|
|
|
648
|
|
|
|
|
|
|
sub ack { |
649
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
650
|
0
|
|
|
|
|
|
my %args = @_; |
651
|
|
|
|
|
|
|
|
652
|
0
|
0
|
|
0
|
|
|
return $self if !$self->_check_open(sub {}); |
653
|
|
|
|
|
|
|
|
654
|
|
|
|
|
|
|
$self->{connection}->_push_write( |
655
|
|
|
|
|
|
|
Net::AMQP::Protocol::Basic::Ack->new( |
656
|
|
|
|
|
|
|
delivery_tag => 0, |
657
|
|
|
|
|
|
|
multiple => ( |
658
|
|
|
|
|
|
|
defined $args{delivery_tag} && $args{delivery_tag} != 0 ? 0 : 1 |
659
|
|
|
|
|
|
|
), |
660
|
|
|
|
|
|
|
%args, |
661
|
|
|
|
|
|
|
), |
662
|
|
|
|
|
|
|
$self->{id}, |
663
|
0
|
0
|
0
|
|
|
|
); |
664
|
|
|
|
|
|
|
|
665
|
0
|
|
|
|
|
|
return $self; |
666
|
|
|
|
|
|
|
} |
667
|
|
|
|
|
|
|
|
668
|
|
|
|
|
|
|
sub qos { |
669
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
670
|
0
|
|
|
|
|
|
my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_); |
671
|
|
|
|
|
|
|
|
672
|
0
|
0
|
|
|
|
|
return $self if !$self->_check_open($failure_cb); |
673
|
|
|
|
|
|
|
|
674
|
|
|
|
|
|
|
$self->{connection}->_push_write_and_read( |
675
|
|
|
|
|
|
|
'Basic::Qos', |
676
|
|
|
|
|
|
|
{ |
677
|
|
|
|
|
|
|
prefetch_count => 1, |
678
|
|
|
|
|
|
|
prefetch_size => 0, |
679
|
|
|
|
|
|
|
global => 0, |
680
|
|
|
|
|
|
|
%args, |
681
|
|
|
|
|
|
|
}, |
682
|
|
|
|
|
|
|
'Basic::QosOk', |
683
|
|
|
|
|
|
|
$cb, |
684
|
|
|
|
|
|
|
$failure_cb, |
685
|
|
|
|
|
|
|
$self->{id}, |
686
|
0
|
|
|
|
|
|
); |
687
|
|
|
|
|
|
|
|
688
|
0
|
|
|
|
|
|
return $self; |
689
|
|
|
|
|
|
|
} |
690
|
|
|
|
|
|
|
|
691
|
|
|
|
|
|
|
sub confirm { |
692
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
693
|
0
|
|
|
|
|
|
my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_); |
694
|
|
|
|
|
|
|
|
695
|
0
|
0
|
|
|
|
|
return $self if !$self->_check_open($failure_cb); |
696
|
0
|
0
|
|
|
|
|
return $self if !$self->_check_version(0, 9, $failure_cb); |
697
|
|
|
|
|
|
|
|
698
|
0
|
|
|
|
|
|
weaken(my $wself = $self); |
699
|
|
|
|
|
|
|
|
700
|
|
|
|
|
|
|
$self->{connection}->_push_write_and_read( |
701
|
|
|
|
|
|
|
'Confirm::Select', |
702
|
|
|
|
|
|
|
{ |
703
|
|
|
|
|
|
|
%args, |
704
|
|
|
|
|
|
|
nowait => 0, # FIXME |
705
|
|
|
|
|
|
|
}, |
706
|
|
|
|
|
|
|
'Confirm::SelectOk', |
707
|
|
|
|
|
|
|
sub { |
708
|
0
|
0
|
|
0
|
|
|
my $me = $wself or return; |
709
|
0
|
|
|
|
|
|
$me->{_is_confirm} = 1; |
710
|
0
|
|
|
|
|
|
$cb->(); |
711
|
|
|
|
|
|
|
}, |
712
|
|
|
|
|
|
|
$failure_cb, |
713
|
|
|
|
|
|
|
$self->{id}, |
714
|
0
|
|
|
|
|
|
); |
715
|
|
|
|
|
|
|
|
716
|
0
|
|
|
|
|
|
return $self; |
717
|
|
|
|
|
|
|
} |
718
|
|
|
|
|
|
|
|
719
|
|
|
|
|
|
|
sub recover { |
720
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
721
|
0
|
|
|
|
|
|
my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_); |
722
|
|
|
|
|
|
|
|
723
|
0
|
0
|
|
0
|
|
|
return $self if !$self->_check_open(sub {}); |
724
|
|
|
|
|
|
|
|
725
|
|
|
|
|
|
|
$self->{connection}->_push_write( |
726
|
|
|
|
|
|
|
Net::AMQP::Protocol::Basic::Recover->new( |
727
|
|
|
|
|
|
|
requeue => 1, |
728
|
|
|
|
|
|
|
%args, |
729
|
|
|
|
|
|
|
), |
730
|
|
|
|
|
|
|
$self->{id}, |
731
|
0
|
|
|
|
|
|
); |
732
|
|
|
|
|
|
|
|
733
|
0
|
0
|
0
|
|
|
|
if (!$args{nowait} && $self->_check_version(0, 9)) { |
734
|
|
|
|
|
|
|
$self->{connection}->_push_read_and_valid( |
735
|
|
|
|
|
|
|
'Basic::RecoverOk', |
736
|
|
|
|
|
|
|
$cb, |
737
|
|
|
|
|
|
|
$failure_cb, |
738
|
|
|
|
|
|
|
$self->{id}, |
739
|
0
|
|
|
|
|
|
); |
740
|
|
|
|
|
|
|
} |
741
|
|
|
|
|
|
|
else { |
742
|
0
|
|
|
|
|
|
$cb->(); |
743
|
|
|
|
|
|
|
} |
744
|
|
|
|
|
|
|
|
745
|
0
|
|
|
|
|
|
return $self; |
746
|
|
|
|
|
|
|
} |
747
|
|
|
|
|
|
|
|
748
|
|
|
|
|
|
|
sub reject { |
749
|
0
|
|
|
0
|
0
|
|
my $self = shift; |
750
|
0
|
|
|
|
|
|
my %args = @_; |
751
|
|
|
|
|
|
|
|
752
|
0
|
0
|
|
0
|
|
|
return $self if !$self->_check_open( sub { } ); |
753
|
|
|
|
|
|
|
|
754
|
|
|
|
|
|
|
$self->{connection}->_push_write( |
755
|
|
|
|
|
|
|
Net::AMQP::Protocol::Basic::Reject->new( |
756
|
|
|
|
|
|
|
delivery_tag => 0, |
757
|
|
|
|
|
|
|
requeue => 0, |
758
|
|
|
|
|
|
|
%args, |
759
|
|
|
|
|
|
|
), |
760
|
|
|
|
|
|
|
$self->{id}, |
761
|
0
|
|
|
|
|
|
); |
762
|
|
|
|
|
|
|
|
763
|
0
|
|
|
|
|
|
return $self; |
764
|
|
|
|
|
|
|
} |
765
|
|
|
|
|
|
|
|
766
|
|
|
|
|
|
|
sub select_tx { |
767
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
768
|
0
|
|
|
|
|
|
my ($cb, $failure_cb,) = $self->_delete_cbs(@_); |
769
|
|
|
|
|
|
|
|
770
|
0
|
0
|
|
|
|
|
return $self if !$self->_check_open($failure_cb); |
771
|
|
|
|
|
|
|
|
772
|
|
|
|
|
|
|
$self->{connection}->_push_write_and_read( |
773
|
|
|
|
|
|
|
'Tx::Select', {}, 'Tx::SelectOk', |
774
|
|
|
|
|
|
|
$cb, |
775
|
|
|
|
|
|
|
$failure_cb, |
776
|
|
|
|
|
|
|
$self->{id}, |
777
|
0
|
|
|
|
|
|
); |
778
|
|
|
|
|
|
|
|
779
|
0
|
|
|
|
|
|
return $self; |
780
|
|
|
|
|
|
|
} |
781
|
|
|
|
|
|
|
|
782
|
|
|
|
|
|
|
sub commit_tx { |
783
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
784
|
0
|
|
|
|
|
|
my ($cb, $failure_cb,) = $self->_delete_cbs(@_); |
785
|
|
|
|
|
|
|
|
786
|
0
|
0
|
|
|
|
|
return $self if !$self->_check_open($failure_cb); |
787
|
|
|
|
|
|
|
|
788
|
|
|
|
|
|
|
$self->{connection}->_push_write_and_read( |
789
|
|
|
|
|
|
|
'Tx::Commit', {}, 'Tx::CommitOk', |
790
|
|
|
|
|
|
|
$cb, |
791
|
|
|
|
|
|
|
$failure_cb, |
792
|
|
|
|
|
|
|
$self->{id}, |
793
|
0
|
|
|
|
|
|
); |
794
|
|
|
|
|
|
|
|
795
|
0
|
|
|
|
|
|
return $self; |
796
|
|
|
|
|
|
|
} |
797
|
|
|
|
|
|
|
|
798
|
|
|
|
|
|
|
sub rollback_tx { |
799
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
800
|
0
|
|
|
|
|
|
my ($cb, $failure_cb,) = $self->_delete_cbs(@_); |
801
|
|
|
|
|
|
|
|
802
|
0
|
0
|
|
|
|
|
return $self if !$self->_check_open($failure_cb); |
803
|
|
|
|
|
|
|
|
804
|
|
|
|
|
|
|
$self->{connection}->_push_write_and_read( |
805
|
|
|
|
|
|
|
'Tx::Rollback', {}, 'Tx::RollbackOk', |
806
|
|
|
|
|
|
|
$cb, |
807
|
|
|
|
|
|
|
$failure_cb, |
808
|
|
|
|
|
|
|
$self->{id}, |
809
|
0
|
|
|
|
|
|
); |
810
|
|
|
|
|
|
|
|
811
|
0
|
|
|
|
|
|
return $self; |
812
|
|
|
|
|
|
|
} |
813
|
|
|
|
|
|
|
|
814
|
|
|
|
|
|
|
sub push_queue_or_consume { |
815
|
0
|
|
|
0
|
0
|
|
my $self = shift; |
816
|
0
|
|
|
|
|
|
my ($frame, $failure_cb,) = @_; |
817
|
|
|
|
|
|
|
|
818
|
|
|
|
|
|
|
# Note: the spec says that after a party sends Channel::Close, it MUST |
819
|
|
|
|
|
|
|
# discard all frames for that channel other than Close and CloseOk. |
820
|
|
|
|
|
|
|
|
821
|
0
|
0
|
|
|
|
|
if ($frame->isa('Net::AMQP::Frame::Method')) { |
822
|
0
|
|
|
|
|
|
my $method_frame = $frame->method_frame; |
823
|
0
|
0
|
0
|
|
|
|
if ($method_frame->isa('Net::AMQP::Protocol::Channel::Close')) { |
|
|
0
|
0
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
824
|
|
|
|
|
|
|
$self->{connection}->_push_write( |
825
|
|
|
|
|
|
|
Net::AMQP::Protocol::Channel::CloseOk->new(), |
826
|
|
|
|
|
|
|
$self->{id}, |
827
|
0
|
|
|
|
|
|
); |
828
|
0
|
|
|
|
|
|
$self->_closed($frame); |
829
|
0
|
|
|
|
|
|
$self->_orphan(); |
830
|
0
|
|
|
|
|
|
return $self; |
831
|
|
|
|
|
|
|
} elsif ($self->{_state} != _ST_OPEN) { |
832
|
0
|
0
|
0
|
|
|
|
if ($method_frame->isa('Net::AMQP::Protocol::Channel::OpenOk') || |
833
|
|
|
|
|
|
|
$method_frame->isa('Net::AMQP::Protocol::Channel::CloseOk')) { |
834
|
0
|
|
|
|
|
|
$self->{_queue}->push($frame); |
835
|
|
|
|
|
|
|
} |
836
|
0
|
|
|
|
|
|
return $self; |
837
|
|
|
|
|
|
|
} elsif ($method_frame->isa('Net::AMQP::Protocol::Basic::Deliver')) { |
838
|
0
|
|
|
|
|
|
my $cons_cbs = $self->{_consumer_cbs}->{$method_frame->consumer_tag}; |
839
|
0
|
|
0
|
0
|
|
|
my $cb = ($cons_cbs && $cons_cbs->[0]) || sub {}; |
840
|
0
|
|
|
|
|
|
$self->_push_read_header_and_body('deliver', $frame, $cb, $failure_cb); |
841
|
0
|
|
|
|
|
|
return $self; |
842
|
|
|
|
|
|
|
} elsif ($method_frame->isa('Net::AMQP::Protocol::Basic::CancelOk') || |
843
|
|
|
|
|
|
|
$method_frame->isa('Net::AMQP::Protocol::Basic::Cancel')) { |
844
|
|
|
|
|
|
|
# CancelOk means we asked for a cancel. |
845
|
|
|
|
|
|
|
# Cancel means queue was deleted; it is not AMQP, but RMQ supports it. |
846
|
0
|
0
|
0
|
|
|
|
if (!$self->_canceled($method_frame->consumer_tag, $frame) |
847
|
|
|
|
|
|
|
&& $method_frame->isa('Net::AMQP::Protocol::Basic::CancelOk')) { |
848
|
0
|
|
|
|
|
|
$failure_cb->("Received CancelOk for unknown consumer tag " . $method_frame->consumer_tag); |
849
|
|
|
|
|
|
|
} |
850
|
0
|
|
|
|
|
|
return $self; |
851
|
|
|
|
|
|
|
} elsif ($method_frame->isa('Net::AMQP::Protocol::Basic::Return')) { |
852
|
0
|
|
|
|
|
|
weaken(my $wself = $self); |
853
|
|
|
|
|
|
|
my $cb = sub { |
854
|
0
|
|
|
0
|
|
|
my $ret = shift; |
855
|
0
|
0
|
|
|
|
|
my $me = $wself or return; |
856
|
0
|
|
0
|
|
|
|
my $headers = $ret->{header}->headers || {}; |
857
|
0
|
|
|
|
|
|
my $onret_cb; |
858
|
0
|
0
|
|
|
|
|
if (defined(my $tag = $headers->{_ar_return})) { |
859
|
0
|
|
|
|
|
|
my $cbs = $me->{_publish_cbs}->{$tag}; |
860
|
0
|
0
|
|
|
|
|
$onret_cb = $cbs->[2] if $cbs; |
861
|
|
|
|
|
|
|
} |
862
|
0
|
|
0
|
|
|
|
$onret_cb ||= $me->{on_return} || $me->{connection}->{on_return} || sub {}; # oh well |
|
|
|
0
|
|
|
|
|
863
|
0
|
|
|
|
|
|
$onret_cb->($frame); |
864
|
0
|
|
|
|
|
|
}; |
865
|
0
|
|
|
|
|
|
$self->_push_read_header_and_body('return', $frame, $cb, $failure_cb); |
866
|
0
|
|
|
|
|
|
return $self; |
867
|
|
|
|
|
|
|
} elsif ($method_frame->isa('Net::AMQP::Protocol::Basic::Ack') || |
868
|
|
|
|
|
|
|
$method_frame->isa('Net::AMQP::Protocol::Basic::Nack')) { |
869
|
0
|
|
|
|
|
|
(my $resp = ref($method_frame)) =~ s/.*:://; |
870
|
0
|
|
|
|
|
|
my $cbs; |
871
|
0
|
0
|
|
|
|
|
if (!$self->{_is_confirm}) { |
872
|
0
|
|
|
|
|
|
$failure_cb->("Received $resp when not in confirm mode"); |
873
|
|
|
|
|
|
|
} |
874
|
|
|
|
|
|
|
else { |
875
|
0
|
|
|
|
|
|
my @tags; |
876
|
0
|
0
|
|
|
|
|
if ($method_frame->{multiple}) { |
877
|
0
|
|
|
|
|
|
@tags = sort { $a <=> $b } |
878
|
0
|
|
|
|
|
|
grep { $_ <= $method_frame->{delivery_tag} } |
879
|
0
|
|
|
|
|
|
keys %{$self->{_publish_cbs}}; |
|
0
|
|
|
|
|
|
|
880
|
|
|
|
|
|
|
} |
881
|
|
|
|
|
|
|
else { |
882
|
0
|
|
|
|
|
|
@tags = ($method_frame->{delivery_tag}); |
883
|
|
|
|
|
|
|
} |
884
|
0
|
0
|
|
|
|
|
my $cbi = ($resp eq 'Ack') ? 0 : 1; |
885
|
0
|
|
|
|
|
|
for my $tag (@tags) { |
886
|
0
|
|
|
|
|
|
my $cbs; |
887
|
0
|
0
|
|
|
|
|
if (not $cbs = delete $self->{_publish_cbs}->{$tag}) { |
|
|
0
|
|
|
|
|
|
888
|
0
|
|
|
|
|
|
$failure_cb->("Received $resp of unknown delivery tag $tag"); |
889
|
|
|
|
|
|
|
} |
890
|
|
|
|
|
|
|
elsif ($cbs->[$cbi]) { |
891
|
0
|
|
|
|
|
|
$cbs->[$cbi]->($frame); |
892
|
|
|
|
|
|
|
} |
893
|
|
|
|
|
|
|
} |
894
|
|
|
|
|
|
|
} |
895
|
0
|
|
|
|
|
|
return $self; |
896
|
|
|
|
|
|
|
} elsif ($method_frame->isa('Net::AMQP::Protocol::Channel::Flow')) { |
897
|
0
|
|
|
|
|
|
$self->{_is_active} = $method_frame->active; |
898
|
|
|
|
|
|
|
$self->{connection}->_push_write( |
899
|
|
|
|
|
|
|
Net::AMQP::Protocol::Channel::FlowOk->new( |
900
|
|
|
|
|
|
|
active => $method_frame->active, |
901
|
|
|
|
|
|
|
), |
902
|
|
|
|
|
|
|
$self->{id}, |
903
|
0
|
|
|
|
|
|
); |
904
|
0
|
0
|
|
|
|
|
my $cbname = $self->{_is_active} ? 'on_active' : 'on_inactive'; |
905
|
0
|
|
0
|
0
|
|
|
my $cb = $self->{$cbname} || $self->{connection}->{$cbname} || sub {}; |
906
|
0
|
|
|
|
|
|
$cb->($frame); |
907
|
0
|
|
|
|
|
|
return $self; |
908
|
|
|
|
|
|
|
} |
909
|
0
|
|
|
|
|
|
$self->{_queue}->push($frame); |
910
|
|
|
|
|
|
|
} else { |
911
|
0
|
|
|
|
|
|
$self->{_content_queue}->push($frame); |
912
|
|
|
|
|
|
|
} |
913
|
|
|
|
|
|
|
|
914
|
0
|
|
|
|
|
|
return $self; |
915
|
|
|
|
|
|
|
} |
916
|
|
|
|
|
|
|
|
917
|
|
|
|
|
|
|
sub _push_read_header_and_body { |
918
|
0
|
|
|
0
|
|
|
my $self = shift; |
919
|
0
|
|
|
|
|
|
my ($type, $frame, $cb, $failure_cb,) = @_; |
920
|
0
|
|
|
|
|
|
my $response = {$type => $frame}; |
921
|
0
|
|
|
|
|
|
my $body_size = 0; |
922
|
0
|
|
|
|
|
|
my $body_payload = ""; |
923
|
|
|
|
|
|
|
|
924
|
0
|
|
|
|
|
|
weaken(my $wcontq = $self->{_content_queue}); |
925
|
0
|
|
|
|
|
|
my $w_body_frame; |
926
|
|
|
|
|
|
|
my $body_frame = sub { |
927
|
0
|
|
|
0
|
|
|
my $frame = shift; |
928
|
|
|
|
|
|
|
|
929
|
0
|
0
|
|
|
|
|
return $failure_cb->('Received data is not body frame') |
930
|
|
|
|
|
|
|
if !$frame->isa('Net::AMQP::Frame::Body'); |
931
|
|
|
|
|
|
|
|
932
|
0
|
|
|
|
|
|
$body_payload .= $frame->payload; |
933
|
|
|
|
|
|
|
|
934
|
0
|
0
|
|
|
|
|
if (length($body_payload) < $body_size) { |
935
|
|
|
|
|
|
|
# More to come |
936
|
0
|
0
|
|
|
|
|
my $contq = $wcontq or return; |
937
|
0
|
|
|
|
|
|
$contq->get($w_body_frame); |
938
|
|
|
|
|
|
|
} |
939
|
|
|
|
|
|
|
else { |
940
|
0
|
|
|
|
|
|
$frame->payload($body_payload); |
941
|
0
|
|
|
|
|
|
$response->{body} = $frame; |
942
|
0
|
|
|
|
|
|
$cb->($response); |
943
|
|
|
|
|
|
|
} |
944
|
0
|
|
|
|
|
|
}; |
945
|
0
|
|
|
|
|
|
$w_body_frame = $body_frame; |
946
|
0
|
|
|
|
|
|
weaken($w_body_frame); |
947
|
|
|
|
|
|
|
|
948
|
|
|
|
|
|
|
$self->{_content_queue}->get(sub{ |
949
|
0
|
|
|
0
|
|
|
my $frame = shift; |
950
|
|
|
|
|
|
|
|
951
|
0
|
0
|
|
|
|
|
return $failure_cb->('Received data is not header frame') |
952
|
|
|
|
|
|
|
if !$frame->isa('Net::AMQP::Frame::Header'); |
953
|
|
|
|
|
|
|
|
954
|
0
|
|
|
|
|
|
my $header_frame = $frame->header_frame; |
955
|
0
|
0
|
|
|
|
|
return $failure_cb->( |
956
|
|
|
|
|
|
|
'Header is not Protocol::Basic::ContentHeader' |
957
|
|
|
|
|
|
|
. 'Header was ' . ref $header_frame |
958
|
|
|
|
|
|
|
) if !$header_frame->isa('Net::AMQP::Protocol::Basic::ContentHeader'); |
959
|
|
|
|
|
|
|
|
960
|
0
|
|
|
|
|
|
$response->{header} = $header_frame; |
961
|
|
|
|
|
|
|
|
962
|
0
|
|
|
|
|
|
$body_size = $frame->body_size; |
963
|
0
|
0
|
|
|
|
|
if ( $body_size ) { |
964
|
0
|
0
|
|
|
|
|
my $contq = $wcontq or return; |
965
|
0
|
|
|
|
|
|
$contq->get($body_frame); |
966
|
|
|
|
|
|
|
} else { |
967
|
0
|
|
|
|
|
|
$response->{body} = undef; |
968
|
0
|
|
|
|
|
|
$cb->($response); |
969
|
|
|
|
|
|
|
} |
970
|
0
|
|
|
|
|
|
}); |
971
|
|
|
|
|
|
|
|
972
|
0
|
|
|
|
|
|
return $self; |
973
|
|
|
|
|
|
|
} |
974
|
|
|
|
|
|
|
|
975
|
|
|
|
|
|
|
sub _delete_cbs { |
976
|
0
|
|
|
0
|
|
|
my $self = shift; |
977
|
0
|
|
|
|
|
|
my %args = @_; |
978
|
|
|
|
|
|
|
|
979
|
0
|
|
0
|
0
|
|
|
my $cb = delete $args{on_success} || sub {}; |
980
|
0
|
|
0
|
0
|
|
|
my $failure_cb = delete $args{on_failure} || sub {die @_}; |
|
0
|
|
|
|
|
|
|
981
|
|
|
|
|
|
|
|
982
|
0
|
|
|
|
|
|
return $cb, $failure_cb, %args; |
983
|
|
|
|
|
|
|
} |
984
|
|
|
|
|
|
|
|
985
|
|
|
|
|
|
|
sub _check_open { |
986
|
0
|
|
|
0
|
|
|
my $self = shift; |
987
|
0
|
|
|
|
|
|
my ($failure_cb) = @_; |
988
|
|
|
|
|
|
|
|
989
|
0
|
0
|
|
|
|
|
return 1 if $self->is_open(); |
990
|
|
|
|
|
|
|
|
991
|
0
|
|
|
|
|
|
$failure_cb->('Channel has already been closed'); |
992
|
0
|
|
|
|
|
|
return 0; |
993
|
|
|
|
|
|
|
} |
994
|
|
|
|
|
|
|
|
995
|
|
|
|
|
|
|
sub _check_version { |
996
|
0
|
|
|
0
|
|
|
my $self = shift; |
997
|
0
|
|
|
|
|
|
my ($major, $minor, $failure_cb) = @_; |
998
|
|
|
|
|
|
|
|
999
|
0
|
|
|
|
|
|
my $amaj = $Net::AMQP::Protocol::VERSION_MAJOR; |
1000
|
0
|
|
|
|
|
|
my $amin = $Net::AMQP::Protocol::VERSION_MINOR; |
1001
|
|
|
|
|
|
|
|
1002
|
0
|
0
|
0
|
|
|
|
return 1 if $amaj >= $major || $amaj == $major && $amin >= $minor; |
|
|
|
0
|
|
|
|
|
1003
|
|
|
|
|
|
|
|
1004
|
0
|
0
|
|
|
|
|
$failure_cb->("Not supported in AMQP $amaj-$amin") if $failure_cb; |
1005
|
0
|
|
|
|
|
|
return 0; |
1006
|
|
|
|
|
|
|
} |
1007
|
|
|
|
|
|
|
|
1008
|
|
|
|
|
|
|
sub DESTROY { |
1009
|
0
|
|
|
0
|
|
|
my $self = shift; |
1010
|
0
|
0
|
0
|
|
|
|
$self->close() if !in_global_destruction && $self->is_open(); |
1011
|
0
|
|
|
|
|
|
return; |
1012
|
|
|
|
|
|
|
} |
1013
|
|
|
|
|
|
|
|
1014
|
|
|
|
|
|
|
1; |
1015
|
|
|
|
|
|
|
__END__ |