line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
1
|
|
|
|
|
|
|
package Mojo::RabbitMQ::Client::Channel; |
2
|
5
|
|
|
5
|
|
35
|
use Mojo::Base 'Mojo::EventEmitter'; |
|
5
|
|
|
|
|
13
|
|
|
5
|
|
|
|
|
52
|
|
3
|
|
|
|
|
|
|
|
4
|
5
|
|
|
5
|
|
1024
|
use Mojo::Promise; |
|
5
|
|
|
|
|
12
|
|
|
5
|
|
|
|
|
36
|
|
5
|
5
|
|
|
5
|
|
2551
|
use Mojo::RabbitMQ::Client::LocalQueue; |
|
5
|
|
|
|
|
17
|
|
|
5
|
|
|
|
|
38
|
|
6
|
5
|
|
|
5
|
|
2547
|
use Mojo::RabbitMQ::Client::Method; |
|
5
|
|
|
|
|
16
|
|
|
5
|
|
|
|
|
41
|
|
7
|
5
|
|
|
5
|
|
2573
|
use Mojo::RabbitMQ::Client::Method::Publish; |
|
5
|
|
|
|
|
16
|
|
|
5
|
|
|
|
|
57
|
|
8
|
5
|
|
|
5
|
|
219
|
use Scalar::Util qw(isweak weaken); |
|
5
|
|
|
|
|
11
|
|
|
5
|
|
|
|
|
342
|
|
9
|
|
|
|
|
|
|
|
10
|
5
|
|
50
|
5
|
|
31
|
use constant DEBUG => $ENV{MOJO_RABBITMQ_DEBUG} // 0; |
|
5
|
|
|
|
|
11
|
|
|
5
|
|
|
|
|
25279
|
|
11
|
|
|
|
|
|
|
|
12
|
|
|
|
|
|
|
has id => 0; |
13
|
|
|
|
|
|
|
has is_open => 0; |
14
|
|
|
|
|
|
|
has is_active => 0; |
15
|
|
|
|
|
|
|
has client => undef; |
16
|
|
|
|
|
|
|
has queue => sub { Mojo::RabbitMQ::Client::LocalQueue->new }; |
17
|
|
|
|
|
|
|
has content_queue => sub { Mojo::RabbitMQ::Client::LocalQueue->new }; |
18
|
|
|
|
|
|
|
has consumer_cbs => sub { {} }; |
19
|
|
|
|
|
|
|
has return_cbs => sub { {} }; |
20
|
|
|
|
|
|
|
|
21
|
|
|
|
|
|
|
sub _open { |
22
|
0
|
|
|
0
|
|
|
warn "Deprecated call to _open on channel"; |
23
|
0
|
|
|
|
|
|
return shift->open(@_); |
24
|
|
|
|
|
|
|
} |
25
|
|
|
|
|
|
|
|
26
|
|
|
|
|
|
|
sub open { |
27
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
28
|
|
|
|
|
|
|
|
29
|
0
|
0
|
|
|
|
|
if ($self->is_open) { |
30
|
0
|
|
|
|
|
|
$self->emit(error => 'Channel has already been opened'); |
31
|
0
|
|
|
|
|
|
return $self; |
32
|
|
|
|
|
|
|
} |
33
|
|
|
|
|
|
|
|
34
|
0
|
|
|
|
|
|
weaken $self; |
35
|
|
|
|
|
|
|
$self->client->_write_expect( |
36
|
|
|
|
|
|
|
'Channel::Open' => {}, |
37
|
|
|
|
|
|
|
'Channel::OpenOk' => sub { |
38
|
0
|
|
|
0
|
|
|
warn "-- Channel::OpenOk\n" if DEBUG; |
39
|
0
|
|
|
|
|
|
$self->is_open(1)->is_active(1)->emit('open'); |
40
|
|
|
|
|
|
|
}, |
41
|
|
|
|
|
|
|
sub { |
42
|
0
|
|
|
0
|
|
|
$self->emit( |
43
|
|
|
|
|
|
|
error => 'Invalid response received while trying to open channel: ' |
44
|
|
|
|
|
|
|
. shift); |
45
|
|
|
|
|
|
|
}, |
46
|
0
|
|
|
|
|
|
$self->id, |
47
|
|
|
|
|
|
|
); |
48
|
|
|
|
|
|
|
|
49
|
0
|
|
|
|
|
|
return $self; |
50
|
|
|
|
|
|
|
} |
51
|
|
|
|
|
|
|
|
52
|
|
|
|
|
|
|
sub _push_queue_or_consume { |
53
|
0
|
|
|
0
|
|
|
my $self = shift; |
54
|
0
|
|
|
|
|
|
my ($frame) = @_; |
55
|
|
|
|
|
|
|
|
56
|
0
|
|
|
|
|
|
weaken $self; |
57
|
0
|
0
|
|
|
|
|
if ($frame->isa('Net::AMQP::Frame::Method')) { |
58
|
0
|
|
|
|
|
|
my $method_frame = $frame->method_frame; |
59
|
|
|
|
|
|
|
|
60
|
0
|
0
|
|
|
|
|
if ($method_frame->isa('Net::AMQP::Protocol::Channel::Close')) { |
|
|
0
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
61
|
0
|
|
|
|
|
|
$self->client->_write_frame(Net::AMQP::Protocol::Channel::CloseOk->new(), |
62
|
|
|
|
|
|
|
$self->id); |
63
|
0
|
|
|
|
|
|
$self->is_open(0)->is_active(0); |
64
|
0
|
|
|
|
|
|
$self->client->delete_channel($self->id); |
65
|
0
|
|
|
|
|
|
$self->emit(close => $frame); |
66
|
|
|
|
|
|
|
|
67
|
0
|
|
|
|
|
|
return $self; |
68
|
|
|
|
|
|
|
} |
69
|
|
|
|
|
|
|
elsif ($method_frame->isa('Net::AMQP::Protocol::Basic::Deliver')) { |
70
|
0
|
|
0
|
0
|
|
|
my $cb = $self->consumer_cbs->{$method_frame->consumer_tag} || sub { }; |
71
|
|
|
|
|
|
|
$self->_push_read_header_and_body( |
72
|
|
|
|
|
|
|
'deliver', |
73
|
|
|
|
|
|
|
$frame => sub { |
74
|
0
|
|
|
0
|
|
|
$cb->emit(message => @_); |
75
|
|
|
|
|
|
|
}, |
76
|
|
|
|
|
|
|
sub { |
77
|
0
|
|
|
0
|
|
|
$self->emit(error => 'Consumer callback failure: ' . shift); |
78
|
|
|
|
|
|
|
} |
79
|
0
|
|
|
|
|
|
); |
80
|
0
|
|
|
|
|
|
return $self; |
81
|
|
|
|
|
|
|
} |
82
|
|
|
|
|
|
|
elsif ($method_frame->isa('Net::AMQP::Protocol::Basic::Return')) { |
83
|
|
|
|
|
|
|
my $cb |
84
|
|
|
|
|
|
|
= $self->return_cbs->{$method_frame->exchange . '_' |
85
|
|
|
|
|
|
|
. $method_frame->routing_key} |
86
|
0
|
|
0
|
0
|
|
|
|| sub { }; |
87
|
|
|
|
|
|
|
$self->_push_read_header_and_body( |
88
|
|
|
|
|
|
|
'return', |
89
|
|
|
|
|
|
|
$frame => sub { |
90
|
0
|
|
|
0
|
|
|
$cb->emit(reject => @_); |
91
|
|
|
|
|
|
|
}, |
92
|
|
|
|
|
|
|
sub { |
93
|
0
|
|
|
0
|
|
|
$self->emit(error => 'Return callback failure: ' . shift); |
94
|
|
|
|
|
|
|
} |
95
|
0
|
|
|
|
|
|
); |
96
|
0
|
|
|
|
|
|
return $self; |
97
|
|
|
|
|
|
|
} |
98
|
|
|
|
|
|
|
elsif ($method_frame->isa('Net::AMQP::Protocol::Channel::Flow')) { |
99
|
0
|
|
|
|
|
|
$self->is_active($method_frame->active); |
100
|
0
|
|
|
|
|
|
$self->client->_write_frame( |
101
|
|
|
|
|
|
|
Net::AMQP::Protocol::Channel::FlowOk->new( |
102
|
|
|
|
|
|
|
active => $method_frame->active |
103
|
|
|
|
|
|
|
), |
104
|
|
|
|
|
|
|
$self->id |
105
|
|
|
|
|
|
|
); |
106
|
|
|
|
|
|
|
|
107
|
0
|
|
|
|
|
|
return $self; |
108
|
|
|
|
|
|
|
} |
109
|
|
|
|
|
|
|
|
110
|
0
|
|
|
|
|
|
$self->queue->push($frame); |
111
|
|
|
|
|
|
|
} |
112
|
|
|
|
|
|
|
else { |
113
|
0
|
|
|
|
|
|
$self->content_queue->push($frame); |
114
|
|
|
|
|
|
|
} |
115
|
|
|
|
|
|
|
|
116
|
0
|
|
|
|
|
|
return $self; |
117
|
|
|
|
|
|
|
} |
118
|
|
|
|
|
|
|
|
119
|
|
|
|
|
|
|
sub close { |
120
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
121
|
0
|
0
|
|
|
|
|
my $connection = $self->client or return; |
122
|
|
|
|
|
|
|
|
123
|
0
|
0
|
|
|
|
|
return $self if !$self->is_open; |
124
|
|
|
|
|
|
|
|
125
|
0
|
0
|
|
|
|
|
return $self->_close() if 0 == scalar keys %{$self->consumer_cbs}; |
|
0
|
|
|
|
|
|
|
126
|
|
|
|
|
|
|
|
127
|
0
|
|
|
|
|
|
for my $consumer_tag (keys %{$self->consumer_cbs}) { |
|
0
|
|
|
|
|
|
|
128
|
0
|
|
|
|
|
|
my $method = $self->cancel(consumer_tag => $consumer_tag); |
129
|
0
|
0
|
|
|
|
|
weaken $self unless isweak $self; |
130
|
|
|
|
|
|
|
$method->on( |
131
|
|
|
|
|
|
|
success => sub { |
132
|
0
|
|
|
0
|
|
|
$self->_close(); |
133
|
|
|
|
|
|
|
} |
134
|
0
|
|
|
|
|
|
); |
135
|
|
|
|
|
|
|
$method->catch( |
136
|
|
|
|
|
|
|
sub { |
137
|
0
|
|
|
0
|
|
|
$self->_close(); |
138
|
0
|
|
|
|
|
|
$self->emit(error => 'Error canceling consumption: ' . shift, @_); |
139
|
|
|
|
|
|
|
} |
140
|
0
|
|
|
|
|
|
); |
141
|
0
|
|
|
|
|
|
$method->deliver(); |
142
|
|
|
|
|
|
|
} |
143
|
|
|
|
|
|
|
|
144
|
0
|
|
|
|
|
|
return $self; |
145
|
|
|
|
|
|
|
} |
146
|
|
|
|
|
|
|
|
147
|
|
|
|
|
|
|
sub _close { |
148
|
0
|
|
|
0
|
|
|
my $self = shift; |
149
|
0
|
|
|
|
|
|
my %args = @_; |
150
|
|
|
|
|
|
|
|
151
|
0
|
0
|
|
|
|
|
return unless 0 == scalar keys %{$self->consumer_cbs}; |
|
0
|
|
|
|
|
|
|
152
|
|
|
|
|
|
|
|
153
|
|
|
|
|
|
|
$self->client->_write_expect( |
154
|
|
|
|
|
|
|
'Channel::Close' => {}, |
155
|
|
|
|
|
|
|
'Channel::CloseOk' => sub { |
156
|
0
|
|
|
0
|
|
|
warn "-- Channel::CloseOk\n" if DEBUG; |
157
|
0
|
|
|
|
|
|
$self->is_open(0)->is_active(0); |
158
|
0
|
|
|
|
|
|
$self->client->delete_channel($self->id); |
159
|
0
|
|
|
|
|
|
$self->emit('close'); |
160
|
|
|
|
|
|
|
}, |
161
|
|
|
|
|
|
|
sub { |
162
|
0
|
|
|
0
|
|
|
$self->is_open(0)->is_active(0); |
163
|
0
|
|
|
|
|
|
$self->client->delete_channel($self->id); |
164
|
0
|
|
|
|
|
|
$self->emit(error => 'Failed closing channel: ' . shift); |
165
|
|
|
|
|
|
|
}, |
166
|
0
|
|
|
|
|
|
$self->id, |
167
|
|
|
|
|
|
|
); |
168
|
|
|
|
|
|
|
|
169
|
0
|
|
|
|
|
|
return $self; |
170
|
|
|
|
|
|
|
} |
171
|
|
|
|
|
|
|
|
172
|
|
|
|
|
|
|
sub _assert_open { |
173
|
0
|
|
|
0
|
|
|
my $self = shift; |
174
|
|
|
|
|
|
|
|
175
|
0
|
0
|
0
|
|
|
|
return 0 unless $self->is_open and $self->is_active; |
176
|
|
|
|
|
|
|
|
177
|
0
|
|
|
|
|
|
return 1; |
178
|
|
|
|
|
|
|
} |
179
|
|
|
|
|
|
|
|
180
|
|
|
|
|
|
|
sub _prepare_method { |
181
|
0
|
|
|
0
|
|
|
my $self = shift; |
182
|
|
|
|
|
|
|
|
183
|
0
|
|
|
|
|
|
my $method = Mojo::RabbitMQ::Client::Method->new( |
184
|
|
|
|
|
|
|
client => $self->client, |
185
|
|
|
|
|
|
|
channel => $self |
186
|
|
|
|
|
|
|
); |
187
|
0
|
|
|
|
|
|
weaken $method->{channel}; |
188
|
0
|
|
|
|
|
|
weaken $method->{client}; |
189
|
|
|
|
|
|
|
|
190
|
0
|
|
|
|
|
|
return $method->setup(@_); |
191
|
|
|
|
|
|
|
} |
192
|
|
|
|
|
|
|
|
193
|
|
|
|
|
|
|
sub declare_exchange { |
194
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
195
|
|
|
|
|
|
|
|
196
|
|
|
|
|
|
|
return $self->_prepare_method( |
197
|
|
|
|
|
|
|
'Exchange::Declare' => { |
198
|
|
|
|
|
|
|
type => 'direct', |
199
|
|
|
|
|
|
|
passive => 0, |
200
|
|
|
|
|
|
|
durable => 0, |
201
|
|
|
|
|
|
|
auto_delete => 0, |
202
|
|
|
|
|
|
|
internal => 0, |
203
|
|
|
|
|
|
|
@_, # exchange |
204
|
|
|
|
|
|
|
ticket => 0, |
205
|
|
|
|
|
|
|
nowait => 0, # FIXME |
206
|
|
|
|
|
|
|
}, |
207
|
|
|
|
|
|
|
'Exchange::DeclareOk' => sub { |
208
|
0
|
|
|
0
|
|
|
warn "-- Exchange::DeclareOk\n" if DEBUG; |
209
|
|
|
|
|
|
|
} |
210
|
0
|
|
|
|
|
|
); |
211
|
|
|
|
|
|
|
} |
212
|
|
|
|
|
|
|
|
213
|
|
|
|
|
|
|
sub declare_exchange_p { |
214
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
215
|
|
|
|
|
|
|
|
216
|
0
|
|
|
|
|
|
my $promise = Mojo::Promise->new; |
217
|
0
|
|
|
|
|
|
my $method = $self->declare_exchange(@_); |
218
|
0
|
|
|
|
|
|
weaken $self; |
219
|
0
|
|
|
0
|
|
|
$method->on('success' => sub { shift; $promise->resolve($self, @_) }); |
|
0
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
220
|
0
|
|
|
0
|
|
|
$method->on('error' => sub { shift; $promise->reject($self, @_) }); |
|
0
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
221
|
0
|
|
|
|
|
|
$method->deliver; |
222
|
|
|
|
|
|
|
|
223
|
0
|
|
|
|
|
|
return $promise; |
224
|
|
|
|
|
|
|
} |
225
|
|
|
|
|
|
|
|
226
|
|
|
|
|
|
|
sub delete_exchange { |
227
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
228
|
|
|
|
|
|
|
|
229
|
|
|
|
|
|
|
return $self->_prepare_method( |
230
|
|
|
|
|
|
|
'Exchange::Delete' => { |
231
|
|
|
|
|
|
|
if_unused => 0, |
232
|
|
|
|
|
|
|
@_, # exchange |
233
|
|
|
|
|
|
|
ticket => 0, |
234
|
|
|
|
|
|
|
nowait => 0, # FIXME |
235
|
|
|
|
|
|
|
}, |
236
|
|
|
|
|
|
|
'Exchange::DeleteOk' => sub { |
237
|
0
|
|
|
0
|
|
|
warn "-- Exchange::DeleteOk\n" if DEBUG; |
238
|
|
|
|
|
|
|
} |
239
|
0
|
|
|
|
|
|
); |
240
|
|
|
|
|
|
|
} |
241
|
|
|
|
|
|
|
|
242
|
|
|
|
|
|
|
sub delete_exchange_p { |
243
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
244
|
|
|
|
|
|
|
|
245
|
0
|
|
|
|
|
|
my $promise = Mojo::Promise->new; |
246
|
0
|
|
|
|
|
|
my $method = $self->delete_exchange(@_); |
247
|
0
|
|
|
|
|
|
weaken $self; |
248
|
0
|
|
|
0
|
|
|
$method->on('success' => sub { shift; $promise->resolve($self, @_) }); |
|
0
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
249
|
0
|
|
|
0
|
|
|
$method->on('error' => sub { shift; $promise->reject($self, @_) }); |
|
0
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
250
|
0
|
|
|
|
|
|
$method->deliver; |
251
|
|
|
|
|
|
|
|
252
|
0
|
|
|
|
|
|
return $promise; |
253
|
|
|
|
|
|
|
} |
254
|
|
|
|
|
|
|
|
255
|
|
|
|
|
|
|
sub declare_queue { |
256
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
257
|
|
|
|
|
|
|
|
258
|
|
|
|
|
|
|
return $self->_prepare_method( |
259
|
|
|
|
|
|
|
'Queue::Declare' => { |
260
|
|
|
|
|
|
|
queue => '', |
261
|
|
|
|
|
|
|
passive => 0, |
262
|
|
|
|
|
|
|
durable => 0, |
263
|
|
|
|
|
|
|
exclusive => 0, |
264
|
|
|
|
|
|
|
auto_delete => 0, |
265
|
|
|
|
|
|
|
no_ack => 1, |
266
|
|
|
|
|
|
|
@_, |
267
|
|
|
|
|
|
|
ticket => 0, |
268
|
|
|
|
|
|
|
nowait => 0, # FIXME |
269
|
|
|
|
|
|
|
}, |
270
|
|
|
|
|
|
|
'Queue::DeclareOk' => sub { |
271
|
0
|
|
|
0
|
|
|
warn "-- Queue::DeclareOk\n" if DEBUG; |
272
|
|
|
|
|
|
|
} |
273
|
0
|
|
|
|
|
|
); |
274
|
|
|
|
|
|
|
} |
275
|
|
|
|
|
|
|
|
276
|
|
|
|
|
|
|
sub declare_queue_p { |
277
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
278
|
|
|
|
|
|
|
|
279
|
0
|
|
|
|
|
|
my $promise = Mojo::Promise->new; |
280
|
0
|
|
|
|
|
|
my $method = $self->declare_queue(@_); |
281
|
0
|
|
|
|
|
|
weaken $self; |
282
|
0
|
|
|
0
|
|
|
$method->on('success' => sub { shift; $promise->resolve($self, @_) }); |
|
0
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
283
|
0
|
|
|
0
|
|
|
$method->on('error' => sub { shift; $promise->reject($self, @_) }); |
|
0
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
284
|
0
|
|
|
|
|
|
$method->deliver; |
285
|
|
|
|
|
|
|
|
286
|
0
|
|
|
|
|
|
return $promise; |
287
|
|
|
|
|
|
|
} |
288
|
|
|
|
|
|
|
|
289
|
|
|
|
|
|
|
sub bind_queue { |
290
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
291
|
|
|
|
|
|
|
|
292
|
|
|
|
|
|
|
return $self->_prepare_method( |
293
|
|
|
|
|
|
|
'Queue::Bind' => { |
294
|
|
|
|
|
|
|
@_, # queue, exchange, routing_key |
295
|
|
|
|
|
|
|
ticket => 0, |
296
|
|
|
|
|
|
|
nowait => 0, # FIXME |
297
|
|
|
|
|
|
|
}, |
298
|
|
|
|
|
|
|
'Queue::BindOk' => sub { |
299
|
0
|
|
|
0
|
|
|
warn "-- Queue::BindOk\n" if DEBUG; |
300
|
|
|
|
|
|
|
} |
301
|
0
|
|
|
|
|
|
); |
302
|
|
|
|
|
|
|
} |
303
|
|
|
|
|
|
|
|
304
|
|
|
|
|
|
|
sub bind_queue_p { |
305
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
306
|
|
|
|
|
|
|
|
307
|
0
|
|
|
|
|
|
my $promise = Mojo::Promise->new; |
308
|
0
|
|
|
|
|
|
my $method = $self->bind_queue(@_); |
309
|
0
|
|
|
0
|
|
|
$method->on('success' => sub { shift; $promise->resolve($self, @_) }); |
|
0
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
310
|
0
|
|
|
0
|
|
|
$method->on('error' => sub { shift; $promise->reject($self, @_) }); |
|
0
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
311
|
0
|
|
|
|
|
|
$method->deliver; |
312
|
|
|
|
|
|
|
|
313
|
0
|
|
|
|
|
|
return $promise; |
314
|
|
|
|
|
|
|
} |
315
|
|
|
|
|
|
|
|
316
|
|
|
|
|
|
|
sub unbind_queue { |
317
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
318
|
|
|
|
|
|
|
|
319
|
|
|
|
|
|
|
return $self->_prepare_method( |
320
|
|
|
|
|
|
|
'Queue::Unbind' => { |
321
|
|
|
|
|
|
|
@_, # queue, exchange, routing_key |
322
|
|
|
|
|
|
|
ticket => 0, |
323
|
|
|
|
|
|
|
}, |
324
|
|
|
|
|
|
|
'Queue::UnbindOk' => sub { |
325
|
0
|
|
|
0
|
|
|
warn "-- Queue::UnbindOk\n" if DEBUG; |
326
|
|
|
|
|
|
|
} |
327
|
0
|
|
|
|
|
|
); |
328
|
|
|
|
|
|
|
} |
329
|
|
|
|
|
|
|
|
330
|
|
|
|
|
|
|
sub unbind_queue_p { |
331
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
332
|
|
|
|
|
|
|
|
333
|
0
|
|
|
|
|
|
my $promise = Mojo::Promise->new; |
334
|
0
|
|
|
|
|
|
my $method = $self->unbind_queue(@_); |
335
|
0
|
|
|
|
|
|
weaken $self; |
336
|
0
|
|
|
0
|
|
|
$method->on('success' => sub { shift; $promise->resolve($self, @_) }); |
|
0
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
337
|
0
|
|
|
0
|
|
|
$method->on('error' => sub { shift; $promise->reject($self, @_) }); |
|
0
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
338
|
0
|
|
|
|
|
|
$method->deliver; |
339
|
|
|
|
|
|
|
|
340
|
0
|
|
|
|
|
|
return $promise; |
341
|
|
|
|
|
|
|
} |
342
|
|
|
|
|
|
|
|
343
|
|
|
|
|
|
|
sub purge_queue { |
344
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
345
|
|
|
|
|
|
|
|
346
|
|
|
|
|
|
|
return $self->_prepare_method( |
347
|
|
|
|
|
|
|
'Queue::Purge' => { |
348
|
|
|
|
|
|
|
@_, # queue |
349
|
|
|
|
|
|
|
ticket => 0, |
350
|
|
|
|
|
|
|
nowait => 0, # FIXME |
351
|
|
|
|
|
|
|
}, |
352
|
|
|
|
|
|
|
'Queue::PurgeOk' => sub { |
353
|
0
|
|
|
0
|
|
|
warn "-- Queue::PurgeOk\n" if DEBUG; |
354
|
|
|
|
|
|
|
} |
355
|
0
|
|
|
|
|
|
); |
356
|
|
|
|
|
|
|
} |
357
|
|
|
|
|
|
|
|
358
|
|
|
|
|
|
|
sub purge_queue_p { |
359
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
360
|
|
|
|
|
|
|
|
361
|
0
|
|
|
|
|
|
my $promise = Mojo::Promise->new; |
362
|
0
|
|
|
|
|
|
my $method = $self->purge_queue(@_); |
363
|
0
|
|
|
|
|
|
weaken $self; |
364
|
0
|
|
|
0
|
|
|
$method->on('success' => sub { shift; $promise->resolve($self, @_) }); |
|
0
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
365
|
0
|
|
|
0
|
|
|
$method->on('error' => sub { shift; $promise->reject($self, @_) }); |
|
0
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
366
|
0
|
|
|
|
|
|
$method->deliver; |
367
|
|
|
|
|
|
|
|
368
|
0
|
|
|
|
|
|
return $promise; |
369
|
|
|
|
|
|
|
} |
370
|
|
|
|
|
|
|
|
371
|
|
|
|
|
|
|
sub delete_queue { |
372
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
373
|
|
|
|
|
|
|
|
374
|
|
|
|
|
|
|
return $self->_prepare_method( |
375
|
|
|
|
|
|
|
'Queue::Delete' => { |
376
|
|
|
|
|
|
|
if_unused => 0, |
377
|
|
|
|
|
|
|
if_empty => 0, |
378
|
|
|
|
|
|
|
@_, # queue |
379
|
|
|
|
|
|
|
ticket => 0, |
380
|
|
|
|
|
|
|
nowait => 0, # FIXME |
381
|
|
|
|
|
|
|
}, |
382
|
|
|
|
|
|
|
'Queue::DeleteOk' => sub { |
383
|
0
|
|
|
0
|
|
|
warn "-- Queue::DeleteOk\n" if DEBUG; |
384
|
|
|
|
|
|
|
} |
385
|
0
|
|
|
|
|
|
); |
386
|
|
|
|
|
|
|
} |
387
|
|
|
|
|
|
|
|
388
|
|
|
|
|
|
|
sub delete_queue_p { |
389
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
390
|
|
|
|
|
|
|
|
391
|
0
|
|
|
|
|
|
my $promise = Mojo::Promise->new; |
392
|
0
|
|
|
|
|
|
my $method = $self->delete_queue(@_); |
393
|
0
|
|
|
|
|
|
weaken $self; |
394
|
0
|
|
|
0
|
|
|
$method->on('success' => sub { shift; $promise->resolve($self, @_) }); |
|
0
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
395
|
0
|
|
|
0
|
|
|
$method->on('error' => sub { shift; $promise->reject($self, @_) }); |
|
0
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
396
|
0
|
|
|
|
|
|
$method->deliver; |
397
|
|
|
|
|
|
|
|
398
|
0
|
|
|
|
|
|
return $promise; |
399
|
|
|
|
|
|
|
} |
400
|
|
|
|
|
|
|
|
401
|
|
|
|
|
|
|
sub publish { |
402
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
403
|
|
|
|
|
|
|
|
404
|
0
|
|
|
|
|
|
return Mojo::RabbitMQ::Client::Method::Publish->new( |
405
|
|
|
|
|
|
|
client => $self->client, |
406
|
|
|
|
|
|
|
channel => $self |
407
|
|
|
|
|
|
|
)->setup(@_); |
408
|
|
|
|
|
|
|
} |
409
|
|
|
|
|
|
|
|
410
|
|
|
|
|
|
|
sub publish_p { |
411
|
0
|
|
|
0
|
0
|
|
my $self = shift; |
412
|
|
|
|
|
|
|
|
413
|
0
|
|
|
|
|
|
my $promise = Mojo::Promise->new; |
414
|
0
|
|
|
|
|
|
my $method = Mojo::RabbitMQ::Client::Method::Publish->new( |
415
|
|
|
|
|
|
|
client => $self->client, |
416
|
|
|
|
|
|
|
channel => $self |
417
|
|
|
|
|
|
|
); |
418
|
0
|
|
|
|
|
|
weaken $method->{client}; |
419
|
0
|
|
|
|
|
|
weaken $method->{channel}; |
420
|
0
|
|
|
|
|
|
$method->setup(@_); |
421
|
0
|
|
|
|
|
|
weaken $self; |
422
|
0
|
|
|
0
|
|
|
$method->on('success' => sub { shift; $promise->resolve($self, @_) }); |
|
0
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
423
|
0
|
|
|
0
|
|
|
$method->on('error' => sub { shift; $promise->reject($self, @_) }); |
|
0
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
424
|
0
|
|
|
|
|
|
$method->deliver; |
425
|
|
|
|
|
|
|
|
426
|
0
|
|
|
|
|
|
return $promise; |
427
|
|
|
|
|
|
|
} |
428
|
|
|
|
|
|
|
|
429
|
|
|
|
|
|
|
sub consume { |
430
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
431
|
|
|
|
|
|
|
|
432
|
|
|
|
|
|
|
my $method = $self->_prepare_method( |
433
|
|
|
|
|
|
|
'Basic::Consume' => { |
434
|
|
|
|
|
|
|
consumer_tag => '', |
435
|
|
|
|
|
|
|
no_local => 0, |
436
|
|
|
|
|
|
|
no_ack => 1, |
437
|
|
|
|
|
|
|
exclusive => 0, |
438
|
|
|
|
|
|
|
@_, |
439
|
|
|
|
|
|
|
ticket => 0, |
440
|
|
|
|
|
|
|
nowait => 0 |
441
|
|
|
|
|
|
|
}, |
442
|
|
|
|
|
|
|
'Basic::ConsumeOk' => sub { |
443
|
0
|
|
|
0
|
|
|
warn "-- Basic::ConsumeOk\n" if DEBUG; |
444
|
|
|
|
|
|
|
} |
445
|
0
|
|
|
|
|
|
); |
446
|
0
|
|
|
|
|
|
weaken $self; |
447
|
|
|
|
|
|
|
$method->on( |
448
|
|
|
|
|
|
|
success => sub { |
449
|
0
|
|
|
0
|
|
|
my $this = shift; |
450
|
0
|
|
|
|
|
|
my $frame = shift; |
451
|
0
|
|
|
|
|
|
my $tag = $frame->method_frame->consumer_tag; |
452
|
|
|
|
|
|
|
|
453
|
0
|
|
|
|
|
|
$self->consumer_cbs->{$tag} = $this; |
454
|
|
|
|
|
|
|
} |
455
|
0
|
|
|
|
|
|
); |
456
|
|
|
|
|
|
|
|
457
|
0
|
|
|
|
|
|
return $method; |
458
|
|
|
|
|
|
|
} |
459
|
|
|
|
|
|
|
|
460
|
|
|
|
|
|
|
sub cancel { |
461
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
462
|
|
|
|
|
|
|
|
463
|
|
|
|
|
|
|
my $method = $self->_prepare_method( |
464
|
|
|
|
|
|
|
'Basic::Cancel', |
465
|
|
|
|
|
|
|
{ |
466
|
|
|
|
|
|
|
@_, # consumer_tag |
467
|
|
|
|
|
|
|
nowait => 0, |
468
|
|
|
|
|
|
|
}, |
469
|
|
|
|
|
|
|
'Basic::CancelOk' => sub { |
470
|
0
|
|
|
0
|
|
|
warn "-- Basic::CancelOk\n" if DEBUG; |
471
|
|
|
|
|
|
|
} |
472
|
0
|
|
|
|
|
|
); |
473
|
0
|
|
|
|
|
|
weaken $self; |
474
|
|
|
|
|
|
|
$method->on( |
475
|
|
|
|
|
|
|
success => sub { |
476
|
0
|
|
|
0
|
|
|
my $this = shift; |
477
|
0
|
|
|
|
|
|
my $frame = shift; |
478
|
0
|
|
|
|
|
|
delete $self->consumer_cbs->{$frame->method_frame->consumer_tag}; |
479
|
|
|
|
|
|
|
} |
480
|
0
|
|
|
|
|
|
); |
481
|
0
|
|
|
|
|
|
return $method; |
482
|
|
|
|
|
|
|
} |
483
|
|
|
|
|
|
|
|
484
|
|
|
|
|
|
|
sub get { |
485
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
486
|
|
|
|
|
|
|
|
487
|
0
|
|
|
|
|
|
my $method = $self->_prepare_method( |
488
|
|
|
|
|
|
|
'Basic::Get', |
489
|
|
|
|
|
|
|
{ |
490
|
|
|
|
|
|
|
no_ack => 1, |
491
|
|
|
|
|
|
|
@_, # queue |
492
|
|
|
|
|
|
|
ticket => 0, |
493
|
|
|
|
|
|
|
}, |
494
|
|
|
|
|
|
|
[qw(Basic::GetOk Basic::GetEmpty)] |
495
|
|
|
|
|
|
|
); |
496
|
0
|
|
|
|
|
|
weaken $self; |
497
|
|
|
|
|
|
|
$method->on( |
498
|
|
|
|
|
|
|
success => sub { |
499
|
0
|
|
|
0
|
|
|
warn "-- Basic::GetOk|GetEmpty\n" if DEBUG; |
500
|
0
|
|
|
|
|
|
my $this = shift; |
501
|
0
|
|
|
|
|
|
my $frame = shift; |
502
|
|
|
|
|
|
|
|
503
|
0
|
0
|
|
|
|
|
$this->emit(empty => $frame) |
504
|
|
|
|
|
|
|
if $frame->method_frame->isa('Net::AMQP::Protocol::Basic::GetEmpty'); |
505
|
|
|
|
|
|
|
$self->_push_read_header_and_body( |
506
|
|
|
|
|
|
|
'ok', $frame, |
507
|
|
|
|
|
|
|
sub { |
508
|
0
|
|
|
|
|
|
$this->emit(message => $frame, @_); |
509
|
|
|
|
|
|
|
}, |
510
|
|
|
|
|
|
|
sub { |
511
|
0
|
|
|
|
|
|
$this->emit(error => 'Failed to get messages from queue'); |
512
|
|
|
|
|
|
|
} |
513
|
0
|
|
|
|
|
|
); |
514
|
|
|
|
|
|
|
} |
515
|
0
|
|
|
|
|
|
); |
516
|
|
|
|
|
|
|
|
517
|
0
|
|
|
|
|
|
return $method; |
518
|
|
|
|
|
|
|
} |
519
|
|
|
|
|
|
|
|
520
|
|
|
|
|
|
|
sub get_p { |
521
|
0
|
|
|
0
|
0
|
|
my $self = shift; |
522
|
|
|
|
|
|
|
|
523
|
0
|
|
|
|
|
|
my $promise = Mojo::Promise->new; |
524
|
0
|
|
|
|
|
|
my $method = $self->get(@_); |
525
|
0
|
|
|
|
|
|
weaken $self; |
526
|
0
|
|
|
0
|
|
|
$method->on('message' => sub { shift; $promise->resolve($self, @_) }); |
|
0
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
527
|
0
|
|
|
0
|
|
|
$method->on('empty' => sub { shift; $promise->resolve($self, @_) }); |
|
0
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
528
|
0
|
|
|
0
|
|
|
$method->on('error' => sub { shift; $promise->reject($self, @_) }); |
|
0
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
529
|
0
|
|
|
|
|
|
$method->deliver; |
530
|
|
|
|
|
|
|
|
531
|
0
|
|
|
|
|
|
return $promise; |
532
|
|
|
|
|
|
|
} |
533
|
|
|
|
|
|
|
|
534
|
|
|
|
|
|
|
sub ack { |
535
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
536
|
0
|
|
|
|
|
|
my %args = (); |
537
|
0
|
0
|
|
|
|
|
if (ref($_[0]) eq 'HASH') { |
538
|
0
|
0
|
|
|
|
|
if (defined $_[0]->{ok}) { |
|
|
0
|
|
|
|
|
|
539
|
0
|
|
|
|
|
|
$args{delivery_tag} = $_[0]->{ok}->method_frame->delivery_tag; |
540
|
|
|
|
|
|
|
} elsif (defined $_[0]->{deliver}) { |
541
|
0
|
|
|
|
|
|
$args{delivery_tag} = $_[0]->{deliver}->method_frame->delivery_tag; |
542
|
|
|
|
|
|
|
} |
543
|
|
|
|
|
|
|
} else { |
544
|
0
|
|
|
|
|
|
%args = @_; |
545
|
|
|
|
|
|
|
} |
546
|
|
|
|
|
|
|
|
547
|
0
|
0
|
|
|
|
|
die "ack requires delivery_tag in arguments" unless defined $args{delivery_tag}; |
548
|
|
|
|
|
|
|
|
549
|
|
|
|
|
|
|
return $self->_prepare_method( |
550
|
|
|
|
|
|
|
'Basic::Ack' => { |
551
|
|
|
|
|
|
|
delivery_tag => 0, |
552
|
|
|
|
|
|
|
multiple => |
553
|
0
|
0
|
0
|
|
|
|
(defined $args{delivery_tag} && $args{delivery_tag} != 0 ? 0 : 1), |
554
|
|
|
|
|
|
|
%args, |
555
|
|
|
|
|
|
|
} |
556
|
|
|
|
|
|
|
); |
557
|
|
|
|
|
|
|
} |
558
|
|
|
|
|
|
|
|
559
|
|
|
|
|
|
|
sub ack_p { |
560
|
0
|
|
|
0
|
0
|
|
my $self = shift; |
561
|
|
|
|
|
|
|
|
562
|
0
|
|
|
|
|
|
my $promise = Mojo::Promise->new; |
563
|
0
|
|
|
|
|
|
my $method = $self->ack(@_); |
564
|
0
|
|
|
|
|
|
weaken $self; |
565
|
0
|
|
|
0
|
|
|
$method->on('success' => sub { shift; $promise->resolve($self, @_) }); |
|
0
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
566
|
0
|
|
|
0
|
|
|
$method->on('error' => sub { shift; $promise->reject($self, @_) }); |
|
0
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
567
|
0
|
|
|
|
|
|
$method->deliver; |
568
|
|
|
|
|
|
|
|
569
|
0
|
|
|
|
|
|
return $promise; |
570
|
|
|
|
|
|
|
} |
571
|
|
|
|
|
|
|
|
572
|
|
|
|
|
|
|
sub qos { |
573
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
574
|
|
|
|
|
|
|
|
575
|
0
|
|
|
|
|
|
return $self->_prepare_method('Basic::Qos', |
576
|
|
|
|
|
|
|
{prefetch_count => 1, @_, prefetch_size => 0, global => 0,}, |
577
|
|
|
|
|
|
|
'Basic::QosOk'); |
578
|
|
|
|
|
|
|
} |
579
|
|
|
|
|
|
|
|
580
|
|
|
|
|
|
|
sub recover { |
581
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
582
|
|
|
|
|
|
|
|
583
|
0
|
|
|
|
|
|
return $self->_prepare_method('Basic::Recover' => {requeue => 1, @_,}); |
584
|
|
|
|
|
|
|
} |
585
|
|
|
|
|
|
|
|
586
|
|
|
|
|
|
|
sub reject { |
587
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
588
|
|
|
|
|
|
|
|
589
|
0
|
|
|
|
|
|
return $self->_prepare_method( |
590
|
|
|
|
|
|
|
'Basic::Reject' => {delivery_tag => 0, requeue => 0, @_,}); |
591
|
|
|
|
|
|
|
} |
592
|
|
|
|
|
|
|
|
593
|
|
|
|
|
|
|
sub select_tx { |
594
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
595
|
|
|
|
|
|
|
|
596
|
0
|
|
|
|
|
|
return $self->_prepare_method('Tx::Select', {}, 'Tx::SelectOk'); |
597
|
|
|
|
|
|
|
} |
598
|
|
|
|
|
|
|
|
599
|
|
|
|
|
|
|
sub commit_tx { |
600
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
601
|
|
|
|
|
|
|
|
602
|
0
|
|
|
|
|
|
return $self->_prepare_method('Tx::Commit', {}, 'Tx::CommitOk'); |
603
|
|
|
|
|
|
|
} |
604
|
|
|
|
|
|
|
|
605
|
|
|
|
|
|
|
sub rollback_tx { |
606
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
607
|
|
|
|
|
|
|
|
608
|
0
|
|
|
|
|
|
return $self->_prepare_method('Tx::Rollback', {}, 'Tx::RollbackOk'); |
609
|
|
|
|
|
|
|
} |
610
|
|
|
|
|
|
|
|
611
|
|
|
|
|
|
|
sub _push_read_header_and_body { |
612
|
0
|
|
|
0
|
|
|
my $self = shift; |
613
|
0
|
|
|
|
|
|
my ($type, $frame, $cb, $failure_cb) = @_; |
614
|
0
|
|
|
|
|
|
my $response = {$type => $frame}; |
615
|
0
|
|
|
|
|
|
my $body_size = 0; |
616
|
|
|
|
|
|
|
|
617
|
|
|
|
|
|
|
$self->content_queue->get( |
618
|
|
|
|
|
|
|
sub { |
619
|
0
|
|
|
0
|
|
|
my $frame = shift; |
620
|
|
|
|
|
|
|
|
621
|
0
|
0
|
|
|
|
|
return $failure_cb->('Received data is not header frame') |
622
|
|
|
|
|
|
|
if !$frame->isa('Net::AMQP::Frame::Header'); |
623
|
|
|
|
|
|
|
|
624
|
0
|
|
|
|
|
|
my $header_frame = $frame->header_frame; |
625
|
0
|
0
|
|
|
|
|
return $failure_cb->('Header is not Protocol::Basic::ContentHeader' |
626
|
|
|
|
|
|
|
. 'Header was ' |
627
|
|
|
|
|
|
|
. ref $header_frame) |
628
|
|
|
|
|
|
|
if !$header_frame->isa('Net::AMQP::Protocol::Basic::ContentHeader'); |
629
|
|
|
|
|
|
|
|
630
|
0
|
|
|
|
|
|
$response->{header} = $header_frame; |
631
|
0
|
|
|
|
|
|
$body_size = $frame->body_size; |
632
|
|
|
|
|
|
|
} |
633
|
0
|
|
|
|
|
|
); |
634
|
|
|
|
|
|
|
|
635
|
0
|
|
|
|
|
|
my $body_payload = ""; |
636
|
0
|
|
|
|
|
|
my $next_frame; |
637
|
|
|
|
|
|
|
$next_frame = sub { |
638
|
0
|
|
|
0
|
|
|
my $frame = shift; |
639
|
|
|
|
|
|
|
|
640
|
0
|
0
|
|
|
|
|
return $failure_cb->('Received data is not body frame') |
641
|
|
|
|
|
|
|
if !$frame->isa('Net::AMQP::Frame::Body'); |
642
|
|
|
|
|
|
|
|
643
|
0
|
|
|
|
|
|
$body_payload .= $frame->payload; |
644
|
|
|
|
|
|
|
|
645
|
0
|
0
|
|
|
|
|
if (length($body_payload) < $body_size) { |
646
|
|
|
|
|
|
|
|
647
|
|
|
|
|
|
|
# More to come |
648
|
0
|
|
|
|
|
|
$self->content_queue->get($next_frame); |
649
|
|
|
|
|
|
|
} |
650
|
|
|
|
|
|
|
else { |
651
|
0
|
|
|
|
|
|
$frame->payload($body_payload); |
652
|
0
|
|
|
|
|
|
$response->{body} = $frame; |
653
|
0
|
|
|
|
|
|
$cb->($response); |
654
|
|
|
|
|
|
|
} |
655
|
0
|
|
|
|
|
|
}; |
656
|
|
|
|
|
|
|
|
657
|
0
|
|
|
|
|
|
$self->content_queue->get($next_frame); |
658
|
|
|
|
|
|
|
|
659
|
0
|
|
|
|
|
|
return $self; |
660
|
|
|
|
|
|
|
} |
661
|
|
|
|
|
|
|
|
662
|
|
|
|
|
|
|
sub DESTROY { |
663
|
0
|
|
|
0
|
|
|
my $self = shift; |
664
|
0
|
0
|
|
|
|
|
$self->close() if defined $self; |
665
|
0
|
|
|
|
|
|
return; |
666
|
|
|
|
|
|
|
} |
667
|
|
|
|
|
|
|
|
668
|
|
|
|
|
|
|
1; |
669
|
|
|
|
|
|
|
|
670
|
|
|
|
|
|
|
=encoding utf8 |
671
|
|
|
|
|
|
|
|
672
|
|
|
|
|
|
|
=head1 NAME |
673
|
|
|
|
|
|
|
|
674
|
|
|
|
|
|
|
Mojo::RabbitMQ::Client::Channel - handles all channel related methods |
675
|
|
|
|
|
|
|
|
676
|
|
|
|
|
|
|
=head1 SYNOPSIS |
677
|
|
|
|
|
|
|
|
678
|
|
|
|
|
|
|
use Mojo::RabbitMQ::Client::Channel; |
679
|
|
|
|
|
|
|
|
680
|
|
|
|
|
|
|
my $channel = Mojo::RabbitMQ::Client::Channel->new(); |
681
|
|
|
|
|
|
|
|
682
|
|
|
|
|
|
|
$channel->catch(sub { warn "Some channel error occurred: " . $_[1] }); |
683
|
|
|
|
|
|
|
|
684
|
|
|
|
|
|
|
$channel->on( |
685
|
|
|
|
|
|
|
open => sub { |
686
|
|
|
|
|
|
|
my ($channel) = @_; |
687
|
|
|
|
|
|
|
... |
688
|
|
|
|
|
|
|
} |
689
|
|
|
|
|
|
|
); |
690
|
|
|
|
|
|
|
$channel->on(close => sub { warn "Channel closed" }); |
691
|
|
|
|
|
|
|
|
692
|
|
|
|
|
|
|
$client->open_channel($channel); |
693
|
|
|
|
|
|
|
|
694
|
|
|
|
|
|
|
=head1 DESCRIPTION |
695
|
|
|
|
|
|
|
|
696
|
|
|
|
|
|
|
L allows one to call all channel related methods. |
697
|
|
|
|
|
|
|
|
698
|
|
|
|
|
|
|
=head1 EVENTS |
699
|
|
|
|
|
|
|
|
700
|
|
|
|
|
|
|
L inherits all events from L and can emit the |
701
|
|
|
|
|
|
|
following new ones. |
702
|
|
|
|
|
|
|
|
703
|
|
|
|
|
|
|
=head2 open |
704
|
|
|
|
|
|
|
|
705
|
|
|
|
|
|
|
$channel->on(open => sub { |
706
|
|
|
|
|
|
|
my ($channel) = @_; |
707
|
|
|
|
|
|
|
... |
708
|
|
|
|
|
|
|
}); |
709
|
|
|
|
|
|
|
|
710
|
|
|
|
|
|
|
Emitted when channel receives Open-Ok. |
711
|
|
|
|
|
|
|
|
712
|
|
|
|
|
|
|
=head2 close |
713
|
|
|
|
|
|
|
|
714
|
|
|
|
|
|
|
$channel->on(close=> sub { |
715
|
|
|
|
|
|
|
my ($channel, $frame) = @_; |
716
|
|
|
|
|
|
|
... |
717
|
|
|
|
|
|
|
}); |
718
|
|
|
|
|
|
|
|
719
|
|
|
|
|
|
|
Emitted when channel gets closed, C<$frame> contains close reason. |
720
|
|
|
|
|
|
|
|
721
|
|
|
|
|
|
|
=head1 ATTRIBUTES |
722
|
|
|
|
|
|
|
|
723
|
|
|
|
|
|
|
L has following attributes. |
724
|
|
|
|
|
|
|
|
725
|
|
|
|
|
|
|
=head2 id |
726
|
|
|
|
|
|
|
|
727
|
|
|
|
|
|
|
my $id = $channel->id; |
728
|
|
|
|
|
|
|
$channel->id(20810); |
729
|
|
|
|
|
|
|
|
730
|
|
|
|
|
|
|
If not set, L sets it to next free number when channel is opened. |
731
|
|
|
|
|
|
|
|
732
|
|
|
|
|
|
|
=head2 is_open |
733
|
|
|
|
|
|
|
|
734
|
|
|
|
|
|
|
$channel->is_open ? "Channel is open" : "Channel is closed"; |
735
|
|
|
|
|
|
|
|
736
|
|
|
|
|
|
|
=head2 is_active |
737
|
|
|
|
|
|
|
|
738
|
|
|
|
|
|
|
$channel->is_active ? "Channel is active" : "Channel is not active"; |
739
|
|
|
|
|
|
|
|
740
|
|
|
|
|
|
|
This can be modified on reception of Channel-Flow. |
741
|
|
|
|
|
|
|
|
742
|
|
|
|
|
|
|
=head2 client |
743
|
|
|
|
|
|
|
|
744
|
|
|
|
|
|
|
my $client = $channel->client; |
745
|
|
|
|
|
|
|
$channel->client($client); |
746
|
|
|
|
|
|
|
|
747
|
|
|
|
|
|
|
=head1 METHODS |
748
|
|
|
|
|
|
|
|
749
|
|
|
|
|
|
|
L inherits all methods from L and implements |
750
|
|
|
|
|
|
|
the following new ones. |
751
|
|
|
|
|
|
|
|
752
|
|
|
|
|
|
|
=head2 close |
753
|
|
|
|
|
|
|
|
754
|
|
|
|
|
|
|
$channel->close; |
755
|
|
|
|
|
|
|
|
756
|
|
|
|
|
|
|
Cancels all consumers and closes channel afterwards. |
757
|
|
|
|
|
|
|
|
758
|
|
|
|
|
|
|
=head2 declare_exchange |
759
|
|
|
|
|
|
|
|
760
|
|
|
|
|
|
|
my $exchange = $channel->declare_exchange( |
761
|
|
|
|
|
|
|
exchange => 'mojo', |
762
|
|
|
|
|
|
|
type => 'fanout', |
763
|
|
|
|
|
|
|
durable => 1, |
764
|
|
|
|
|
|
|
... |
765
|
|
|
|
|
|
|
)->deliver; |
766
|
|
|
|
|
|
|
|
767
|
|
|
|
|
|
|
Verify exchange exists, create if needed. |
768
|
|
|
|
|
|
|
|
769
|
|
|
|
|
|
|
This method creates an exchange if it does not already exist, and if the |
770
|
|
|
|
|
|
|
exchange exists, verifies that it is of the correct and expected class. |
771
|
|
|
|
|
|
|
|
772
|
|
|
|
|
|
|
Following arguments are accepted: |
773
|
|
|
|
|
|
|
|
774
|
|
|
|
|
|
|
=over 2 |
775
|
|
|
|
|
|
|
|
776
|
|
|
|
|
|
|
=item exchange |
777
|
|
|
|
|
|
|
|
778
|
|
|
|
|
|
|
Unique exchange name |
779
|
|
|
|
|
|
|
|
780
|
|
|
|
|
|
|
=item type |
781
|
|
|
|
|
|
|
|
782
|
|
|
|
|
|
|
Each exchange belongs to one of a set of exchange types implemented by the server. The |
783
|
|
|
|
|
|
|
exchange types define the functionality of the exchange - i.e. how messages are routed |
784
|
|
|
|
|
|
|
through it. It is not valid or meaningful to attempt to change the type of an existing |
785
|
|
|
|
|
|
|
exchange. |
786
|
|
|
|
|
|
|
|
787
|
|
|
|
|
|
|
=item passive |
788
|
|
|
|
|
|
|
|
789
|
|
|
|
|
|
|
If set, the server will reply with Declare-Ok if the exchange already exists with the same |
790
|
|
|
|
|
|
|
name, and raise an error if not. The client can use this to check whether an exchange |
791
|
|
|
|
|
|
|
exists without modifying the server state. When set, all other method fields except name |
792
|
|
|
|
|
|
|
and no-wait are ignored. A declare with both passive and no-wait has no effect. Arguments |
793
|
|
|
|
|
|
|
are compared for semantic equivalence. |
794
|
|
|
|
|
|
|
|
795
|
|
|
|
|
|
|
=item durable |
796
|
|
|
|
|
|
|
|
797
|
|
|
|
|
|
|
If set when creating a new exchange, the exchange will be marked as durable. Durable exchanges |
798
|
|
|
|
|
|
|
remain active when a server restarts. Non-durable exchanges (transient exchanges) are purged |
799
|
|
|
|
|
|
|
if/when a server restarts. |
800
|
|
|
|
|
|
|
|
801
|
|
|
|
|
|
|
=item auto_delete |
802
|
|
|
|
|
|
|
|
803
|
|
|
|
|
|
|
If set, the exchange is deleted when all queues have finished using it. |
804
|
|
|
|
|
|
|
|
805
|
|
|
|
|
|
|
=item internal |
806
|
|
|
|
|
|
|
|
807
|
|
|
|
|
|
|
If set, the exchange may not be used directly by publishers, but only when bound to other exchanges. |
808
|
|
|
|
|
|
|
Internal exchanges are used to construct wiring that is not visible to applications. |
809
|
|
|
|
|
|
|
|
810
|
|
|
|
|
|
|
=back |
811
|
|
|
|
|
|
|
|
812
|
|
|
|
|
|
|
=head2 declare_exchange_p |
813
|
|
|
|
|
|
|
|
814
|
|
|
|
|
|
|
Same as L</declare_exchange> but auto-delivers method and returns a L<Mojo::Promise> object. |
815
|
|
|
|
|
|
|
|
816
|
|
|
|
|
|
|
$channel->declare_exchange_p( |
817
|
|
|
|
|
|
|
exchange => 'mojo', |
818
|
|
|
|
|
|
|
type => 'fanout', |
819
|
|
|
|
|
|
|
durable => 1, |
820
|
|
|
|
|
|
|
... |
821
|
|
|
|
|
|
|
)->then(sub { |
822
|
|
|
|
|
|
|
say "Exchange declared..."; |
823
|
|
|
|
|
|
|
})->catch(sub { |
824
|
|
|
|
|
|
|
my $err = shift; |
825
|
|
|
|
|
|
|
warn "Exchange declaration error: $err"; |
826
|
|
|
|
|
|
|
})->wait; |
827
|
|
|
|
|
|
|
|
828
|
|
|
|
|
|
|
=head2 delete_exchange |
829
|
|
|
|
|
|
|
|
830
|
|
|
|
|
|
|
$channel->delete_exchange(exchange => 'mojo')->deliver; |
831
|
|
|
|
|
|
|
|
832
|
|
|
|
|
|
|
Delete an exchange. |
833
|
|
|
|
|
|
|
|
834
|
|
|
|
|
|
|
This method deletes an exchange. When an exchange is deleted all queue bindings on the exchange |
835
|
|
|
|
|
|
|
are cancelled. |
836
|
|
|
|
|
|
|
|
837
|
|
|
|
|
|
|
Following arguments are accepted: |
838
|
|
|
|
|
|
|
|
839
|
|
|
|
|
|
|
=over 2 |
840
|
|
|
|
|
|
|
|
841
|
|
|
|
|
|
|
=item exchange |
842
|
|
|
|
|
|
|
|
843
|
|
|
|
|
|
|
Exchange name. |
844
|
|
|
|
|
|
|
|
845
|
|
|
|
|
|
|
=item if_unused |
846
|
|
|
|
|
|
|
|
847
|
|
|
|
|
|
|
If set, the server will only delete the exchange if it has no queue bindings. If the exchange has |
848
|
|
|
|
|
|
|
queue bindings the server does not delete it but raises a channel exception instead. |
849
|
|
|
|
|
|
|
|
850
|
|
|
|
|
|
|
=back |
851
|
|
|
|
|
|
|
|
852
|
|
|
|
|
|
|
=head2 delete_exchange_p |
853
|
|
|
|
|
|
|
|
854
|
|
|
|
|
|
|
Same as L</delete_exchange> but auto-delivers method and returns a L<Mojo::Promise> object. |
855
|
|
|
|
|
|
|
|
856
|
|
|
|
|
|
|
$channel->delete_exchange_p( |
857
|
|
|
|
|
|
|
exchange => 'mojo' |
858
|
|
|
|
|
|
|
)->then(sub { |
859
|
|
|
|
|
|
|
say "Exchange deleted..."; |
860
|
|
|
|
|
|
|
})->catch(sub { |
861
|
|
|
|
|
|
|
my $err = shift; |
862
|
|
|
|
|
|
|
warn "Exchange removal error: $err"; |
863
|
|
|
|
|
|
|
})->wait; |
864
|
|
|
|
|
|
|
|
865
|
|
|
|
|
|
|
=head2 declare_queue |
866
|
|
|
|
|
|
|
|
867
|
|
|
|
|
|
|
my $queue = $channel->declare_queue(queue => 'mq', durable => 1)->deliver; |
868
|
|
|
|
|
|
|
|
869
|
|
|
|
|
|
|
Declare queue, create if needed. |
870
|
|
|
|
|
|
|
|
871
|
|
|
|
|
|
|
This method creates or checks a queue. When creating a new queue the client can |
872
|
|
|
|
|
|
|
specify various properties that control the durability of the queue and its contents, |
873
|
|
|
|
|
|
|
and the level of sharing for the queue. |
874
|
|
|
|
|
|
|
|
875
|
|
|
|
|
|
|
Following arguments are accepted: |
876
|
|
|
|
|
|
|
|
877
|
|
|
|
|
|
|
=over 2 |
878
|
|
|
|
|
|
|
|
879
|
|
|
|
|
|
|
=item queue |
880
|
|
|
|
|
|
|
|
881
|
|
|
|
|
|
|
The queue name MAY be empty, in which case the server MUST create a new queue with |
882
|
|
|
|
|
|
|
a unique generated name and return this to the client in the Declare-Ok method. |
883
|
|
|
|
|
|
|
|
884
|
|
|
|
|
|
|
=item passive |
885
|
|
|
|
|
|
|
|
886
|
|
|
|
|
|
|
If set, the server will reply with Declare-Ok if the queue already exists with the same |
887
|
|
|
|
|
|
|
name, and raise an error if not. The client can use this to check whether a queue exists |
888
|
|
|
|
|
|
|
without modifying the server state. When set, all other method fields except name and |
889
|
|
|
|
|
|
|
no-wait are ignored. A declare with both passive and no-wait has no effect. |
890
|
|
|
|
|
|
|
Arguments are compared for semantic equivalence. |
891
|
|
|
|
|
|
|
|
892
|
|
|
|
|
|
|
=item durable |
893
|
|
|
|
|
|
|
|
894
|
|
|
|
|
|
|
If set when creating a new queue, the queue will be marked as durable. Durable queues |
895
|
|
|
|
|
|
|
remain active when a server restarts. Non-durable queues (transient queues) are purged |
896
|
|
|
|
|
|
|
if/when a server restarts. Note that durable queues do not necessarily hold persistent |
897
|
|
|
|
|
|
|
messages, although it does not make sense to send persistent messages to a transient queue. |
898
|
|
|
|
|
|
|
|
899
|
|
|
|
|
|
|
=item exclusive |
900
|
|
|
|
|
|
|
|
901
|
|
|
|
|
|
|
Exclusive queues may only be accessed by the current connection, and are deleted when |
902
|
|
|
|
|
|
|
that connection closes. Passive declaration of an exclusive queue by other connections is |
903
|
|
|
|
|
|
|
not allowed. |
904
|
|
|
|
|
|
|
|
905
|
|
|
|
|
|
|
=item auto_delete |
906
|
|
|
|
|
|
|
|
907
|
|
|
|
|
|
|
If set, the queue is deleted when all consumers have finished using it. The last consumer |
908
|
|
|
|
|
|
|
can be cancelled either explicitly or because its channel is closed. If there was no consumer |
909
|
|
|
|
|
|
|
ever on the queue, it won't be deleted. Applications can explicitly delete auto-delete queues |
910
|
|
|
|
|
|
|
using the Delete method as normal. |
911
|
|
|
|
|
|
|
|
912
|
|
|
|
|
|
|
=back |
913
|
|
|
|
|
|
|
|
914
|
|
|
|
|
|
|
=head2 declare_queue_p |
915
|
|
|
|
|
|
|
|
916
|
|
|
|
|
|
|
Same as L</declare_queue> but auto-delivers method and returns a L<Mojo::Promise> object. |
917
|
|
|
|
|
|
|
|
918
|
|
|
|
|
|
|
$channel->declare_queue_p( |
919
|
|
|
|
|
|
|
queue => 'mq', |
920
|
|
|
|
|
|
|
durable => 1 |
921
|
|
|
|
|
|
|
)->then(sub { |
922
|
|
|
|
|
|
|
say "Queue declared..."; |
923
|
|
|
|
|
|
|
})->catch(sub { |
924
|
|
|
|
|
|
|
my $err = shift; |
925
|
|
|
|
|
|
|
warn "Queue declaration error: $err"; |
926
|
|
|
|
|
|
|
})->wait; |
927
|
|
|
|
|
|
|
|
928
|
|
|
|
|
|
|
=head2 bind_queue |
929
|
|
|
|
|
|
|
|
930
|
|
|
|
|
|
|
$channel->bind_queue( |
931
|
|
|
|
|
|
|
exchange => 'mojo', |
932
|
|
|
|
|
|
|
queue => 'mq', |
933
|
|
|
|
|
|
|
routing_key => '' |
934
|
|
|
|
|
|
|
)->deliver; |
935
|
|
|
|
|
|
|
|
936
|
|
|
|
|
|
|
Bind queue to an exchange. |
937
|
|
|
|
|
|
|
|
938
|
|
|
|
|
|
|
This method binds a queue to an exchange. Until a queue is bound it will |
939
|
|
|
|
|
|
|
not receive any messages. In a classic messaging model, store-and-forward |
940
|
|
|
|
|
|
|
queues are bound to a direct exchange and subscription queues are bound |
941
|
|
|
|
|
|
|
to a topic exchange. |
942
|
|
|
|
|
|
|
|
943
|
|
|
|
|
|
|
Following arguments are accepted: |
944
|
|
|
|
|
|
|
|
945
|
|
|
|
|
|
|
=over 2 |
946
|
|
|
|
|
|
|
|
947
|
|
|
|
|
|
|
=item queue |
948
|
|
|
|
|
|
|
|
949
|
|
|
|
|
|
|
Specifies the name of the queue to bind. |
950
|
|
|
|
|
|
|
|
951
|
|
|
|
|
|
|
=item exchange |
952
|
|
|
|
|
|
|
|
953
|
|
|
|
|
|
|
Name of the exchange to bind to. |
954
|
|
|
|
|
|
|
|
955
|
|
|
|
|
|
|
=item routing_key |
956
|
|
|
|
|
|
|
|
957
|
|
|
|
|
|
|
Specifies the routing key for the binding. The routing key is used for |
958
|
|
|
|
|
|
|
routing messages depending on the exchange configuration. Not all exchanges |
959
|
|
|
|
|
|
|
use a routing key - refer to the specific exchange documentation. If the |
960
|
|
|
|
|
|
|
queue name is empty, the server uses the last queue declared on the channel. |
961
|
|
|
|
|
|
|
If the routing key is also empty, the server uses this queue name for the |
962
|
|
|
|
|
|
|
routing key as well. If the queue name is provided but the routing key is |
963
|
|
|
|
|
|
|
empty, the server does the binding with that empty routing key. The meaning |
964
|
|
|
|
|
|
|
of empty routing keys depends on the exchange implementation. |
965
|
|
|
|
|
|
|
|
966
|
|
|
|
|
|
|
=back |
967
|
|
|
|
|
|
|
|
968
|
|
|
|
|
|
|
=head2 bind_queue_p |
969
|
|
|
|
|
|
|
|
970
|
|
|
|
|
|
|
Same as L</bind_queue> but auto-delivers method and returns a L<Mojo::Promise> object. |
971
|
|
|
|
|
|
|
|
972
|
|
|
|
|
|
|
$channel->bind_queue_p( |
973
|
|
|
|
|
|
|
exchange => 'mojo', |
974
|
|
|
|
|
|
|
queue => 'mq', |
975
|
|
|
|
|
|
|
routing_key => '' |
976
|
|
|
|
|
|
|
)->then(sub { |
977
|
|
|
|
|
|
|
say "Queue bound..."; |
978
|
|
|
|
|
|
|
})->catch(sub { |
979
|
|
|
|
|
|
|
my $err = shift; |
980
|
|
|
|
|
|
|
warn "Queue binding error: $err"; |
981
|
|
|
|
|
|
|
})->wait; |
982
|
|
|
|
|
|
|
|
983
|
|
|
|
|
|
|
=head2 unbind_queue |
984
|
|
|
|
|
|
|
|
985
|
|
|
|
|
|
|
$channel->unbind_queue( |
986
|
|
|
|
|
|
|
exchange => 'mojo', |
987
|
|
|
|
|
|
|
queue => 'mq', |
988
|
|
|
|
|
|
|
routing_key => '' |
989
|
|
|
|
|
|
|
)->deliver; |
990
|
|
|
|
|
|
|
|
991
|
|
|
|
|
|
|
Unbind a queue from an exchange. |
992
|
|
|
|
|
|
|
|
993
|
|
|
|
|
|
|
This method unbinds a queue from an exchange. |
994
|
|
|
|
|
|
|
|
995
|
|
|
|
|
|
|
Following arguments are accepted: |
996
|
|
|
|
|
|
|
|
997
|
|
|
|
|
|
|
=over 2 |
998
|
|
|
|
|
|
|
|
999
|
|
|
|
|
|
|
=item queue |
1000
|
|
|
|
|
|
|
|
1001
|
|
|
|
|
|
|
Specifies the name of the queue to unbind. |
1002
|
|
|
|
|
|
|
|
1003
|
|
|
|
|
|
|
=item exchange |
1004
|
|
|
|
|
|
|
|
1005
|
|
|
|
|
|
|
The name of the exchange to unbind from. |
1006
|
|
|
|
|
|
|
|
1007
|
|
|
|
|
|
|
=item routing_key |
1008
|
|
|
|
|
|
|
|
1009
|
|
|
|
|
|
|
Specifies the routing key of the binding to unbind. |
1010
|
|
|
|
|
|
|
|
1011
|
|
|
|
|
|
|
=back |
1012
|
|
|
|
|
|
|
|
1013
|
|
|
|
|
|
|
=head2 unbind_queue_p |
1014
|
|
|
|
|
|
|
|
1015
|
|
|
|
|
|
|
Same as L</unbind_queue> but auto-delivers method and returns a L<Mojo::Promise> object. |
1016
|
|
|
|
|
|
|
|
1017
|
|
|
|
|
|
|
$channel->unbind_queue_p( |
1018
|
|
|
|
|
|
|
exchange => 'mojo', |
1019
|
|
|
|
|
|
|
queue => 'mq', |
1020
|
|
|
|
|
|
|
routing_key => '' |
1021
|
|
|
|
|
|
|
)->then(sub { |
1022
|
|
|
|
|
|
|
say "Queue unbound..."; |
1023
|
|
|
|
|
|
|
})->catch(sub { |
1024
|
|
|
|
|
|
|
my $err = shift; |
1025
|
|
|
|
|
|
|
warn "Queue unbinding error: $err"; |
1026
|
|
|
|
|
|
|
})->wait; |
1027
|
|
|
|
|
|
|
|
1028
|
|
|
|
|
|
|
=head2 purge_queue |
1029
|
|
|
|
|
|
|
|
1030
|
|
|
|
|
|
|
$channel->purge_queue(queue => 'mq')->deliver; |
1031
|
|
|
|
|
|
|
|
1032
|
|
|
|
|
|
|
Purge a queue. |
1033
|
|
|
|
|
|
|
|
1034
|
|
|
|
|
|
|
This method removes all messages from a queue which are not awaiting acknowledgment. |
1035
|
|
|
|
|
|
|
|
1036
|
|
|
|
|
|
|
Following arguments are accepted: |
1037
|
|
|
|
|
|
|
|
1038
|
|
|
|
|
|
|
=over 2 |
1039
|
|
|
|
|
|
|
|
1040
|
|
|
|
|
|
|
=item queue |
1041
|
|
|
|
|
|
|
|
1042
|
|
|
|
|
|
|
Specifies the name of the queue to purge. |
1043
|
|
|
|
|
|
|
|
1044
|
|
|
|
|
|
|
=back |
1045
|
|
|
|
|
|
|
|
1046
|
|
|
|
|
|
|
=head2 purge_queue_p |
1047
|
|
|
|
|
|
|
|
1048
|
|
|
|
|
|
|
Same as L</purge_queue> but auto-delivers method and returns a L<Mojo::Promise> object. |
1049
|
|
|
|
|
|
|
|
1050
|
|
|
|
|
|
|
$channel->purge_queue_p( |
1051
|
|
|
|
|
|
|
queue => 'mq', |
1052
|
|
|
|
|
|
|
)->then(sub { |
1053
|
|
|
|
|
|
|
say "Queue purged..."; |
1054
|
|
|
|
|
|
|
})->catch(sub { |
1055
|
|
|
|
|
|
|
my $err = shift; |
1056
|
|
|
|
|
|
|
warn "Queue purging error: $err"; |
1057
|
|
|
|
|
|
|
})->wait; |
1058
|
|
|
|
|
|
|
|
1059
|
|
|
|
|
|
|
=head2 delete_queue |
1060
|
|
|
|
|
|
|
|
1061
|
|
|
|
|
|
|
$channel->delete_queue(queue => 'mq', if_empty => 1)->deliver; |
1062
|
|
|
|
|
|
|
|
1063
|
|
|
|
|
|
|
Delete a queue. |
1064
|
|
|
|
|
|
|
|
1065
|
|
|
|
|
|
|
This method deletes a queue. When a queue is deleted any pending messages |
1066
|
|
|
|
|
|
|
are sent to a dead-letter queue if this is defined in the server configuration, |
1067
|
|
|
|
|
|
|
and all consumers on the queue are cancelled. |
1068
|
|
|
|
|
|
|
|
1069
|
|
|
|
|
|
|
Following arguments are accepted: |
1070
|
|
|
|
|
|
|
|
1071
|
|
|
|
|
|
|
=over 2 |
1072
|
|
|
|
|
|
|
|
1073
|
|
|
|
|
|
|
=item queue |
1074
|
|
|
|
|
|
|
|
1075
|
|
|
|
|
|
|
Specifies the name of the queue to delete. |
1076
|
|
|
|
|
|
|
|
1077
|
|
|
|
|
|
|
=item if_unused |
1078
|
|
|
|
|
|
|
|
1079
|
|
|
|
|
|
|
If set, the server will only delete the queue if it has no consumers. If the queue |
1080
|
|
|
|
|
|
|
has consumers the server does not delete it but raises a channel exception instead. |
1081
|
|
|
|
|
|
|
|
1082
|
|
|
|
|
|
|
=item if_empty |
1083
|
|
|
|
|
|
|
|
1084
|
|
|
|
|
|
|
If set, the server will only delete the queue if it has no messages. |
1085
|
|
|
|
|
|
|
|
1086
|
|
|
|
|
|
|
=back |
1087
|
|
|
|
|
|
|
|
1088
|
|
|
|
|
|
|
=head2 delete_queue_p |
1089
|
|
|
|
|
|
|
|
1090
|
|
|
|
|
|
|
Same as L</delete_queue> but auto-delivers method and returns a L<Mojo::Promise> object. |
1091
|
|
|
|
|
|
|
|
1092
|
|
|
|
|
|
|
$channel->delete_queue_p( |
1093
|
|
|
|
|
|
|
queue => 'mq', |
1094
|
|
|
|
|
|
|
if_empty => 1 |
1095
|
|
|
|
|
|
|
)->then(sub { |
1096
|
|
|
|
|
|
|
say "Queue removed..."; |
1097
|
|
|
|
|
|
|
})->catch(sub { |
1098
|
|
|
|
|
|
|
my $err = shift; |
1099
|
|
|
|
|
|
|
warn "Queue removal error: $err"; |
1100
|
|
|
|
|
|
|
})->wait; |
1101
|
|
|
|
|
|
|
|
1102
|
|
|
|
|
|
|
=head2 publish |
1103
|
|
|
|
|
|
|
|
1104
|
|
|
|
|
|
|
my $message = $channel->publish( |
1105
|
|
|
|
|
|
|
exchange => 'mojo', |
1106
|
|
|
|
|
|
|
routing_key => 'mq', |
1107
|
|
|
|
|
|
|
body => 'simple text body', |
1108
|
|
|
|
|
|
|
); |
1109
|
|
|
|
|
|
|
$message->deliver(); |
1110
|
|
|
|
|
|
|
|
1111
|
|
|
|
|
|
|
Publish a message. |
1112
|
|
|
|
|
|
|
|
1113
|
|
|
|
|
|
|
This method publishes a message to a specific exchange. The message will be |
1114
|
|
|
|
|
|
|
routed to queues as defined by the exchange configuration and distributed to |
1115
|
|
|
|
|
|
|
any active consumers when the transaction, if any, is committed. |
1116
|
|
|
|
|
|
|
|
1117
|
|
|
|
|
|
|
Following arguments are accepted: |
1118
|
|
|
|
|
|
|
|
1119
|
|
|
|
|
|
|
=over 2 |
1120
|
|
|
|
|
|
|
|
1121
|
|
|
|
|
|
|
=item exchange |
1122
|
|
|
|
|
|
|
|
1123
|
|
|
|
|
|
|
Specifies the name of the exchange to publish to. The exchange name can be empty, |
1124
|
|
|
|
|
|
|
meaning the default exchange. If the exchange name is specified, and that exchange |
1125
|
|
|
|
|
|
|
does not exist, the server will raise a channel exception. |
1126
|
|
|
|
|
|
|
|
1127
|
|
|
|
|
|
|
=item routing_key |
1128
|
|
|
|
|
|
|
|
1129
|
|
|
|
|
|
|
Specifies the routing key for the message. The routing key is used for routing |
1130
|
|
|
|
|
|
|
messages depending on the exchange configuration. |
1131
|
|
|
|
|
|
|
|
1132
|
|
|
|
|
|
|
=item mandatory |
1133
|
|
|
|
|
|
|
|
1134
|
|
|
|
|
|
|
This flag tells the server how to react if the message cannot be routed to a queue. |
1135
|
|
|
|
|
|
|
If this flag is set, the server will return an unroutable message with a Return method. |
1136
|
|
|
|
|
|
|
If this flag is zero, the server silently drops the message. |
1137
|
|
|
|
|
|
|
|
1138
|
|
|
|
|
|
|
All rejections are emitted as C<reject> event. |
1139
|
|
|
|
|
|
|
|
1140
|
|
|
|
|
|
|
$message->on(reject => sub { |
1141
|
|
|
|
|
|
|
my $message = shift; |
1142
|
|
|
|
|
|
|
my $frame = shift; |
1143
|
|
|
|
|
|
|
my $method_frame = $frame->method_frame; |
1144
|
|
|
|
|
|
|
|
1145
|
|
|
|
|
|
|
my $reply_code = $method_frame->reply_code; |
1146
|
|
|
|
|
|
|
my $reply_text = $method_frame->reply_text; |
1147
|
|
|
|
|
|
|
}); |
1148
|
|
|
|
|
|
|
|
1149
|
|
|
|
|
|
|
=item immediate |
1150
|
|
|
|
|
|
|
|
1151
|
|
|
|
|
|
|
This flag tells the server how to react if the message cannot be routed to a queue consumer |
1152
|
|
|
|
|
|
|
immediately. If this flag is set, the server will return an undeliverable message with a |
1153
|
|
|
|
|
|
|
Return method. If this flag is zero, the server will queue the message, but with no guarantee |
1154
|
|
|
|
|
|
|
that it will ever be consumed. |
1155
|
|
|
|
|
|
|
|
1156
|
|
|
|
|
|
|
As said above, all rejections are emitted as C<reject> event. |
1157
|
|
|
|
|
|
|
|
1158
|
|
|
|
|
|
|
$message->on(reject => sub { ... }); |
1159
|
|
|
|
|
|
|
|
1160
|
|
|
|
|
|
|
=back |
1161
|
|
|
|
|
|
|
|
1162
|
|
|
|
|
|
|
=head2 consume |
1163
|
|
|
|
|
|
|
|
1164
|
|
|
|
|
|
|
my $consumer = $channel->consume(queue => 'mq'); |
1165
|
|
|
|
|
|
|
$consumer->on(message => sub { ... }); |
1166
|
|
|
|
|
|
|
$consumer->deliver; |
1167
|
|
|
|
|
|
|
|
1168
|
|
|
|
|
|
|
This method asks the server to start a "consumer", which is a transient request for messages from a |
1169
|
|
|
|
|
|
|
specific queue. Consumers last as long as the channel they were declared on, or until the client cancels |
1170
|
|
|
|
|
|
|
them. |
1171
|
|
|
|
|
|
|
|
1172
|
|
|
|
|
|
|
Following arguments are accepted: |
1173
|
|
|
|
|
|
|
|
1174
|
|
|
|
|
|
|
=over 2 |
1175
|
|
|
|
|
|
|
|
1176
|
|
|
|
|
|
|
=item queue |
1177
|
|
|
|
|
|
|
|
1178
|
|
|
|
|
|
|
Specifies the name of the queue to consume from. |
1179
|
|
|
|
|
|
|
|
1180
|
|
|
|
|
|
|
=item consumer_tag |
1181
|
|
|
|
|
|
|
|
1182
|
|
|
|
|
|
|
Specifies the identifier for the consumer. The consumer tag is local to a channel, so two clients can use the |
1183
|
|
|
|
|
|
|
same consumer tags. If this field is empty the server will generate a unique tag. |
1184
|
|
|
|
|
|
|
|
1185
|
|
|
|
|
|
|
$consumer->on(success => sub { |
1186
|
|
|
|
|
|
|
my $consumer = shift; |
1187
|
|
|
|
|
|
|
my $frame = shift; |
1188
|
|
|
|
|
|
|
|
1189
|
|
|
|
|
|
|
my $consumer_tag = $frame->method_frame->consumer_tag; |
1190
|
|
|
|
|
|
|
}); |
1191
|
|
|
|
|
|
|
|
1192
|
|
|
|
|
|
|
=item no_local (not implemented in RabbitMQ!) |
1193
|
|
|
|
|
|
|
|
1194
|
|
|
|
|
|
|
If the no-local field is set the server will not send messages to the connection that published them. |
1195
|
|
|
|
|
|
|
|
1196
|
|
|
|
|
|
|
See L<https://www.rabbitmq.com/specification.html> |
1197
|
|
|
|
|
|
|
|
1198
|
|
|
|
|
|
|
=item no_ack |
1199
|
|
|
|
|
|
|
|
1200
|
|
|
|
|
|
|
If this field is set the server does not expect acknowledgements for messages. That is, when a message |
1201
|
|
|
|
|
|
|
is delivered to the client the server assumes the delivery will succeed and immediately dequeues it. |
1202
|
|
|
|
|
|
|
This functionality may increase performance but at the cost of reliability. Messages can get lost if |
1203
|
|
|
|
|
|
|
a client dies before they are delivered to the application. |
1204
|
|
|
|
|
|
|
|
1205
|
|
|
|
|
|
|
=item exclusive |
1206
|
|
|
|
|
|
|
|
1207
|
|
|
|
|
|
|
Request exclusive consumer access, meaning only this consumer can access the queue. |
1208
|
|
|
|
|
|
|
|
1209
|
|
|
|
|
|
|
=back |
1210
|
|
|
|
|
|
|
|
1211
|
|
|
|
|
|
|
=head2 cancel |
1212
|
|
|
|
|
|
|
|
1213
|
|
|
|
|
|
|
$channel->cancel(consumer_tag => 'amq.ctag....')->deliver; |
1214
|
|
|
|
|
|
|
|
1215
|
|
|
|
|
|
|
End a queue consumer. |
1216
|
|
|
|
|
|
|
|
1217
|
|
|
|
|
|
|
This method cancels a consumer. This does not affect already delivered messages, but |
1218
|
|
|
|
|
|
|
it does mean the server will not send any more messages for that consumer. The client |
1219
|
|
|
|
|
|
|
may receive an arbitrary number of messages in between sending the cancel method and |
1220
|
|
|
|
|
|
|
receiving the cancel-ok reply. |
1221
|
|
|
|
|
|
|
|
1222
|
|
|
|
|
|
|
Following arguments are accepted: |
1223
|
|
|
|
|
|
|
|
1224
|
|
|
|
|
|
|
=over 2 |
1225
|
|
|
|
|
|
|
|
1226
|
|
|
|
|
|
|
=item consumer_tag |
1227
|
|
|
|
|
|
|
|
1228
|
|
|
|
|
|
|
Holds the consumer tag specified by the client or provided by the server. |
1229
|
|
|
|
|
|
|
|
1230
|
|
|
|
|
|
|
=back |
1231
|
|
|
|
|
|
|
|
1232
|
|
|
|
|
|
|
=head2 get |
1233
|
|
|
|
|
|
|
|
1234
|
|
|
|
|
|
|
my $get = $channel->get(queue => 'mq'); |
1235
|
|
|
|
|
|
|
$get->deliver; |
1236
|
|
|
|
|
|
|
|
1237
|
|
|
|
|
|
|
Direct access to a queue. |
1238
|
|
|
|
|
|
|
|
1239
|
|
|
|
|
|
|
This method provides a direct access to the messages in a queue using |
1240
|
|
|
|
|
|
|
a synchronous dialogue that is designed for specific types of application |
1241
|
|
|
|
|
|
|
where synchronous functionality is more important than performance. |
1242
|
|
|
|
|
|
|
|
1243
|
|
|
|
|
|
|
This is a simple event emitter to which you have to subscribe. It can emit: |
1244
|
|
|
|
|
|
|
|
1245
|
|
|
|
|
|
|
=over 2 |
1246
|
|
|
|
|
|
|
|
1247
|
|
|
|
|
|
|
=item message |
1248
|
|
|
|
|
|
|
|
1249
|
|
|
|
|
|
|
Provide client with a message. |
1250
|
|
|
|
|
|
|
|
1251
|
|
|
|
|
|
|
This method delivers a message to the client following a get method. A message |
1252
|
|
|
|
|
|
|
delivered by 'get-ok' must be acknowledged unless the no-ack option was set |
1253
|
|
|
|
|
|
|
in the get method. |
1254
|
|
|
|
|
|
|
|
1255
|
|
|
|
|
|
|
You can access all get-ok reply parameters as below: |
1256
|
|
|
|
|
|
|
|
1257
|
|
|
|
|
|
|
$get->on(message => sub { |
1258
|
|
|
|
|
|
|
my $get = shift; |
1259
|
|
|
|
|
|
|
my $get_ok = shift; |
1260
|
|
|
|
|
|
|
my $message = shift; |
1261
|
|
|
|
|
|
|
|
1262
|
|
|
|
|
|
|
say "Still got: " . $get_ok->method_frame->message_count; |
1263
|
|
|
|
|
|
|
}); |
1264
|
|
|
|
|
|
|
|
1265
|
|
|
|
|
|
|
=item empty |
1266
|
|
|
|
|
|
|
|
1267
|
|
|
|
|
|
|
Indicate no messages available. |
1268
|
|
|
|
|
|
|
|
1269
|
|
|
|
|
|
|
This method tells the client that the queue has no messages available for the |
1270
|
|
|
|
|
|
|
client. |
1271
|
|
|
|
|
|
|
|
1272
|
|
|
|
|
|
|
=back |
1273
|
|
|
|
|
|
|
|
1274
|
|
|
|
|
|
|
Following arguments are accepted: |
1275
|
|
|
|
|
|
|
|
1276
|
|
|
|
|
|
|
=over 2 |
1277
|
|
|
|
|
|
|
|
1278
|
|
|
|
|
|
|
=item queue |
1279
|
|
|
|
|
|
|
|
1280
|
|
|
|
|
|
|
Specifies the name of the queue to get a message from. |
1281
|
|
|
|
|
|
|
|
1282
|
|
|
|
|
|
|
=item no_ack |
1283
|
|
|
|
|
|
|
|
1284
|
|
|
|
|
|
|
If this field is set the server does not expect acknowledgements for messages. That is, when a message |
1285
|
|
|
|
|
|
|
is delivered to the client the server assumes the delivery will succeed and immediately dequeues it. |
1286
|
|
|
|
|
|
|
This functionality may increase performance but at the cost of reliability. Messages can get lost if |
1287
|
|
|
|
|
|
|
a client dies before they are delivered to the application. |
1288
|
|
|
|
|
|
|
|
1289
|
|
|
|
|
|
|
=back |
1290
|
|
|
|
|
|
|
|
1291
|
|
|
|
|
|
|
=head2 ack |
1292
|
|
|
|
|
|
|
|
1293
|
|
|
|
|
|
|
$channel->ack(delivery_tag => 1); |
1294
|
|
|
|
|
|
|
|
1295
|
|
|
|
|
|
|
Acknowledge one or more messages. |
1296
|
|
|
|
|
|
|
|
1297
|
|
|
|
|
|
|
When sent by the client, this method acknowledges one or more messages |
1298
|
|
|
|
|
|
|
delivered via the Deliver or Get-Ok methods. When sent by server, this |
1299
|
|
|
|
|
|
|
method acknowledges one or more messages published with the Publish |
1300
|
|
|
|
|
|
|
method on a channel in confirm mode. The acknowledgement can be for |
1301
|
|
|
|
|
|
|
a single message or a set of messages up to and including a specific |
1302
|
|
|
|
|
|
|
message. |
1303
|
|
|
|
|
|
|
|
1304
|
|
|
|
|
|
|
Following arguments are accepted: |
1305
|
|
|
|
|
|
|
|
1306
|
|
|
|
|
|
|
=over 2 |
1307
|
|
|
|
|
|
|
|
1308
|
|
|
|
|
|
|
=item delivery_tag |
1309
|
|
|
|
|
|
|
|
1310
|
|
|
|
|
|
|
Server assigned delivery tag that was received with a message. |
1311
|
|
|
|
|
|
|
|
1312
|
|
|
|
|
|
|
=item multiple |
1313
|
|
|
|
|
|
|
|
1314
|
|
|
|
|
|
|
If set to 1, the delivery tag is treated as "up to and including", so |
1315
|
|
|
|
|
|
|
that multiple messages can be acknowledged with a single method. If set |
1316
|
|
|
|
|
|
|
to zero, the delivery tag refers to a single message. If the multiple |
1317
|
|
|
|
|
|
|
field is 1, and the delivery tag is zero, this indicates acknowledgement |
1318
|
|
|
|
|
|
|
of all outstanding messages. |
1319
|
|
|
|
|
|
|
|
1320
|
|
|
|
|
|
|
=back |
1321
|
|
|
|
|
|
|
|
1322
|
|
|
|
|
|
|
=head2 qos |
1323
|
|
|
|
|
|
|
|
1324
|
|
|
|
|
|
|
$channel->qos(prefetch_count => 1)->deliver; |
1325
|
|
|
|
|
|
|
|
1326
|
|
|
|
|
|
|
Sets specified Quality of Service to channel, or entire connection. Accepts following arguments: |
1327
|
|
|
|
|
|
|
|
1328
|
|
|
|
|
|
|
=over 2 |
1329
|
|
|
|
|
|
|
|
1330
|
|
|
|
|
|
|
=item prefetch_size |
1331
|
|
|
|
|
|
|
|
1332
|
|
|
|
|
|
|
Prefetch window size in octets. |
1333
|
|
|
|
|
|
|
|
1334
|
|
|
|
|
|
|
=item prefetch_count |
1335
|
|
|
|
|
|
|
|
1336
|
|
|
|
|
|
|
Prefetch window in complete messages. |
1337
|
|
|
|
|
|
|
|
1338
|
|
|
|
|
|
|
=item global |
1339
|
|
|
|
|
|
|
|
1340
|
|
|
|
|
|
|
If set, all settings will be applied connection-wide. |
1341
|
|
|
|
|
|
|
|
1342
|
|
|
|
|
|
|
=back |
1343
|
|
|
|
|
|
|
|
1344
|
|
|
|
|
|
|
=head2 recover |
1345
|
|
|
|
|
|
|
|
1346
|
|
|
|
|
|
|
$channel->recover(requeue => 0)->deliver; |
1347
|
|
|
|
|
|
|
|
1348
|
|
|
|
|
|
|
Redeliver unacknowledged messages. |
1349
|
|
|
|
|
|
|
|
1350
|
|
|
|
|
|
|
This method asks the server to redeliver all unacknowledged messages |
1351
|
|
|
|
|
|
|
on a specified channel. Zero or more messages may be redelivered. |
1352
|
|
|
|
|
|
|
|
1353
|
|
|
|
|
|
|
=over 2 |
1354
|
|
|
|
|
|
|
|
1355
|
|
|
|
|
|
|
=item requeue |
1356
|
|
|
|
|
|
|
|
1357
|
|
|
|
|
|
|
If this field is zero, the message will be redelivered to the original |
1358
|
|
|
|
|
|
|
recipient. If this bit is 1, the server will attempt to requeue the |
1359
|
|
|
|
|
|
|
message, potentially then delivering it to an alternative subscriber. |
1360
|
|
|
|
|
|
|
|
1361
|
|
|
|
|
|
|
=back |
1362
|
|
|
|
|
|
|
|
1363
|
|
|
|
|
|
|
=head2 reject |
1364
|
|
|
|
|
|
|
|
1365
|
|
|
|
|
|
|
$channel->reject(delivery_tag => 1, requeue => 0)->deliver; |
1366
|
|
|
|
|
|
|
|
1367
|
|
|
|
|
|
|
Reject an incoming message. |
1368
|
|
|
|
|
|
|
|
1369
|
|
|
|
|
|
|
This method allows a client to reject a message. It can be |
1370
|
|
|
|
|
|
|
used to interrupt and cancel large incoming messages, or |
1371
|
|
|
|
|
|
|
return untreatable messages to their original queue. |
1372
|
|
|
|
|
|
|
|
1373
|
|
|
|
|
|
|
Following arguments are accepted: |
1374
|
|
|
|
|
|
|
|
1375
|
|
|
|
|
|
|
=over 2 |
1376
|
|
|
|
|
|
|
|
1377
|
|
|
|
|
|
|
=item delivery_tag |
1378
|
|
|
|
|
|
|
|
1379
|
|
|
|
|
|
|
Server assigned delivery tag that was received with a message. |
1380
|
|
|
|
|
|
|
|
1381
|
|
|
|
|
|
|
=item requeue |
1382
|
|
|
|
|
|
|
|
1383
|
|
|
|
|
|
|
If requeue is true, the server will attempt to requeue the message. |
1384
|
|
|
|
|
|
|
If requeue is false or the requeue attempt fails the messages are |
1385
|
|
|
|
|
|
|
discarded or dead-lettered. |
1386
|
|
|
|
|
|
|
|
1387
|
|
|
|
|
|
|
=back |
1388
|
|
|
|
|
|
|
|
1389
|
|
|
|
|
|
|
=head2 select_tx |
1390
|
|
|
|
|
|
|
|
1391
|
|
|
|
|
|
|
Work with transactions. |
1392
|
|
|
|
|
|
|
|
1393
|
|
|
|
|
|
|
The Tx class allows publish and ack operations to be batched into atomic units of work. |
1394
|
|
|
|
|
|
|
The intention is that all publish and ack requests issued within a transaction will |
1395
|
|
|
|
|
|
|
complete successfully or none of them will. Servers SHOULD implement atomic transactions |
1396
|
|
|
|
|
|
|
at least where all publish or ack requests affect a single queue. Transactions that cover |
1397
|
|
|
|
|
|
|
multiple queues may be non-atomic, given that queues can be created and destroyed |
1398
|
|
|
|
|
|
|
asynchronously, and such events do not form part of any transaction. |
1399
|
|
|
|
|
|
|
Further, the behaviour of transactions with respect to the immediate and mandatory flags |
1400
|
|
|
|
|
|
|
on Basic.Publish methods is not defined. |
1401
|
|
|
|
|
|
|
|
1402
|
|
|
|
|
|
|
$channel->select_tx()->deliver; |
1403
|
|
|
|
|
|
|
|
1404
|
|
|
|
|
|
|
Select standard transaction mode. |
1405
|
|
|
|
|
|
|
|
1406
|
|
|
|
|
|
|
This method sets the channel to use standard transactions. The client must use this method |
1407
|
|
|
|
|
|
|
at least once on a channel before using the Commit or Rollback methods. |
1408
|
|
|
|
|
|
|
|
1409
|
|
|
|
|
|
|
=head2 commit_tx |
1410
|
|
|
|
|
|
|
|
1411
|
|
|
|
|
|
|
$channel->commit_tx()->deliver; |
1412
|
|
|
|
|
|
|
|
1413
|
|
|
|
|
|
|
Commit the current transaction. |
1414
|
|
|
|
|
|
|
|
1415
|
|
|
|
|
|
|
This method commits all message publications and acknowledgments performed in the current |
1416
|
|
|
|
|
|
|
transaction. A new transaction starts immediately after a commit. |
1417
|
|
|
|
|
|
|
|
1418
|
|
|
|
|
|
|
=head2 rollback_tx |
1419
|
|
|
|
|
|
|
|
1420
|
|
|
|
|
|
|
$channel->rollback_tx()->deliver; |
1421
|
|
|
|
|
|
|
|
1422
|
|
|
|
|
|
|
Abandon the current transaction. |
1423
|
|
|
|
|
|
|
|
1424
|
|
|
|
|
|
|
This method abandons all message publications and acknowledgments performed in the current |
1425
|
|
|
|
|
|
|
transaction. A new transaction starts immediately after a rollback. Note that unacked messages |
1426
|
|
|
|
|
|
|
will not be automatically redelivered by rollback; if that is required an explicit recover |
1427
|
|
|
|
|
|
|
call should be issued. |
1428
|
|
|
|
|
|
|
|
1429
|
|
|
|
|
|
|
=head1 SEE ALSO |
1430
|
|
|
|
|
|
|
|
1431
|
|
|
|
|
|
|
L<Mojo::RabbitMQ::Client>, L<Mojo::RabbitMQ::Client::Consumer>, L<Mojo::RabbitMQ::Client::Publisher> |
1432
|
|
|
|
|
|
|
|
1433
|
|
|
|
|
|
|
=head1 COPYRIGHT AND LICENSE |
1434
|
|
|
|
|
|
|
|
1435
|
|
|
|
|
|
|
Copyright (C) 2015-2017, Sebastian Podjasek and others |
1436
|
|
|
|
|
|
|
|
1437
|
|
|
|
|
|
|
Based on L<AnyEvent::RabbitMQ> - Copyright (C) 2010 Masahito Ikuta, maintained by C<< bobtfish@bobtfish.net >> |
1438
|
|
|
|
|
|
|
|
1439
|
|
|
|
|
|
|
This program is free software, you can redistribute it and/or modify it under the terms of the Artistic License version 2.0. |
1440
|
|
|
|
|
|
|
|
1441
|
|
|
|
|
|
|
=cut |