| line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
|
1
|
|
|
|
|
|
|
package Mojo::RabbitMQ::Client::Channel; |
|
2
|
5
|
|
|
5
|
|
35
|
use Mojo::Base 'Mojo::EventEmitter'; |
|
|
5
|
|
|
|
|
13
|
|
|
|
5
|
|
|
|
|
52
|
|
|
3
|
|
|
|
|
|
|
|
|
4
|
5
|
|
|
5
|
|
1024
|
use Mojo::Promise; |
|
|
5
|
|
|
|
|
12
|
|
|
|
5
|
|
|
|
|
36
|
|
|
5
|
5
|
|
|
5
|
|
2551
|
use Mojo::RabbitMQ::Client::LocalQueue; |
|
|
5
|
|
|
|
|
17
|
|
|
|
5
|
|
|
|
|
38
|
|
|
6
|
5
|
|
|
5
|
|
2547
|
use Mojo::RabbitMQ::Client::Method; |
|
|
5
|
|
|
|
|
16
|
|
|
|
5
|
|
|
|
|
41
|
|
|
7
|
5
|
|
|
5
|
|
2573
|
use Mojo::RabbitMQ::Client::Method::Publish; |
|
|
5
|
|
|
|
|
16
|
|
|
|
5
|
|
|
|
|
57
|
|
|
8
|
5
|
|
|
5
|
|
219
|
use Scalar::Util qw(isweak weaken); |
|
|
5
|
|
|
|
|
11
|
|
|
|
5
|
|
|
|
|
342
|
|
|
9
|
|
|
|
|
|
|
|
|
10
|
5
|
|
50
|
5
|
|
31
|
use constant DEBUG => $ENV{MOJO_RABBITMQ_DEBUG} // 0; |
|
|
5
|
|
|
|
|
11
|
|
|
|
5
|
|
|
|
|
25279
|
|
|
11
|
|
|
|
|
|
|
|
|
12
|
|
|
|
|
|
|
has id => 0; |
|
13
|
|
|
|
|
|
|
has is_open => 0; |
|
14
|
|
|
|
|
|
|
has is_active => 0; |
|
15
|
|
|
|
|
|
|
has client => undef; |
|
16
|
|
|
|
|
|
|
has queue => sub { Mojo::RabbitMQ::Client::LocalQueue->new }; |
|
17
|
|
|
|
|
|
|
has content_queue => sub { Mojo::RabbitMQ::Client::LocalQueue->new }; |
|
18
|
|
|
|
|
|
|
has consumer_cbs => sub { {} }; |
|
19
|
|
|
|
|
|
|
has return_cbs => sub { {} }; |
|
20
|
|
|
|
|
|
|
|
|
21
|
|
|
|
|
|
|
sub _open { |
|
22
|
0
|
|
|
0
|
|
|
warn "Deprecated call to _open on channel"; |
|
23
|
0
|
|
|
|
|
|
return shift->open(@_); |
|
24
|
|
|
|
|
|
|
} |
|
25
|
|
|
|
|
|
|
|
|
26
|
|
|
|
|
|
|
sub open { |
|
27
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
|
28
|
|
|
|
|
|
|
|
|
29
|
0
|
0
|
|
|
|
|
if ($self->is_open) { |
|
30
|
0
|
|
|
|
|
|
$self->emit(error => 'Channel has already been opened'); |
|
31
|
0
|
|
|
|
|
|
return $self; |
|
32
|
|
|
|
|
|
|
} |
|
33
|
|
|
|
|
|
|
|
|
34
|
0
|
|
|
|
|
|
weaken $self; |
|
35
|
|
|
|
|
|
|
$self->client->_write_expect( |
|
36
|
|
|
|
|
|
|
'Channel::Open' => {}, |
|
37
|
|
|
|
|
|
|
'Channel::OpenOk' => sub { |
|
38
|
0
|
|
|
0
|
|
|
warn "-- Channel::OpenOk\n" if DEBUG; |
|
39
|
0
|
|
|
|
|
|
$self->is_open(1)->is_active(1)->emit('open'); |
|
40
|
|
|
|
|
|
|
}, |
|
41
|
|
|
|
|
|
|
sub { |
|
42
|
0
|
|
|
0
|
|
|
$self->emit( |
|
43
|
|
|
|
|
|
|
error => 'Invalid response received while trying to open channel: ' |
|
44
|
|
|
|
|
|
|
. shift); |
|
45
|
|
|
|
|
|
|
}, |
|
46
|
0
|
|
|
|
|
|
$self->id, |
|
47
|
|
|
|
|
|
|
); |
|
48
|
|
|
|
|
|
|
|
|
49
|
0
|
|
|
|
|
|
return $self; |
|
50
|
|
|
|
|
|
|
} |
|
51
|
|
|
|
|
|
|
|
|
52
|
|
|
|
|
|
|
sub _push_queue_or_consume { |
|
53
|
0
|
|
|
0
|
|
|
my $self = shift; |
|
54
|
0
|
|
|
|
|
|
my ($frame) = @_; |
|
55
|
|
|
|
|
|
|
|
|
56
|
0
|
|
|
|
|
|
weaken $self; |
|
57
|
0
|
0
|
|
|
|
|
if ($frame->isa('Net::AMQP::Frame::Method')) { |
|
58
|
0
|
|
|
|
|
|
my $method_frame = $frame->method_frame; |
|
59
|
|
|
|
|
|
|
|
|
60
|
0
|
0
|
|
|
|
|
if ($method_frame->isa('Net::AMQP::Protocol::Channel::Close')) { |
|
|
|
0
|
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
61
|
0
|
|
|
|
|
|
$self->client->_write_frame(Net::AMQP::Protocol::Channel::CloseOk->new(), |
|
62
|
|
|
|
|
|
|
$self->id); |
|
63
|
0
|
|
|
|
|
|
$self->is_open(0)->is_active(0); |
|
64
|
0
|
|
|
|
|
|
$self->client->delete_channel($self->id); |
|
65
|
0
|
|
|
|
|
|
$self->emit(close => $frame); |
|
66
|
|
|
|
|
|
|
|
|
67
|
0
|
|
|
|
|
|
return $self; |
|
68
|
|
|
|
|
|
|
} |
|
69
|
|
|
|
|
|
|
elsif ($method_frame->isa('Net::AMQP::Protocol::Basic::Deliver')) { |
|
70
|
0
|
|
0
|
0
|
|
|
my $cb = $self->consumer_cbs->{$method_frame->consumer_tag} || sub { }; |
|
71
|
|
|
|
|
|
|
$self->_push_read_header_and_body( |
|
72
|
|
|
|
|
|
|
'deliver', |
|
73
|
|
|
|
|
|
|
$frame => sub { |
|
74
|
0
|
|
|
0
|
|
|
$cb->emit(message => @_); |
|
75
|
|
|
|
|
|
|
}, |
|
76
|
|
|
|
|
|
|
sub { |
|
77
|
0
|
|
|
0
|
|
|
$self->emit(error => 'Consumer callback failure: ' . shift); |
|
78
|
|
|
|
|
|
|
} |
|
79
|
0
|
|
|
|
|
|
); |
|
80
|
0
|
|
|
|
|
|
return $self; |
|
81
|
|
|
|
|
|
|
} |
|
82
|
|
|
|
|
|
|
elsif ($method_frame->isa('Net::AMQP::Protocol::Basic::Return')) { |
|
83
|
|
|
|
|
|
|
my $cb |
|
84
|
|
|
|
|
|
|
= $self->return_cbs->{$method_frame->exchange . '_' |
|
85
|
|
|
|
|
|
|
. $method_frame->routing_key} |
|
86
|
0
|
|
0
|
0
|
|
|
|| sub { }; |
|
87
|
|
|
|
|
|
|
$self->_push_read_header_and_body( |
|
88
|
|
|
|
|
|
|
'return', |
|
89
|
|
|
|
|
|
|
$frame => sub { |
|
90
|
0
|
|
|
0
|
|
|
$cb->emit(reject => @_); |
|
91
|
|
|
|
|
|
|
}, |
|
92
|
|
|
|
|
|
|
sub { |
|
93
|
0
|
|
|
0
|
|
|
$self->emit(error => 'Return callback failure: ' . shift); |
|
94
|
|
|
|
|
|
|
} |
|
95
|
0
|
|
|
|
|
|
); |
|
96
|
0
|
|
|
|
|
|
return $self; |
|
97
|
|
|
|
|
|
|
} |
|
98
|
|
|
|
|
|
|
elsif ($method_frame->isa('Net::AMQP::Protocol::Channel::Flow')) { |
|
99
|
0
|
|
|
|
|
|
$self->is_active($method_frame->active); |
|
100
|
0
|
|
|
|
|
|
$self->client->_write_frame( |
|
101
|
|
|
|
|
|
|
Net::AMQP::Protocol::Channel::FlowOk->new( |
|
102
|
|
|
|
|
|
|
active => $method_frame->active |
|
103
|
|
|
|
|
|
|
), |
|
104
|
|
|
|
|
|
|
$self->id |
|
105
|
|
|
|
|
|
|
); |
|
106
|
|
|
|
|
|
|
|
|
107
|
0
|
|
|
|
|
|
return $self; |
|
108
|
|
|
|
|
|
|
} |
|
109
|
|
|
|
|
|
|
|
|
110
|
0
|
|
|
|
|
|
$self->queue->push($frame); |
|
111
|
|
|
|
|
|
|
} |
|
112
|
|
|
|
|
|
|
else { |
|
113
|
0
|
|
|
|
|
|
$self->content_queue->push($frame); |
|
114
|
|
|
|
|
|
|
} |
|
115
|
|
|
|
|
|
|
|
|
116
|
0
|
|
|
|
|
|
return $self; |
|
117
|
|
|
|
|
|
|
} |
|
118
|
|
|
|
|
|
|
|
|
119
|
|
|
|
|
|
|
sub close { |
|
120
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
|
121
|
0
|
0
|
|
|
|
|
my $connection = $self->client or return; |
|
122
|
|
|
|
|
|
|
|
|
123
|
0
|
0
|
|
|
|
|
return $self if !$self->is_open; |
|
124
|
|
|
|
|
|
|
|
|
125
|
0
|
0
|
|
|
|
|
return $self->_close() if 0 == scalar keys %{$self->consumer_cbs}; |
|
|
0
|
|
|
|
|
|
|
|
126
|
|
|
|
|
|
|
|
|
127
|
0
|
|
|
|
|
|
for my $consumer_tag (keys %{$self->consumer_cbs}) { |
|
|
0
|
|
|
|
|
|
|
|
128
|
0
|
|
|
|
|
|
my $method = $self->cancel(consumer_tag => $consumer_tag); |
|
129
|
0
|
0
|
|
|
|
|
weaken $self unless isweak $self; |
|
130
|
|
|
|
|
|
|
$method->on( |
|
131
|
|
|
|
|
|
|
success => sub { |
|
132
|
0
|
|
|
0
|
|
|
$self->_close(); |
|
133
|
|
|
|
|
|
|
} |
|
134
|
0
|
|
|
|
|
|
); |
|
135
|
|
|
|
|
|
|
$method->catch( |
|
136
|
|
|
|
|
|
|
sub { |
|
137
|
0
|
|
|
0
|
|
|
$self->_close(); |
|
138
|
0
|
|
|
|
|
|
$self->emit(error => 'Error canceling consumption: ' . shift, @_); |
|
139
|
|
|
|
|
|
|
} |
|
140
|
0
|
|
|
|
|
|
); |
|
141
|
0
|
|
|
|
|
|
$method->deliver(); |
|
142
|
|
|
|
|
|
|
} |
|
143
|
|
|
|
|
|
|
|
|
144
|
0
|
|
|
|
|
|
return $self; |
|
145
|
|
|
|
|
|
|
} |
|
146
|
|
|
|
|
|
|
|
|
147
|
|
|
|
|
|
|
sub _close { |
|
148
|
0
|
|
|
0
|
|
|
my $self = shift; |
|
149
|
0
|
|
|
|
|
|
my %args = @_; |
|
150
|
|
|
|
|
|
|
|
|
151
|
0
|
0
|
|
|
|
|
return unless 0 == scalar keys %{$self->consumer_cbs}; |
|
|
0
|
|
|
|
|
|
|
|
152
|
|
|
|
|
|
|
|
|
153
|
|
|
|
|
|
|
$self->client->_write_expect( |
|
154
|
|
|
|
|
|
|
'Channel::Close' => {}, |
|
155
|
|
|
|
|
|
|
'Channel::CloseOk' => sub { |
|
156
|
0
|
|
|
0
|
|
|
warn "-- Channel::CloseOk\n" if DEBUG; |
|
157
|
0
|
|
|
|
|
|
$self->is_open(0)->is_active(0); |
|
158
|
0
|
|
|
|
|
|
$self->client->delete_channel($self->id); |
|
159
|
0
|
|
|
|
|
|
$self->emit('close'); |
|
160
|
|
|
|
|
|
|
}, |
|
161
|
|
|
|
|
|
|
sub { |
|
162
|
0
|
|
|
0
|
|
|
$self->is_open(0)->is_active(0); |
|
163
|
0
|
|
|
|
|
|
$self->client->delete_channel($self->id); |
|
164
|
0
|
|
|
|
|
|
$self->emit(error => 'Failed closing channel: ' . shift); |
|
165
|
|
|
|
|
|
|
}, |
|
166
|
0
|
|
|
|
|
|
$self->id, |
|
167
|
|
|
|
|
|
|
); |
|
168
|
|
|
|
|
|
|
|
|
169
|
0
|
|
|
|
|
|
return $self; |
|
170
|
|
|
|
|
|
|
} |
|
171
|
|
|
|
|
|
|
|
|
172
|
|
|
|
|
|
|
sub _assert_open { |
|
173
|
0
|
|
|
0
|
|
|
my $self = shift; |
|
174
|
|
|
|
|
|
|
|
|
175
|
0
|
0
|
0
|
|
|
|
return 0 unless $self->is_open and $self->is_active; |
|
176
|
|
|
|
|
|
|
|
|
177
|
0
|
|
|
|
|
|
return 1; |
|
178
|
|
|
|
|
|
|
} |
|
179
|
|
|
|
|
|
|
|
|
180
|
|
|
|
|
|
|
sub _prepare_method { |
|
181
|
0
|
|
|
0
|
|
|
my $self = shift; |
|
182
|
|
|
|
|
|
|
|
|
183
|
0
|
|
|
|
|
|
my $method = Mojo::RabbitMQ::Client::Method->new( |
|
184
|
|
|
|
|
|
|
client => $self->client, |
|
185
|
|
|
|
|
|
|
channel => $self |
|
186
|
|
|
|
|
|
|
); |
|
187
|
0
|
|
|
|
|
|
weaken $method->{channel}; |
|
188
|
0
|
|
|
|
|
|
weaken $method->{client}; |
|
189
|
|
|
|
|
|
|
|
|
190
|
0
|
|
|
|
|
|
return $method->setup(@_); |
|
191
|
|
|
|
|
|
|
} |
|
192
|
|
|
|
|
|
|
|
|
193
|
|
|
|
|
|
|
sub declare_exchange { |
|
194
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
|
195
|
|
|
|
|
|
|
|
|
196
|
|
|
|
|
|
|
return $self->_prepare_method( |
|
197
|
|
|
|
|
|
|
'Exchange::Declare' => { |
|
198
|
|
|
|
|
|
|
type => 'direct', |
|
199
|
|
|
|
|
|
|
passive => 0, |
|
200
|
|
|
|
|
|
|
durable => 0, |
|
201
|
|
|
|
|
|
|
auto_delete => 0, |
|
202
|
|
|
|
|
|
|
internal => 0, |
|
203
|
|
|
|
|
|
|
@_, # exchange |
|
204
|
|
|
|
|
|
|
ticket => 0, |
|
205
|
|
|
|
|
|
|
nowait => 0, # FIXME |
|
206
|
|
|
|
|
|
|
}, |
|
207
|
|
|
|
|
|
|
'Exchange::DeclareOk' => sub { |
|
208
|
0
|
|
|
0
|
|
|
warn "-- Exchange::DeclareOk\n" if DEBUG; |
|
209
|
|
|
|
|
|
|
} |
|
210
|
0
|
|
|
|
|
|
); |
|
211
|
|
|
|
|
|
|
} |
|
212
|
|
|
|
|
|
|
|
|
213
|
|
|
|
|
|
|
sub declare_exchange_p { |
|
214
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
|
215
|
|
|
|
|
|
|
|
|
216
|
0
|
|
|
|
|
|
my $promise = Mojo::Promise->new; |
|
217
|
0
|
|
|
|
|
|
my $method = $self->declare_exchange(@_); |
|
218
|
0
|
|
|
|
|
|
weaken $self; |
|
219
|
0
|
|
|
0
|
|
|
$method->on('success' => sub { shift; $promise->resolve($self, @_) }); |
|
|
0
|
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
|
220
|
0
|
|
|
0
|
|
|
$method->on('error' => sub { shift; $promise->reject($self, @_) }); |
|
|
0
|
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
|
221
|
0
|
|
|
|
|
|
$method->deliver; |
|
222
|
|
|
|
|
|
|
|
|
223
|
0
|
|
|
|
|
|
return $promise; |
|
224
|
|
|
|
|
|
|
} |
|
225
|
|
|
|
|
|
|
|
|
226
|
|
|
|
|
|
|
sub delete_exchange { |
|
227
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
|
228
|
|
|
|
|
|
|
|
|
229
|
|
|
|
|
|
|
return $self->_prepare_method( |
|
230
|
|
|
|
|
|
|
'Exchange::Delete' => { |
|
231
|
|
|
|
|
|
|
if_unused => 0, |
|
232
|
|
|
|
|
|
|
@_, # exchange |
|
233
|
|
|
|
|
|
|
ticket => 0, |
|
234
|
|
|
|
|
|
|
nowait => 0, # FIXME |
|
235
|
|
|
|
|
|
|
}, |
|
236
|
|
|
|
|
|
|
'Exchange::DeleteOk' => sub { |
|
237
|
0
|
|
|
0
|
|
|
warn "-- Exchange::DeleteOk\n" if DEBUG; |
|
238
|
|
|
|
|
|
|
} |
|
239
|
0
|
|
|
|
|
|
); |
|
240
|
|
|
|
|
|
|
} |
|
241
|
|
|
|
|
|
|
|
|
242
|
|
|
|
|
|
|
sub delete_exchange_p { |
|
243
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
|
244
|
|
|
|
|
|
|
|
|
245
|
0
|
|
|
|
|
|
my $promise = Mojo::Promise->new; |
|
246
|
0
|
|
|
|
|
|
my $method = $self->delete_exchange(@_); |
|
247
|
0
|
|
|
|
|
|
weaken $self; |
|
248
|
0
|
|
|
0
|
|
|
$method->on('success' => sub { shift; $promise->resolve($self, @_) }); |
|
|
0
|
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
|
249
|
0
|
|
|
0
|
|
|
$method->on('error' => sub { shift; $promise->reject($self, @_) }); |
|
|
0
|
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
|
250
|
0
|
|
|
|
|
|
$method->deliver; |
|
251
|
|
|
|
|
|
|
|
|
252
|
0
|
|
|
|
|
|
return $promise; |
|
253
|
|
|
|
|
|
|
} |
|
254
|
|
|
|
|
|
|
|
|
255
|
|
|
|
|
|
|
sub declare_queue { |
|
256
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
|
257
|
|
|
|
|
|
|
|
|
258
|
|
|
|
|
|
|
return $self->_prepare_method( |
|
259
|
|
|
|
|
|
|
'Queue::Declare' => { |
|
260
|
|
|
|
|
|
|
queue => '', |
|
261
|
|
|
|
|
|
|
passive => 0, |
|
262
|
|
|
|
|
|
|
durable => 0, |
|
263
|
|
|
|
|
|
|
exclusive => 0, |
|
264
|
|
|
|
|
|
|
auto_delete => 0, |
|
265
|
|
|
|
|
|
|
no_ack => 1, |
|
266
|
|
|
|
|
|
|
@_, |
|
267
|
|
|
|
|
|
|
ticket => 0, |
|
268
|
|
|
|
|
|
|
nowait => 0, # FIXME |
|
269
|
|
|
|
|
|
|
}, |
|
270
|
|
|
|
|
|
|
'Queue::DeclareOk' => sub { |
|
271
|
0
|
|
|
0
|
|
|
warn "-- Queue::DeclareOk\n" if DEBUG; |
|
272
|
|
|
|
|
|
|
} |
|
273
|
0
|
|
|
|
|
|
); |
|
274
|
|
|
|
|
|
|
} |
|
275
|
|
|
|
|
|
|
|
|
276
|
|
|
|
|
|
|
sub declare_queue_p { |
|
277
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
|
278
|
|
|
|
|
|
|
|
|
279
|
0
|
|
|
|
|
|
my $promise = Mojo::Promise->new; |
|
280
|
0
|
|
|
|
|
|
my $method = $self->declare_queue(@_); |
|
281
|
0
|
|
|
|
|
|
weaken $self; |
|
282
|
0
|
|
|
0
|
|
|
$method->on('success' => sub { shift; $promise->resolve($self, @_) }); |
|
|
0
|
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
|
283
|
0
|
|
|
0
|
|
|
$method->on('error' => sub { shift; $promise->reject($self, @_) }); |
|
|
0
|
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
|
284
|
0
|
|
|
|
|
|
$method->deliver; |
|
285
|
|
|
|
|
|
|
|
|
286
|
0
|
|
|
|
|
|
return $promise; |
|
287
|
|
|
|
|
|
|
} |
|
288
|
|
|
|
|
|
|
|
|
289
|
|
|
|
|
|
|
sub bind_queue { |
|
290
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
|
291
|
|
|
|
|
|
|
|
|
292
|
|
|
|
|
|
|
return $self->_prepare_method( |
|
293
|
|
|
|
|
|
|
'Queue::Bind' => { |
|
294
|
|
|
|
|
|
|
@_, # queue, exchange, routing_key |
|
295
|
|
|
|
|
|
|
ticket => 0, |
|
296
|
|
|
|
|
|
|
nowait => 0, # FIXME |
|
297
|
|
|
|
|
|
|
}, |
|
298
|
|
|
|
|
|
|
'Queue::BindOk' => sub { |
|
299
|
0
|
|
|
0
|
|
|
warn "-- Queue::BindOk\n" if DEBUG; |
|
300
|
|
|
|
|
|
|
} |
|
301
|
0
|
|
|
|
|
|
); |
|
302
|
|
|
|
|
|
|
} |
|
303
|
|
|
|
|
|
|
|
|
304
|
|
|
|
|
|
|
sub bind_queue_p { |
|
305
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
|
306
|
|
|
|
|
|
|
|
|
307
|
0
|
|
|
|
|
|
my $promise = Mojo::Promise->new; |
|
308
|
0
|
|
|
|
|
|
my $method = $self->bind_queue(@_); |
|
309
|
0
|
|
|
0
|
|
|
$method->on('success' => sub { shift; $promise->resolve($self, @_) }); |
|
|
0
|
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
|
310
|
0
|
|
|
0
|
|
|
$method->on('error' => sub { shift; $promise->reject($self, @_) }); |
|
|
0
|
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
|
311
|
0
|
|
|
|
|
|
$method->deliver; |
|
312
|
|
|
|
|
|
|
|
|
313
|
0
|
|
|
|
|
|
return $promise; |
|
314
|
|
|
|
|
|
|
} |
|
315
|
|
|
|
|
|
|
|
|
316
|
|
|
|
|
|
|
sub unbind_queue { |
|
317
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
|
318
|
|
|
|
|
|
|
|
|
319
|
|
|
|
|
|
|
return $self->_prepare_method( |
|
320
|
|
|
|
|
|
|
'Queue::Unbind' => { |
|
321
|
|
|
|
|
|
|
@_, # queue, exchange, routing_key |
|
322
|
|
|
|
|
|
|
ticket => 0, |
|
323
|
|
|
|
|
|
|
}, |
|
324
|
|
|
|
|
|
|
'Queue::UnbindOk' => sub { |
|
325
|
0
|
|
|
0
|
|
|
warn "-- Queue::UnbindOk\n" if DEBUG; |
|
326
|
|
|
|
|
|
|
} |
|
327
|
0
|
|
|
|
|
|
); |
|
328
|
|
|
|
|
|
|
} |
|
329
|
|
|
|
|
|
|
|
|
330
|
|
|
|
|
|
|
sub unbind_queue_p { |
|
331
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
|
332
|
|
|
|
|
|
|
|
|
333
|
0
|
|
|
|
|
|
my $promise = Mojo::Promise->new; |
|
334
|
0
|
|
|
|
|
|
my $method = $self->unbind_queue(@_); |
|
335
|
0
|
|
|
|
|
|
weaken $self; |
|
336
|
0
|
|
|
0
|
|
|
$method->on('success' => sub { shift; $promise->resolve($self, @_) }); |
|
|
0
|
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
|
337
|
0
|
|
|
0
|
|
|
$method->on('error' => sub { shift; $promise->reject($self, @_) }); |
|
|
0
|
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
|
338
|
0
|
|
|
|
|
|
$method->deliver; |
|
339
|
|
|
|
|
|
|
|
|
340
|
0
|
|
|
|
|
|
return $promise; |
|
341
|
|
|
|
|
|
|
} |
|
342
|
|
|
|
|
|
|
|
|
343
|
|
|
|
|
|
|
sub purge_queue { |
|
344
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
|
345
|
|
|
|
|
|
|
|
|
346
|
|
|
|
|
|
|
return $self->_prepare_method( |
|
347
|
|
|
|
|
|
|
'Queue::Purge' => { |
|
348
|
|
|
|
|
|
|
@_, # queue |
|
349
|
|
|
|
|
|
|
ticket => 0, |
|
350
|
|
|
|
|
|
|
nowait => 0, # FIXME |
|
351
|
|
|
|
|
|
|
}, |
|
352
|
|
|
|
|
|
|
'Queue::PurgeOk' => sub { |
|
353
|
0
|
|
|
0
|
|
|
warn "-- Queue::PurgeOk\n" if DEBUG; |
|
354
|
|
|
|
|
|
|
} |
|
355
|
0
|
|
|
|
|
|
); |
|
356
|
|
|
|
|
|
|
} |
|
357
|
|
|
|
|
|
|
|
|
358
|
|
|
|
|
|
|
sub purge_queue_p { |
|
359
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
|
360
|
|
|
|
|
|
|
|
|
361
|
0
|
|
|
|
|
|
my $promise = Mojo::Promise->new; |
|
362
|
0
|
|
|
|
|
|
my $method = $self->purge_queue(@_); |
|
363
|
0
|
|
|
|
|
|
weaken $self; |
|
364
|
0
|
|
|
0
|
|
|
$method->on('success' => sub { shift; $promise->resolve($self, @_) }); |
|
|
0
|
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
|
365
|
0
|
|
|
0
|
|
|
$method->on('error' => sub { shift; $promise->reject($self, @_) }); |
|
|
0
|
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
|
366
|
0
|
|
|
|
|
|
$method->deliver; |
|
367
|
|
|
|
|
|
|
|
|
368
|
0
|
|
|
|
|
|
return $promise; |
|
369
|
|
|
|
|
|
|
} |
|
370
|
|
|
|
|
|
|
|
|
371
|
|
|
|
|
|
|
sub delete_queue { |
|
372
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
|
373
|
|
|
|
|
|
|
|
|
374
|
|
|
|
|
|
|
return $self->_prepare_method( |
|
375
|
|
|
|
|
|
|
'Queue::Delete' => { |
|
376
|
|
|
|
|
|
|
if_unused => 0, |
|
377
|
|
|
|
|
|
|
if_empty => 0, |
|
378
|
|
|
|
|
|
|
@_, # queue |
|
379
|
|
|
|
|
|
|
ticket => 0, |
|
380
|
|
|
|
|
|
|
nowait => 0, # FIXME |
|
381
|
|
|
|
|
|
|
}, |
|
382
|
|
|
|
|
|
|
'Queue::DeleteOk' => sub { |
|
383
|
0
|
|
|
0
|
|
|
warn "-- Queue::DeleteOk\n" if DEBUG; |
|
384
|
|
|
|
|
|
|
} |
|
385
|
0
|
|
|
|
|
|
); |
|
386
|
|
|
|
|
|
|
} |
|
387
|
|
|
|
|
|
|
|
|
388
|
|
|
|
|
|
|
sub delete_queue_p { |
|
389
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
|
390
|
|
|
|
|
|
|
|
|
391
|
0
|
|
|
|
|
|
my $promise = Mojo::Promise->new; |
|
392
|
0
|
|
|
|
|
|
my $method = $self->delete_queue(@_); |
|
393
|
0
|
|
|
|
|
|
weaken $self; |
|
394
|
0
|
|
|
0
|
|
|
$method->on('success' => sub { shift; $promise->resolve($self, @_) }); |
|
|
0
|
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
|
395
|
0
|
|
|
0
|
|
|
$method->on('error' => sub { shift; $promise->reject($self, @_) }); |
|
|
0
|
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
|
396
|
0
|
|
|
|
|
|
$method->deliver; |
|
397
|
|
|
|
|
|
|
|
|
398
|
0
|
|
|
|
|
|
return $promise; |
|
399
|
|
|
|
|
|
|
} |
|
400
|
|
|
|
|
|
|
|
|
401
|
|
|
|
|
|
|
sub publish { |
|
402
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
|
403
|
|
|
|
|
|
|
|
|
404
|
0
|
|
|
|
|
|
return Mojo::RabbitMQ::Client::Method::Publish->new( |
|
405
|
|
|
|
|
|
|
client => $self->client, |
|
406
|
|
|
|
|
|
|
channel => $self |
|
407
|
|
|
|
|
|
|
)->setup(@_); |
|
408
|
|
|
|
|
|
|
} |
|
409
|
|
|
|
|
|
|
|
|
410
|
|
|
|
|
|
|
sub publish_p { |
|
411
|
0
|
|
|
0
|
0
|
|
my $self = shift; |
|
412
|
|
|
|
|
|
|
|
|
413
|
0
|
|
|
|
|
|
my $promise = Mojo::Promise->new; |
|
414
|
0
|
|
|
|
|
|
my $method = Mojo::RabbitMQ::Client::Method::Publish->new( |
|
415
|
|
|
|
|
|
|
client => $self->client, |
|
416
|
|
|
|
|
|
|
channel => $self |
|
417
|
|
|
|
|
|
|
); |
|
418
|
0
|
|
|
|
|
|
weaken $method->{client}; |
|
419
|
0
|
|
|
|
|
|
weaken $method->{channel}; |
|
420
|
0
|
|
|
|
|
|
$method->setup(@_); |
|
421
|
0
|
|
|
|
|
|
weaken $self; |
|
422
|
0
|
|
|
0
|
|
|
$method->on('success' => sub { shift; $promise->resolve($self, @_) }); |
|
|
0
|
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
|
423
|
0
|
|
|
0
|
|
|
$method->on('error' => sub { shift; $promise->reject($self, @_) }); |
|
|
0
|
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
|
424
|
0
|
|
|
|
|
|
$method->deliver; |
|
425
|
|
|
|
|
|
|
|
|
426
|
0
|
|
|
|
|
|
return $promise; |
|
427
|
|
|
|
|
|
|
} |
|
428
|
|
|
|
|
|
|
|
|
429
|
|
|
|
|
|
|
sub consume { |
|
430
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
|
431
|
|
|
|
|
|
|
|
|
432
|
|
|
|
|
|
|
my $method = $self->_prepare_method( |
|
433
|
|
|
|
|
|
|
'Basic::Consume' => { |
|
434
|
|
|
|
|
|
|
consumer_tag => '', |
|
435
|
|
|
|
|
|
|
no_local => 0, |
|
436
|
|
|
|
|
|
|
no_ack => 1, |
|
437
|
|
|
|
|
|
|
exclusive => 0, |
|
438
|
|
|
|
|
|
|
@_, |
|
439
|
|
|
|
|
|
|
ticket => 0, |
|
440
|
|
|
|
|
|
|
nowait => 0 |
|
441
|
|
|
|
|
|
|
}, |
|
442
|
|
|
|
|
|
|
'Basic::ConsumeOk' => sub { |
|
443
|
0
|
|
|
0
|
|
|
warn "-- Basic::ConsumeOk\n" if DEBUG; |
|
444
|
|
|
|
|
|
|
} |
|
445
|
0
|
|
|
|
|
|
); |
|
446
|
0
|
|
|
|
|
|
weaken $self; |
|
447
|
|
|
|
|
|
|
$method->on( |
|
448
|
|
|
|
|
|
|
success => sub { |
|
449
|
0
|
|
|
0
|
|
|
my $this = shift; |
|
450
|
0
|
|
|
|
|
|
my $frame = shift; |
|
451
|
0
|
|
|
|
|
|
my $tag = $frame->method_frame->consumer_tag; |
|
452
|
|
|
|
|
|
|
|
|
453
|
0
|
|
|
|
|
|
$self->consumer_cbs->{$tag} = $this; |
|
454
|
|
|
|
|
|
|
} |
|
455
|
0
|
|
|
|
|
|
); |
|
456
|
|
|
|
|
|
|
|
|
457
|
0
|
|
|
|
|
|
return $method; |
|
458
|
|
|
|
|
|
|
} |
|
459
|
|
|
|
|
|
|
|
|
460
|
|
|
|
|
|
|
sub cancel { |
|
461
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
|
462
|
|
|
|
|
|
|
|
|
463
|
|
|
|
|
|
|
my $method = $self->_prepare_method( |
|
464
|
|
|
|
|
|
|
'Basic::Cancel', |
|
465
|
|
|
|
|
|
|
{ |
|
466
|
|
|
|
|
|
|
@_, # consumer_tag |
|
467
|
|
|
|
|
|
|
nowait => 0, |
|
468
|
|
|
|
|
|
|
}, |
|
469
|
|
|
|
|
|
|
'Basic::CancelOk' => sub { |
|
470
|
0
|
|
|
0
|
|
|
warn "-- Basic::CancelOk\n" if DEBUG; |
|
471
|
|
|
|
|
|
|
} |
|
472
|
0
|
|
|
|
|
|
); |
|
473
|
0
|
|
|
|
|
|
weaken $self; |
|
474
|
|
|
|
|
|
|
$method->on( |
|
475
|
|
|
|
|
|
|
success => sub { |
|
476
|
0
|
|
|
0
|
|
|
my $this = shift; |
|
477
|
0
|
|
|
|
|
|
my $frame = shift; |
|
478
|
0
|
|
|
|
|
|
delete $self->consumer_cbs->{$frame->method_frame->consumer_tag}; |
|
479
|
|
|
|
|
|
|
} |
|
480
|
0
|
|
|
|
|
|
); |
|
481
|
0
|
|
|
|
|
|
return $method; |
|
482
|
|
|
|
|
|
|
} |
|
483
|
|
|
|
|
|
|
|
|
484
|
|
|
|
|
|
|
sub get { |
|
485
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
|
486
|
|
|
|
|
|
|
|
|
487
|
0
|
|
|
|
|
|
my $method = $self->_prepare_method( |
|
488
|
|
|
|
|
|
|
'Basic::Get', |
|
489
|
|
|
|
|
|
|
{ |
|
490
|
|
|
|
|
|
|
no_ack => 1, |
|
491
|
|
|
|
|
|
|
@_, # queue |
|
492
|
|
|
|
|
|
|
ticket => 0, |
|
493
|
|
|
|
|
|
|
}, |
|
494
|
|
|
|
|
|
|
[qw(Basic::GetOk Basic::GetEmpty)] |
|
495
|
|
|
|
|
|
|
); |
|
496
|
0
|
|
|
|
|
|
weaken $self; |
|
497
|
|
|
|
|
|
|
$method->on( |
|
498
|
|
|
|
|
|
|
success => sub { |
|
499
|
0
|
|
|
0
|
|
|
warn "-- Basic::GetOk|GetEmpty\n" if DEBUG; |
|
500
|
0
|
|
|
|
|
|
my $this = shift; |
|
501
|
0
|
|
|
|
|
|
my $frame = shift; |
|
502
|
|
|
|
|
|
|
|
|
503
|
0
|
0
|
|
|
|
|
$this->emit(empty => $frame) |
|
504
|
|
|
|
|
|
|
if $frame->method_frame->isa('Net::AMQP::Protocol::Basic::GetEmpty'); |
|
505
|
|
|
|
|
|
|
$self->_push_read_header_and_body( |
|
506
|
|
|
|
|
|
|
'ok', $frame, |
|
507
|
|
|
|
|
|
|
sub { |
|
508
|
0
|
|
|
|
|
|
$this->emit(message => $frame, @_); |
|
509
|
|
|
|
|
|
|
}, |
|
510
|
|
|
|
|
|
|
sub { |
|
511
|
0
|
|
|
|
|
|
$this->emit(error => 'Failed to get messages from queue'); |
|
512
|
|
|
|
|
|
|
} |
|
513
|
0
|
|
|
|
|
|
); |
|
514
|
|
|
|
|
|
|
} |
|
515
|
0
|
|
|
|
|
|
); |
|
516
|
|
|
|
|
|
|
|
|
517
|
0
|
|
|
|
|
|
return $method; |
|
518
|
|
|
|
|
|
|
} |
|
519
|
|
|
|
|
|
|
|
|
520
|
|
|
|
|
|
|
sub get_p { |
|
521
|
0
|
|
|
0
|
0
|
|
my $self = shift; |
|
522
|
|
|
|
|
|
|
|
|
523
|
0
|
|
|
|
|
|
my $promise = Mojo::Promise->new; |
|
524
|
0
|
|
|
|
|
|
my $method = $self->get(@_); |
|
525
|
0
|
|
|
|
|
|
weaken $self; |
|
526
|
0
|
|
|
0
|
|
|
$method->on('message' => sub { shift; $promise->resolve($self, @_) }); |
|
|
0
|
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
|
527
|
0
|
|
|
0
|
|
|
$method->on('empty' => sub { shift; $promise->resolve($self, @_) }); |
|
|
0
|
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
|
528
|
0
|
|
|
0
|
|
|
$method->on('error' => sub { shift; $promise->reject($self, @_) }); |
|
|
0
|
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
|
529
|
0
|
|
|
|
|
|
$method->deliver; |
|
530
|
|
|
|
|
|
|
|
|
531
|
0
|
|
|
|
|
|
return $promise; |
|
532
|
|
|
|
|
|
|
} |
|
533
|
|
|
|
|
|
|
|
|
534
|
|
|
|
|
|
|
sub ack { |
|
535
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
|
536
|
0
|
|
|
|
|
|
my %args = (); |
|
537
|
0
|
0
|
|
|
|
|
if (ref($_[0]) eq 'HASH') { |
|
538
|
0
|
0
|
|
|
|
|
if (defined $_[0]->{ok}) { |
|
|
|
0
|
|
|
|
|
|
|
539
|
0
|
|
|
|
|
|
$args{delivery_tag} = $_[0]->{ok}->method_frame->delivery_tag; |
|
540
|
|
|
|
|
|
|
} elsif (defined $_[0]->{deliver}) { |
|
541
|
0
|
|
|
|
|
|
$args{delivery_tag} = $_[0]->{deliver}->method_frame->delivery_tag; |
|
542
|
|
|
|
|
|
|
} |
|
543
|
|
|
|
|
|
|
} else { |
|
544
|
0
|
|
|
|
|
|
%args = @_; |
|
545
|
|
|
|
|
|
|
} |
|
546
|
|
|
|
|
|
|
|
|
547
|
0
|
0
|
|
|
|
|
die "ack requires delivery_tag in arguments" unless defined $args{delivery_tag}; |
|
548
|
|
|
|
|
|
|
|
|
549
|
|
|
|
|
|
|
return $self->_prepare_method( |
|
550
|
|
|
|
|
|
|
'Basic::Ack' => { |
|
551
|
|
|
|
|
|
|
delivery_tag => 0, |
|
552
|
|
|
|
|
|
|
multiple => |
|
553
|
0
|
0
|
0
|
|
|
|
(defined $args{delivery_tag} && $args{delivery_tag} != 0 ? 0 : 1), |
|
554
|
|
|
|
|
|
|
%args, |
|
555
|
|
|
|
|
|
|
} |
|
556
|
|
|
|
|
|
|
); |
|
557
|
|
|
|
|
|
|
} |
|
558
|
|
|
|
|
|
|
|
|
559
|
|
|
|
|
|
|
sub ack_p { |
|
560
|
0
|
|
|
0
|
0
|
|
my $self = shift; |
|
561
|
|
|
|
|
|
|
|
|
562
|
0
|
|
|
|
|
|
my $promise = Mojo::Promise->new; |
|
563
|
0
|
|
|
|
|
|
my $method = $self->ack(@_); |
|
564
|
0
|
|
|
|
|
|
weaken $self; |
|
565
|
0
|
|
|
0
|
|
|
$method->on('success' => sub { shift; $promise->resolve($self, @_) }); |
|
|
0
|
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
|
566
|
0
|
|
|
0
|
|
|
$method->on('error' => sub { shift; $promise->reject($self, @_) }); |
|
|
0
|
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
|
567
|
0
|
|
|
|
|
|
$method->deliver; |
|
568
|
|
|
|
|
|
|
|
|
569
|
0
|
|
|
|
|
|
return $promise; |
|
570
|
|
|
|
|
|
|
} |
|
571
|
|
|
|
|
|
|
|
|
572
|
|
|
|
|
|
|
sub qos { |
|
573
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
|
574
|
|
|
|
|
|
|
|
|
575
|
0
|
|
|
|
|
|
return $self->_prepare_method('Basic::Qos', |
|
576
|
|
|
|
|
|
|
{prefetch_count => 1, @_, prefetch_size => 0, global => 0,}, |
|
577
|
|
|
|
|
|
|
'Basic::QosOk'); |
|
578
|
|
|
|
|
|
|
} |
|
579
|
|
|
|
|
|
|
|
|
580
|
|
|
|
|
|
|
sub recover { |
|
581
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
|
582
|
|
|
|
|
|
|
|
|
583
|
0
|
|
|
|
|
|
return $self->_prepare_method('Basic::Recover' => {requeue => 1, @_,}); |
|
584
|
|
|
|
|
|
|
} |
|
585
|
|
|
|
|
|
|
|
|
586
|
|
|
|
|
|
|
sub reject { |
|
587
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
|
588
|
|
|
|
|
|
|
|
|
589
|
0
|
|
|
|
|
|
return $self->_prepare_method( |
|
590
|
|
|
|
|
|
|
'Basic::Reject' => {delivery_tag => 0, requeue => 0, @_,}); |
|
591
|
|
|
|
|
|
|
} |
|
592
|
|
|
|
|
|
|
|
|
593
|
|
|
|
|
|
|
sub select_tx { |
|
594
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
|
595
|
|
|
|
|
|
|
|
|
596
|
0
|
|
|
|
|
|
return $self->_prepare_method('Tx::Select', {}, 'Tx::SelectOk'); |
|
597
|
|
|
|
|
|
|
} |
|
598
|
|
|
|
|
|
|
|
|
599
|
|
|
|
|
|
|
sub commit_tx { |
|
600
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
|
601
|
|
|
|
|
|
|
|
|
602
|
0
|
|
|
|
|
|
return $self->_prepare_method('Tx::Commit', {}, 'Tx::CommitOk'); |
|
603
|
|
|
|
|
|
|
} |
|
604
|
|
|
|
|
|
|
|
|
605
|
|
|
|
|
|
|
sub rollback_tx { |
|
606
|
0
|
|
|
0
|
1
|
|
my $self = shift; |
|
607
|
|
|
|
|
|
|
|
|
608
|
0
|
|
|
|
|
|
return $self->_prepare_method('Tx::Rollback', {}, 'Tx::RollbackOk'); |
|
609
|
|
|
|
|
|
|
} |
|
610
|
|
|
|
|
|
|
|
|
611
|
|
|
|
|
|
|
sub _push_read_header_and_body { |
|
612
|
0
|
|
|
0
|
|
|
my $self = shift; |
|
613
|
0
|
|
|
|
|
|
my ($type, $frame, $cb, $failure_cb) = @_; |
|
614
|
0
|
|
|
|
|
|
my $response = {$type => $frame}; |
|
615
|
0
|
|
|
|
|
|
my $body_size = 0; |
|
616
|
|
|
|
|
|
|
|
|
617
|
|
|
|
|
|
|
$self->content_queue->get( |
|
618
|
|
|
|
|
|
|
sub { |
|
619
|
0
|
|
|
0
|
|
|
my $frame = shift; |
|
620
|
|
|
|
|
|
|
|
|
621
|
0
|
0
|
|
|
|
|
return $failure_cb->('Received data is not header frame') |
|
622
|
|
|
|
|
|
|
if !$frame->isa('Net::AMQP::Frame::Header'); |
|
623
|
|
|
|
|
|
|
|
|
624
|
0
|
|
|
|
|
|
my $header_frame = $frame->header_frame; |
|
625
|
0
|
0
|
|
|
|
|
return $failure_cb->('Header is not Protocol::Basic::ContentHeader' |
|
626
|
|
|
|
|
|
|
. 'Header was ' |
|
627
|
|
|
|
|
|
|
. ref $header_frame) |
|
628
|
|
|
|
|
|
|
if !$header_frame->isa('Net::AMQP::Protocol::Basic::ContentHeader'); |
|
629
|
|
|
|
|
|
|
|
|
630
|
0
|
|
|
|
|
|
$response->{header} = $header_frame; |
|
631
|
0
|
|
|
|
|
|
$body_size = $frame->body_size; |
|
632
|
|
|
|
|
|
|
} |
|
633
|
0
|
|
|
|
|
|
); |
|
634
|
|
|
|
|
|
|
|
|
635
|
0
|
|
|
|
|
|
my $body_payload = ""; |
|
636
|
0
|
|
|
|
|
|
my $next_frame; |
|
637
|
|
|
|
|
|
|
$next_frame = sub { |
|
638
|
0
|
|
|
0
|
|
|
my $frame = shift; |
|
639
|
|
|
|
|
|
|
|
|
640
|
0
|
0
|
|
|
|
|
return $failure_cb->('Received data is not body frame') |
|
641
|
|
|
|
|
|
|
if !$frame->isa('Net::AMQP::Frame::Body'); |
|
642
|
|
|
|
|
|
|
|
|
643
|
0
|
|
|
|
|
|
$body_payload .= $frame->payload; |
|
644
|
|
|
|
|
|
|
|
|
645
|
0
|
0
|
|
|
|
|
if (length($body_payload) < $body_size) { |
|
646
|
|
|
|
|
|
|
|
|
647
|
|
|
|
|
|
|
# More to come |
|
648
|
0
|
|
|
|
|
|
$self->content_queue->get($next_frame); |
|
649
|
|
|
|
|
|
|
} |
|
650
|
|
|
|
|
|
|
else { |
|
651
|
0
|
|
|
|
|
|
$frame->payload($body_payload); |
|
652
|
0
|
|
|
|
|
|
$response->{body} = $frame; |
|
653
|
0
|
|
|
|
|
|
$cb->($response); |
|
654
|
|
|
|
|
|
|
} |
|
655
|
0
|
|
|
|
|
|
}; |
|
656
|
|
|
|
|
|
|
|
|
657
|
0
|
|
|
|
|
|
$self->content_queue->get($next_frame); |
|
658
|
|
|
|
|
|
|
|
|
659
|
0
|
|
|
|
|
|
return $self; |
|
660
|
|
|
|
|
|
|
} |
|
661
|
|
|
|
|
|
|
|
|
662
|
|
|
|
|
|
|
sub DESTROY { |
|
663
|
0
|
|
|
0
|
|
|
my $self = shift; |
|
664
|
0
|
0
|
|
|
|
|
$self->close() if defined $self; |
|
665
|
0
|
|
|
|
|
|
return; |
|
666
|
|
|
|
|
|
|
} |
|
667
|
|
|
|
|
|
|
|
|
668
|
|
|
|
|
|
|
1; |
|
669
|
|
|
|
|
|
|
|
|
670
|
|
|
|
|
|
|
=encoding utf8 |
|
671
|
|
|
|
|
|
|
|
|
672
|
|
|
|
|
|
|
=head1 NAME |
|
673
|
|
|
|
|
|
|
|
|
674
|
|
|
|
|
|
|
Mojo::RabbitMQ::Client::Channel - handles all channel related methods |
|
675
|
|
|
|
|
|
|
|
|
676
|
|
|
|
|
|
|
=head1 SYNOPSIS |
|
677
|
|
|
|
|
|
|
|
|
678
|
|
|
|
|
|
|
use Mojo::RabbitMQ::Client::Channel; |
|
679
|
|
|
|
|
|
|
|
|
680
|
|
|
|
|
|
|
my $channel = Mojo::RabbitMQ::Client::Channel->new(); |
|
681
|
|
|
|
|
|
|
|
|
682
|
|
|
|
|
|
|
$channel->catch(sub { warn "Some channel error occurred: " . $_[1] }); |
|
683
|
|
|
|
|
|
|
|
|
684
|
|
|
|
|
|
|
$channel->on( |
|
685
|
|
|
|
|
|
|
open => sub { |
|
686
|
|
|
|
|
|
|
my ($channel) = @_; |
|
687
|
|
|
|
|
|
|
... |
|
688
|
|
|
|
|
|
|
} |
|
689
|
|
|
|
|
|
|
); |
|
690
|
|
|
|
|
|
|
$channel->on(close => sub { warn "Channel closed" }); |
|
691
|
|
|
|
|
|
|
|
|
692
|
|
|
|
|
|
|
$client->open_channel($channel); |
|
693
|
|
|
|
|
|
|
|
|
694
|
|
|
|
|
|
|
=head1 DESCRIPTION |
|
695
|
|
|
|
|
|
|
|
|
696
|
|
|
|
|
|
|
L allows one to call all channel related methods. |
|
697
|
|
|
|
|
|
|
|
|
698
|
|
|
|
|
|
|
=head1 EVENTS |
|
699
|
|
|
|
|
|
|
|
|
700
|
|
|
|
|
|
|
L inherits all events from L and can emit the |
|
701
|
|
|
|
|
|
|
following new ones. |
|
702
|
|
|
|
|
|
|
|
|
703
|
|
|
|
|
|
|
=head2 open |
|
704
|
|
|
|
|
|
|
|
|
705
|
|
|
|
|
|
|
$channel->on(open => sub { |
|
706
|
|
|
|
|
|
|
my ($channel) = @_; |
|
707
|
|
|
|
|
|
|
... |
|
708
|
|
|
|
|
|
|
}); |
|
709
|
|
|
|
|
|
|
|
|
710
|
|
|
|
|
|
|
Emitted when channel receives Open-Ok. |
|
711
|
|
|
|
|
|
|
|
|
712
|
|
|
|
|
|
|
=head2 close |
|
713
|
|
|
|
|
|
|
|
|
714
|
|
|
|
|
|
|
$channel->on(close=> sub { |
|
715
|
|
|
|
|
|
|
my ($channel, $frame) = @_; |
|
716
|
|
|
|
|
|
|
... |
|
717
|
|
|
|
|
|
|
}); |
|
718
|
|
|
|
|
|
|
|
|
719
|
|
|
|
|
|
|
Emitted when channel gets closed, C<$frame> contains close reason. |
|
720
|
|
|
|
|
|
|
|
|
721
|
|
|
|
|
|
|
=head1 ATTRIBUTES |
|
722
|
|
|
|
|
|
|
|
|
723
|
|
|
|
|
|
|
L has following attributes. |
|
724
|
|
|
|
|
|
|
|
|
725
|
|
|
|
|
|
|
=head2 id |
|
726
|
|
|
|
|
|
|
|
|
727
|
|
|
|
|
|
|
my $id = $channel->id; |
|
728
|
|
|
|
|
|
|
$channel->id(20810); |
|
729
|
|
|
|
|
|
|
|
|
730
|
|
|
|
|
|
|
If not set, L sets it to next free number when channel is opened. |
|
731
|
|
|
|
|
|
|
|
|
732
|
|
|
|
|
|
|
=head2 is_open |
|
733
|
|
|
|
|
|
|
|
|
734
|
|
|
|
|
|
|
$channel->is_open ? "Channel is open" : "Channel is closed"; |
|
735
|
|
|
|
|
|
|
|
|
736
|
|
|
|
|
|
|
=head2 is_active |
|
737
|
|
|
|
|
|
|
|
|
738
|
|
|
|
|
|
|
$channel->is_active ? "Channel is active" : "Channel is not active"; |
|
739
|
|
|
|
|
|
|
|
|
740
|
|
|
|
|
|
|
This can be modified on reception of Channel-Flow. |
|
741
|
|
|
|
|
|
|
|
|
742
|
|
|
|
|
|
|
=head2 client |
|
743
|
|
|
|
|
|
|
|
|
744
|
|
|
|
|
|
|
my $client = $channel->client; |
|
745
|
|
|
|
|
|
|
$channel->client($client); |
|
746
|
|
|
|
|
|
|
|
|
747
|
|
|
|
|
|
|
=head1 METHODS |
|
748
|
|
|
|
|
|
|
|
|
749
|
|
|
|
|
|
|
L inherits all methods from L and implements |
|
750
|
|
|
|
|
|
|
the following new ones. |
|
751
|
|
|
|
|
|
|
|
|
752
|
|
|
|
|
|
|
=head2 close |
|
753
|
|
|
|
|
|
|
|
|
754
|
|
|
|
|
|
|
$channel->close; |
|
755
|
|
|
|
|
|
|
|
|
756
|
|
|
|
|
|
|
Cancels all consumers and closes channel afterwards. |
|
757
|
|
|
|
|
|
|
|
|
758
|
|
|
|
|
|
|
=head2 declare_exchange |
|
759
|
|
|
|
|
|
|
|
|
760
|
|
|
|
|
|
|
my $exchange = $channel->declare_exchange( |
|
761
|
|
|
|
|
|
|
exchange => 'mojo', |
|
762
|
|
|
|
|
|
|
type => 'fanout', |
|
763
|
|
|
|
|
|
|
durable => 1, |
|
764
|
|
|
|
|
|
|
... |
|
765
|
|
|
|
|
|
|
)->deliver; |
|
766
|
|
|
|
|
|
|
|
|
767
|
|
|
|
|
|
|
Verify exchange exists, create if needed. |
|
768
|
|
|
|
|
|
|
|
|
769
|
|
|
|
|
|
|
This method creates an exchange if it does not already exist, and if the |
|
770
|
|
|
|
|
|
|
exchange exists, verifies that it is of the correct and expected class. |
|
771
|
|
|
|
|
|
|
|
|
772
|
|
|
|
|
|
|
Following arguments are accepted: |
|
773
|
|
|
|
|
|
|
|
|
774
|
|
|
|
|
|
|
=over 2 |
|
775
|
|
|
|
|
|
|
|
|
776
|
|
|
|
|
|
|
=item exchange |
|
777
|
|
|
|
|
|
|
|
|
778
|
|
|
|
|
|
|
Unique exchange name |
|
779
|
|
|
|
|
|
|
|
|
780
|
|
|
|
|
|
|
=item type |
|
781
|
|
|
|
|
|
|
|
|
782
|
|
|
|
|
|
|
Each exchange belongs to one of a set of exchange types implemented by the server. The |
|
783
|
|
|
|
|
|
|
exchange types define the functionality of the exchange - i.e. how messages are routed |
|
784
|
|
|
|
|
|
|
through it. It is not valid or meaningful to attempt to change the type of an existing |
|
785
|
|
|
|
|
|
|
exchange. |
|
786
|
|
|
|
|
|
|
|
|
787
|
|
|
|
|
|
|
=item passive |
|
788
|
|
|
|
|
|
|
|
|
789
|
|
|
|
|
|
|
If set, the server will reply with Declare-Ok if the exchange already exists with the same |
|
790
|
|
|
|
|
|
|
name, and raise an error if not. The client can use this to check whether an exchange |
|
791
|
|
|
|
|
|
|
exists without modifying the server state. When set, all other method fields except name |
|
792
|
|
|
|
|
|
|
and no-wait are ignored. A declare with both passive and no-wait has no effect. Arguments |
|
793
|
|
|
|
|
|
|
are compared for semantic equivalence. |
|
794
|
|
|
|
|
|
|
|
|
795
|
|
|
|
|
|
|
=item durable |
|
796
|
|
|
|
|
|
|
|
|
797
|
|
|
|
|
|
|
If set when creating a new exchange, the exchange will be marked as durable. Durable exchanges |
|
798
|
|
|
|
|
|
|
remain active when a server restarts. Non-durable exchanges (transient exchanges) are purged |
|
799
|
|
|
|
|
|
|
if/when a server restarts. |
|
800
|
|
|
|
|
|
|
|
|
801
|
|
|
|
|
|
|
=item auto_delete |
|
802
|
|
|
|
|
|
|
|
|
803
|
|
|
|
|
|
|
If set, the exchange is deleted when all queues have finished using it. |
|
804
|
|
|
|
|
|
|
|
|
805
|
|
|
|
|
|
|
=item internal |
|
806
|
|
|
|
|
|
|
|
|
807
|
|
|
|
|
|
|
If set, the exchange may not be used directly by publishers, but only when bound to other exchanges. |
|
808
|
|
|
|
|
|
|
Internal exchanges are used to construct wiring that is not visible to applications. |
|
809
|
|
|
|
|
|
|
|
|
810
|
|
|
|
|
|
|
=back |
|
811
|
|
|
|
|
|
|
|
|
812
|
|
|
|
|
|
|
=head2 declare_exchange_p |
|
813
|
|
|
|
|
|
|
|
|
814
|
|
|
|
|
|
|
Same as L but auto-delivers method and returns a L object. |
|
815
|
|
|
|
|
|
|
|
|
816
|
|
|
|
|
|
|
$channel->declare_exchange_p( |
|
817
|
|
|
|
|
|
|
exchange => 'mojo', |
|
818
|
|
|
|
|
|
|
type => 'fanout', |
|
819
|
|
|
|
|
|
|
durable => 1, |
|
820
|
|
|
|
|
|
|
... |
|
821
|
|
|
|
|
|
|
)->then(sub { |
|
822
|
|
|
|
|
|
|
say "Exchange declared..."; |
|
823
|
|
|
|
|
|
|
})->catch(sub { |
|
824
|
|
|
|
|
|
|
my $err = shift; |
|
825
|
|
|
|
|
|
|
warn "Exchange declaration error: $err"; |
|
826
|
|
|
|
|
|
|
})->wait; |
|
827
|
|
|
|
|
|
|
|
|
828
|
|
|
|
|
|
|
=head2 delete_exchange |
|
829
|
|
|
|
|
|
|
|
|
830
|
|
|
|
|
|
|
$channel->delete_exchange(exchange => 'mojo')->deliver; |
|
831
|
|
|
|
|
|
|
|
|
832
|
|
|
|
|
|
|
Delete an exchange. |
|
833
|
|
|
|
|
|
|
|
|
834
|
|
|
|
|
|
|
This method deletes an exchange. When an exchange is deleted all queue bindings on the exchange |
|
835
|
|
|
|
|
|
|
are cancelled. |
|
836
|
|
|
|
|
|
|
|
|
837
|
|
|
|
|
|
|
Following arguments are accepted: |
|
838
|
|
|
|
|
|
|
|
|
839
|
|
|
|
|
|
|
=over 2 |
|
840
|
|
|
|
|
|
|
|
|
841
|
|
|
|
|
|
|
=item exchange |
|
842
|
|
|
|
|
|
|
|
|
843
|
|
|
|
|
|
|
Exchange name. |
|
844
|
|
|
|
|
|
|
|
|
845
|
|
|
|
|
|
|
=item if_unused |
|
846
|
|
|
|
|
|
|
|
|
847
|
|
|
|
|
|
|
If set, the server will only delete the exchange if it has no queue bindings. If the exchange has |
|
848
|
|
|
|
|
|
|
queue bindings the server does not delete it but raises a channel exception instead. |
|
849
|
|
|
|
|
|
|
|
|
850
|
|
|
|
|
|
|
=back |
|
851
|
|
|
|
|
|
|
|
|
852
|
|
|
|
|
|
|
=head2 delete_exchange_p |
|
853
|
|
|
|
|
|
|
|
|
854
|
|
|
|
|
|
|
Same as L but auto-delivers method and returns a L object. |
|
855
|
|
|
|
|
|
|
|
|
856
|
|
|
|
|
|
|
$channel->delete_exchange_p( |
|
857
|
|
|
|
|
|
|
exchange => 'mojo' |
|
858
|
|
|
|
|
|
|
)->then(sub { |
|
859
|
|
|
|
|
|
|
say "Exchange deleted..."; |
|
860
|
|
|
|
|
|
|
})->catch(sub { |
|
861
|
|
|
|
|
|
|
my $err = shift; |
|
862
|
|
|
|
|
|
|
warn "Exchange removal error: $err"; |
|
863
|
|
|
|
|
|
|
})->wait; |
|
864
|
|
|
|
|
|
|
|
|
865
|
|
|
|
|
|
|
=head2 declare_queue |
|
866
|
|
|
|
|
|
|
|
|
867
|
|
|
|
|
|
|
my $queue = $channel->declare_queue(queue => 'mq', durable => 1)->deliver |
|
868
|
|
|
|
|
|
|
|
|
869
|
|
|
|
|
|
|
Declare queue, create if needed. |
|
870
|
|
|
|
|
|
|
|
|
871
|
|
|
|
|
|
|
This method creates or checks a queue. When creating a new queue the client can |
|
872
|
|
|
|
|
|
|
specify various properties that control the durability of the queue and its contents, |
|
873
|
|
|
|
|
|
|
and the level of sharing for the queue. |
|
874
|
|
|
|
|
|
|
|
|
875
|
|
|
|
|
|
|
Following arguments are accepted: |
|
876
|
|
|
|
|
|
|
|
|
877
|
|
|
|
|
|
|
=over 2 |
|
878
|
|
|
|
|
|
|
|
|
879
|
|
|
|
|
|
|
=item queue |
|
880
|
|
|
|
|
|
|
|
|
881
|
|
|
|
|
|
|
The queue name MAY be empty, in which case the server MUST create a new queue with |
|
882
|
|
|
|
|
|
|
a unique generated name and return this to the client in the Declare-Ok method. |
|
883
|
|
|
|
|
|
|
|
|
884
|
|
|
|
|
|
|
=item passive |
|
885
|
|
|
|
|
|
|
|
|
886
|
|
|
|
|
|
|
If set, the server will reply with Declare-Ok if the queue already exists with the same |
|
887
|
|
|
|
|
|
|
name, and raise an error if not. The client can use this to check whether a queue exists |
|
888
|
|
|
|
|
|
|
without modifying the server state. When set, all other method fields except name and |
|
889
|
|
|
|
|
|
|
no-wait are ignored. A declare with both passive and no-wait has no effect. |
|
890
|
|
|
|
|
|
|
Arguments are compared for semantic equivalence. |
|
891
|
|
|
|
|
|
|
|
|
892
|
|
|
|
|
|
|
=item durable |
|
893
|
|
|
|
|
|
|
|
|
894
|
|
|
|
|
|
|
If set when creating a new queue, the queue will be marked as durable. Durable queues |
|
895
|
|
|
|
|
|
|
remain active when a server restarts. Non-durable queues (transient queues) are purged |
|
896
|
|
|
|
|
|
|
if/when a server restarts. Note that durable queues do not necessarily hold persistent |
|
897
|
|
|
|
|
|
|
messages, although it does not make sense to send persistent messages to a transient queue. |
|
898
|
|
|
|
|
|
|
|
|
899
|
|
|
|
|
|
|
=item exclusive |
|
900
|
|
|
|
|
|
|
|
|
901
|
|
|
|
|
|
|
Exclusive queues may only be accessed by the current connection, and are deleted when |
|
902
|
|
|
|
|
|
|
that connection closes. Passive declaration of an exclusive queue by other connections are |
|
903
|
|
|
|
|
|
|
not allowed. |
|
904
|
|
|
|
|
|
|
|
|
905
|
|
|
|
|
|
|
=item auto_delete |
|
906
|
|
|
|
|
|
|
|
|
907
|
|
|
|
|
|
|
If set, the queue is deleted when all consumers have finished using it. The last consumer |
|
908
|
|
|
|
|
|
|
can be cancelled either explicitly or because its channel is closed. If there was no consumer |
|
909
|
|
|
|
|
|
|
ever on the queue, it won't be deleted. Applications can explicitly delete auto-delete queues |
|
910
|
|
|
|
|
|
|
using the Delete method as normal. |
|
911
|
|
|
|
|
|
|
|
|
912
|
|
|
|
|
|
|
=back |
|
913
|
|
|
|
|
|
|
|
|
914
|
|
|
|
|
|
|
=head2 declare_queue_p |
|
915
|
|
|
|
|
|
|
|
|
916
|
|
|
|
|
|
|
Same as L but auto-delivers method and returns a L object. |
|
917
|
|
|
|
|
|
|
|
|
918
|
|
|
|
|
|
|
$channel->declare_queue_p( |
|
919
|
|
|
|
|
|
|
queue => 'mq', |
|
920
|
|
|
|
|
|
|
durable => 1 |
|
921
|
|
|
|
|
|
|
)->then(sub { |
|
922
|
|
|
|
|
|
|
say "Queue declared..."; |
|
923
|
|
|
|
|
|
|
})->catch(sub { |
|
924
|
|
|
|
|
|
|
my $err = shift; |
|
925
|
|
|
|
|
|
|
warn "Queue declaration error: $err"; |
|
926
|
|
|
|
|
|
|
})->wait; |
|
927
|
|
|
|
|
|
|
|
|
928
|
|
|
|
|
|
|
=head2 bind_queue |
|
929
|
|
|
|
|
|
|
|
|
930
|
|
|
|
|
|
|
$channel->bind_queue( |
|
931
|
|
|
|
|
|
|
exchange => 'mojo', |
|
932
|
|
|
|
|
|
|
queue => 'mq', |
|
933
|
|
|
|
|
|
|
routing_key => '' |
|
934
|
|
|
|
|
|
|
)->deliver; |
|
935
|
|
|
|
|
|
|
|
|
936
|
|
|
|
|
|
|
Bind queue to an exchange. |
|
937
|
|
|
|
|
|
|
|
|
938
|
|
|
|
|
|
|
This method binds a queue to an exchange. Until a queue is bound it will |
|
939
|
|
|
|
|
|
|
not receive any messages. In a classic messaging model, store-and-forward |
|
940
|
|
|
|
|
|
|
queues are bound to a direct exchange and subscription queues are bound |
|
941
|
|
|
|
|
|
|
to a topic exchange. |
|
942
|
|
|
|
|
|
|
|
|
943
|
|
|
|
|
|
|
Following arguments are accepted: |
|
944
|
|
|
|
|
|
|
|
|
945
|
|
|
|
|
|
|
=over 2 |
|
946
|
|
|
|
|
|
|
|
|
947
|
|
|
|
|
|
|
=item queue |
|
948
|
|
|
|
|
|
|
|
|
949
|
|
|
|
|
|
|
Specifies the name of the queue to bind. |
|
950
|
|
|
|
|
|
|
|
|
951
|
|
|
|
|
|
|
=item exchange |
|
952
|
|
|
|
|
|
|
|
|
953
|
|
|
|
|
|
|
Name of the exchange to bind to. |
|
954
|
|
|
|
|
|
|
|
|
955
|
|
|
|
|
|
|
=item routing_key |
|
956
|
|
|
|
|
|
|
|
|
957
|
|
|
|
|
|
|
Specifies the routing key for the binding. The routing key is used for |
|
958
|
|
|
|
|
|
|
routing messages depending on the exchange configuration. Not all exchanges |
|
959
|
|
|
|
|
|
|
use a routing key - refer to the specific exchange documentation. If the |
|
960
|
|
|
|
|
|
|
queue name is empty, the server uses the last queue declared on the channel. |
|
961
|
|
|
|
|
|
|
If the routing key is also empty, the server uses this queue name for the |
|
962
|
|
|
|
|
|
|
routing key as well. If the queue name is provided but the routing key is |
|
963
|
|
|
|
|
|
|
empty, the server does the binding with that empty routing key. The meaning |
|
964
|
|
|
|
|
|
|
of empty routing keys depends on the exchange implementation. |
|
965
|
|
|
|
|
|
|
|
|
966
|
|
|
|
|
|
|
=back |
|
967
|
|
|
|
|
|
|
|
|
968
|
|
|
|
|
|
|
=head2 bind_queue_p |
|
969
|
|
|
|
|
|
|
|
|
970
|
|
|
|
|
|
|
Same as L but auto-delivers method and returns a L object. |
|
971
|
|
|
|
|
|
|
|
|
972
|
|
|
|
|
|
|
$channel->bind_queue_p( |
|
973
|
|
|
|
|
|
|
exchange => 'mojo', |
|
974
|
|
|
|
|
|
|
queue => 'mq', |
|
975
|
|
|
|
|
|
|
routing_key => '' |
|
976
|
|
|
|
|
|
|
)->then(sub { |
|
977
|
|
|
|
|
|
|
say "Queue bound..."; |
|
978
|
|
|
|
|
|
|
})->catch(sub { |
|
979
|
|
|
|
|
|
|
my $err = shift; |
|
980
|
|
|
|
|
|
|
warn "Queue binding error: $err"; |
|
981
|
|
|
|
|
|
|
})->wait; |
|
982
|
|
|
|
|
|
|
|
|
983
|
|
|
|
|
|
|
=head2 unbind_queue |
|
984
|
|
|
|
|
|
|
|
|
985
|
|
|
|
|
|
|
$channel->unbind_queue( |
|
986
|
|
|
|
|
|
|
exchange => 'mojo', |
|
987
|
|
|
|
|
|
|
queue => 'mq', |
|
988
|
|
|
|
|
|
|
routing_key => '' |
|
989
|
|
|
|
|
|
|
)->deliver; |
|
990
|
|
|
|
|
|
|
|
|
991
|
|
|
|
|
|
|
Unbind a queue from an exchange. |
|
992
|
|
|
|
|
|
|
|
|
993
|
|
|
|
|
|
|
This method unbinds a queue from an exchange. |
|
994
|
|
|
|
|
|
|
|
|
995
|
|
|
|
|
|
|
Following arguments are accepted: |
|
996
|
|
|
|
|
|
|
|
|
997
|
|
|
|
|
|
|
=over 2 |
|
998
|
|
|
|
|
|
|
|
|
999
|
|
|
|
|
|
|
=item queue |
|
1000
|
|
|
|
|
|
|
|
|
1001
|
|
|
|
|
|
|
Specifies the name of the queue to unbind. |
|
1002
|
|
|
|
|
|
|
|
|
1003
|
|
|
|
|
|
|
=item exchange |
|
1004
|
|
|
|
|
|
|
|
|
1005
|
|
|
|
|
|
|
The name of the exchange to unbind from. |
|
1006
|
|
|
|
|
|
|
|
|
1007
|
|
|
|
|
|
|
=item routing_key |
|
1008
|
|
|
|
|
|
|
|
|
1009
|
|
|
|
|
|
|
Specifies the routing key of the binding to unbind. |
|
1010
|
|
|
|
|
|
|
|
|
1011
|
|
|
|
|
|
|
=back |
|
1012
|
|
|
|
|
|
|
|
|
1013
|
|
|
|
|
|
|
=head2 unbind_queue_p |
|
1014
|
|
|
|
|
|
|
|
|
1015
|
|
|
|
|
|
|
Same as L but auto-delivers method and returns a L object. |
|
1016
|
|
|
|
|
|
|
|
|
1017
|
|
|
|
|
|
|
$channel->unbind_queue_p( |
|
1018
|
|
|
|
|
|
|
exchange => 'mojo', |
|
1019
|
|
|
|
|
|
|
queue => 'mq', |
|
1020
|
|
|
|
|
|
|
routing_key => '' |
|
1021
|
|
|
|
|
|
|
)->then(sub { |
|
1022
|
|
|
|
|
|
|
say "Queue unbound..."; |
|
1023
|
|
|
|
|
|
|
})->catch(sub { |
|
1024
|
|
|
|
|
|
|
my $err = shift; |
|
1025
|
|
|
|
|
|
|
warn "Queue unbinding error: $err"; |
|
1026
|
|
|
|
|
|
|
})->wait; |
|
1027
|
|
|
|
|
|
|
|
|
1028
|
|
|
|
|
|
|
=head2 purge_queue |
|
1029
|
|
|
|
|
|
|
|
|
1030
|
|
|
|
|
|
|
$channel->purge_queue(queue => 'mq')->deliver; |
|
1031
|
|
|
|
|
|
|
|
|
1032
|
|
|
|
|
|
|
Purge a queue. |
|
1033
|
|
|
|
|
|
|
|
|
1034
|
|
|
|
|
|
|
This method removes all messages from a queue which are not awaiting acknowledgment. |
|
1035
|
|
|
|
|
|
|
|
|
1036
|
|
|
|
|
|
|
Following arguments are accepted: |
|
1037
|
|
|
|
|
|
|
|
|
1038
|
|
|
|
|
|
|
=over 2 |
|
1039
|
|
|
|
|
|
|
|
|
1040
|
|
|
|
|
|
|
=item queue |
|
1041
|
|
|
|
|
|
|
|
|
1042
|
|
|
|
|
|
|
Specifies the name of the queue to purge. |
|
1043
|
|
|
|
|
|
|
|
|
1044
|
|
|
|
|
|
|
=back |
|
1045
|
|
|
|
|
|
|
|
|
1046
|
|
|
|
|
|
|
=head2 purge_queue_p |
|
1047
|
|
|
|
|
|
|
|
|
1048
|
|
|
|
|
|
|
Same as L but auto-delivers method and returns a L object. |
|
1049
|
|
|
|
|
|
|
|
|
1050
|
|
|
|
|
|
|
$channel->purge_queue_p( |
|
1051
|
|
|
|
|
|
|
queue => 'mq', |
|
1052
|
|
|
|
|
|
|
)->then(sub { |
|
1053
|
|
|
|
|
|
|
say "Queue purged..."; |
|
1054
|
|
|
|
|
|
|
})->catch(sub { |
|
1055
|
|
|
|
|
|
|
my $err = shift; |
|
1056
|
|
|
|
|
|
|
warn "Queue purging error: $err"; |
|
1057
|
|
|
|
|
|
|
})->wait; |
|
1058
|
|
|
|
|
|
|
|
|
1059
|
|
|
|
|
|
|
=head2 delete_queue |
|
1060
|
|
|
|
|
|
|
|
|
1061
|
|
|
|
|
|
|
$channel->delete_queue(queue => 'mq', if_empty => 1)->deliver; |
|
1062
|
|
|
|
|
|
|
|
|
1063
|
|
|
|
|
|
|
Delete a queue. |
|
1064
|
|
|
|
|
|
|
|
|
1065
|
|
|
|
|
|
|
This method deletes a queue. When a queue is deleted any pending messages |
|
1066
|
|
|
|
|
|
|
are sent to a dead-letter queue if this is defined in the server configuration, |
|
1067
|
|
|
|
|
|
|
and all consumers on the queue are cancelled. |
|
1068
|
|
|
|
|
|
|
|
|
1069
|
|
|
|
|
|
|
Following arguments are accepted: |
|
1070
|
|
|
|
|
|
|
|
|
1071
|
|
|
|
|
|
|
=over 2 |
|
1072
|
|
|
|
|
|
|
|
|
1073
|
|
|
|
|
|
|
=item queue |
|
1074
|
|
|
|
|
|
|
|
|
1075
|
|
|
|
|
|
|
Specifies the name of the queue to delete. |
|
1076
|
|
|
|
|
|
|
|
|
1077
|
|
|
|
|
|
|
=item if_unused |
|
1078
|
|
|
|
|
|
|
|
|
1079
|
|
|
|
|
|
|
If set, the server will only delete the queue if it has no consumers. If the queue |
|
1080
|
|
|
|
|
|
|
has consumers the server does does not delete it but raises a channel exception instead. |
|
1081
|
|
|
|
|
|
|
|
|
1082
|
|
|
|
|
|
|
=item if_empty |
|
1083
|
|
|
|
|
|
|
|
|
1084
|
|
|
|
|
|
|
If set, the server will only delete the queue if it has no messages. |
|
1085
|
|
|
|
|
|
|
|
|
1086
|
|
|
|
|
|
|
=back |
|
1087
|
|
|
|
|
|
|
|
|
1088
|
|
|
|
|
|
|
=head2 delete_queue_p |
|
1089
|
|
|
|
|
|
|
|
|
1090
|
|
|
|
|
|
|
Same as L but auto-delivers method and returns a L object. |
|
1091
|
|
|
|
|
|
|
|
|
1092
|
|
|
|
|
|
|
$channel->delete_queue_p( |
|
1093
|
|
|
|
|
|
|
queue => 'mq', |
|
1094
|
|
|
|
|
|
|
if_empty => 1 |
|
1095
|
|
|
|
|
|
|
)->then(sub { |
|
1096
|
|
|
|
|
|
|
say "Queue removed..."; |
|
1097
|
|
|
|
|
|
|
})->catch(sub { |
|
1098
|
|
|
|
|
|
|
my $err = shift; |
|
1099
|
|
|
|
|
|
|
warn "Queue removal error: $err"; |
|
1100
|
|
|
|
|
|
|
})->wait; |
|
1101
|
|
|
|
|
|
|
|
|
1102
|
|
|
|
|
|
|
=head2 publish |
|
1103
|
|
|
|
|
|
|
|
|
1104
|
|
|
|
|
|
|
my $message = $channel->publish( |
|
1105
|
|
|
|
|
|
|
exchange => 'mojo', |
|
1106
|
|
|
|
|
|
|
routing_key => 'mq', |
|
1107
|
|
|
|
|
|
|
body => 'simple text body', |
|
1108
|
|
|
|
|
|
|
); |
|
1109
|
|
|
|
|
|
|
$message->deliver(); |
|
1110
|
|
|
|
|
|
|
|
|
1111
|
|
|
|
|
|
|
Publish a message. |
|
1112
|
|
|
|
|
|
|
|
|
1113
|
|
|
|
|
|
|
This method publishes a message to a specific exchange. The message will be |
|
1114
|
|
|
|
|
|
|
routed to queues as defined by the exchange configuration and distributed to |
|
1115
|
|
|
|
|
|
|
any active consumers when the transaction, if any, is committed. |
|
1116
|
|
|
|
|
|
|
|
|
1117
|
|
|
|
|
|
|
Following arguments are accepted: |
|
1118
|
|
|
|
|
|
|
|
|
1119
|
|
|
|
|
|
|
=over 2 |
|
1120
|
|
|
|
|
|
|
|
|
1121
|
|
|
|
|
|
|
=item exchange |
|
1122
|
|
|
|
|
|
|
|
|
1123
|
|
|
|
|
|
|
Specifies the name of the exchange to publish to. The exchange name can be empty, |
|
1124
|
|
|
|
|
|
|
meaning the default exchange. If the exchange name is specified, and that exchange |
|
1125
|
|
|
|
|
|
|
does not exist, the server will raise a channel exception. |
|
1126
|
|
|
|
|
|
|
|
|
1127
|
|
|
|
|
|
|
=item routing_key |
|
1128
|
|
|
|
|
|
|
|
|
1129
|
|
|
|
|
|
|
Specifies the routing key for the message. The routing key is used for routing |
|
1130
|
|
|
|
|
|
|
messages depending on the exchange configuration. |
|
1131
|
|
|
|
|
|
|
|
|
1132
|
|
|
|
|
|
|
=item mandatory |
|
1133
|
|
|
|
|
|
|
|
|
1134
|
|
|
|
|
|
|
This flag tells the server how to react if the message cannot be routed to a queue. |
|
1135
|
|
|
|
|
|
|
If this flag is set, the server will return an unroutable message with a Return method. |
|
1136
|
|
|
|
|
|
|
If this flag is zero, the server silently drops the message. |
|
1137
|
|
|
|
|
|
|
|
|
1138
|
|
|
|
|
|
|
All rejections are emitted as C event. |
|
1139
|
|
|
|
|
|
|
|
|
1140
|
|
|
|
|
|
|
$message->on(reject => sub { |
|
1141
|
|
|
|
|
|
|
my $message = shift; |
|
1142
|
|
|
|
|
|
|
my $frame = shift; |
|
1143
|
|
|
|
|
|
|
my $method_frame = $frame->method_frame; |
|
1144
|
|
|
|
|
|
|
|
|
1145
|
|
|
|
|
|
|
my $reply_code = $method_frame->reply_code; |
|
1146
|
|
|
|
|
|
|
my $reply_text = $method_frame->reply_text; |
|
1147
|
|
|
|
|
|
|
}); |
|
1148
|
|
|
|
|
|
|
|
|
1149
|
|
|
|
|
|
|
=item immediate |
|
1150
|
|
|
|
|
|
|
|
|
1151
|
|
|
|
|
|
|
This flag tells the server how to react if the message cannot be routed to a queue consumer |
|
1152
|
|
|
|
|
|
|
immediately. If this flag is set, the server will return an undeliverable message with a |
|
1153
|
|
|
|
|
|
|
Return method. If this flag is zero, the server will queue the message, but with no guarantee |
|
1154
|
|
|
|
|
|
|
that it will ever be consumed. |
|
1155
|
|
|
|
|
|
|
|
|
1156
|
|
|
|
|
|
|
As said above, all rejections are emitted as C event. |
|
1157
|
|
|
|
|
|
|
|
|
1158
|
|
|
|
|
|
|
$message->on(reject => sub { ... }); |
|
1159
|
|
|
|
|
|
|
|
|
1160
|
|
|
|
|
|
|
=back |
|
1161
|
|
|
|
|
|
|
|
|
1162
|
|
|
|
|
|
|
=head2 consume |
|
1163
|
|
|
|
|
|
|
|
|
1164
|
|
|
|
|
|
|
my $consumer = $channel->consume(queue => 'mq'); |
|
1165
|
|
|
|
|
|
|
$consumer->on(message => sub { ... }); |
|
1166
|
|
|
|
|
|
|
$consumer->deliver; |
|
1167
|
|
|
|
|
|
|
|
|
1168
|
|
|
|
|
|
|
This method asks the server to start a "consumer", which is a transient request for messages from a |
|
1169
|
|
|
|
|
|
|
specific queue. Consumers last as long as the channel they were declared on, or until the client cancels |
|
1170
|
|
|
|
|
|
|
them. |
|
1171
|
|
|
|
|
|
|
|
|
1172
|
|
|
|
|
|
|
Following arguments are accepted: |
|
1173
|
|
|
|
|
|
|
|
|
1174
|
|
|
|
|
|
|
=over 2 |
|
1175
|
|
|
|
|
|
|
|
|
1176
|
|
|
|
|
|
|
=item queue |
|
1177
|
|
|
|
|
|
|
|
|
1178
|
|
|
|
|
|
|
Specifies the name of the queue to consume from. |
|
1179
|
|
|
|
|
|
|
|
|
1180
|
|
|
|
|
|
|
=item consumer_tag |
|
1181
|
|
|
|
|
|
|
|
|
1182
|
|
|
|
|
|
|
Specifies the identifier for the consumer. The consumer tag is local to a channel, so two clients can use the |
|
1183
|
|
|
|
|
|
|
same consumer tags. If this field is empty the server will generate a unique tag. |
|
1184
|
|
|
|
|
|
|
|
|
1185
|
|
|
|
|
|
|
$consumer->on(success => sub { |
|
1186
|
|
|
|
|
|
|
my $consumer = shift; |
|
1187
|
|
|
|
|
|
|
my $frame = shift; |
|
1188
|
|
|
|
|
|
|
|
|
1189
|
|
|
|
|
|
|
my $consumer_tag = $frame->method_frame->consumer_tag; |
|
1190
|
|
|
|
|
|
|
}); |
|
1191
|
|
|
|
|
|
|
|
|
1192
|
|
|
|
|
|
|
=item no_local (not implemented in RabbitMQ!) |
|
1193
|
|
|
|
|
|
|
|
|
1194
|
|
|
|
|
|
|
If the no-local field is set the server will not send messages to the connection that published them. |
|
1195
|
|
|
|
|
|
|
|
|
1196
|
|
|
|
|
|
|
See L |
|
1197
|
|
|
|
|
|
|
|
|
1198
|
|
|
|
|
|
|
=item no_ack |
|
1199
|
|
|
|
|
|
|
|
|
1200
|
|
|
|
|
|
|
If this field is set the server does not expect acknowledgements for messages. That is, when a message |
|
1201
|
|
|
|
|
|
|
is delivered to the client the server assumes the delivery will succeed and immediately dequeues it. |
|
1202
|
|
|
|
|
|
|
This functionality may increase performance but at the cost of reliability. Messages can get lost if |
|
1203
|
|
|
|
|
|
|
a client dies before they are delivered to the application. |
|
1204
|
|
|
|
|
|
|
|
|
1205
|
|
|
|
|
|
|
=item exclusive |
|
1206
|
|
|
|
|
|
|
|
|
1207
|
|
|
|
|
|
|
Request exclusive consumer access, meaning only this consumer can access the queue. |
|
1208
|
|
|
|
|
|
|
|
|
1209
|
|
|
|
|
|
|
=back |
|
1210
|
|
|
|
|
|
|
|
|
1211
|
|
|
|
|
|
|
=head2 cancel |
|
1212
|
|
|
|
|
|
|
|
|
1213
|
|
|
|
|
|
|
$channel->cancel(consumer_tag => 'amq.ctag....')->deliver; |
|
1214
|
|
|
|
|
|
|
|
|
1215
|
|
|
|
|
|
|
End a queue consumer. |
|
1216
|
|
|
|
|
|
|
|
|
1217
|
|
|
|
|
|
|
This method cancels a consumer. This does not affect already delivered messages, but |
|
1218
|
|
|
|
|
|
|
it does mean the server will not send any more messages for that consumer. The client |
|
1219
|
|
|
|
|
|
|
may receive an arbitrary number of messages in between sending the cancel method and |
|
1220
|
|
|
|
|
|
|
receiving the cancel-ok reply. |
|
1221
|
|
|
|
|
|
|
|
|
1222
|
|
|
|
|
|
|
Following arguments are accepted: |
|
1223
|
|
|
|
|
|
|
|
|
1224
|
|
|
|
|
|
|
=over 2 |
|
1225
|
|
|
|
|
|
|
|
|
1226
|
|
|
|
|
|
|
=item consumer_tag |
|
1227
|
|
|
|
|
|
|
|
|
1228
|
|
|
|
|
|
|
Holds the consumer tag specified by the client or provided by the server. |
|
1229
|
|
|
|
|
|
|
|
|
1230
|
|
|
|
|
|
|
=back |
|
1231
|
|
|
|
|
|
|
|
|
1232
|
|
|
|
|
|
|
=head2 get |
|
1233
|
|
|
|
|
|
|
|
|
1234
|
|
|
|
|
|
|
my $get = $channel->get(queue => 'mq') |
|
1235
|
|
|
|
|
|
|
$get->deliver; |
|
1236
|
|
|
|
|
|
|
|
|
1237
|
|
|
|
|
|
|
Direct access to a queue. |
|
1238
|
|
|
|
|
|
|
|
|
1239
|
|
|
|
|
|
|
This method provides a direct access to the messages in a queue using |
|
1240
|
|
|
|
|
|
|
a synchronous dialogue that is designed for specific types of application |
|
1241
|
|
|
|
|
|
|
where synchronous functionality is more important than performance. |
|
1242
|
|
|
|
|
|
|
|
|
1243
|
|
|
|
|
|
|
This is simple event emitter to which you have to subscribe. It can emit: |
|
1244
|
|
|
|
|
|
|
|
|
1245
|
|
|
|
|
|
|
=over 2 |
|
1246
|
|
|
|
|
|
|
|
|
1247
|
|
|
|
|
|
|
=item message |
|
1248
|
|
|
|
|
|
|
|
|
1249
|
|
|
|
|
|
|
Provide client with a message. |
|
1250
|
|
|
|
|
|
|
|
|
1251
|
|
|
|
|
|
|
This method delivers a message to the client following a get method. A message |
|
1252
|
|
|
|
|
|
|
delivered by 'get-ok' must be acknowledged unless the no-ack option was set |
|
1253
|
|
|
|
|
|
|
in the get method. |
|
1254
|
|
|
|
|
|
|
|
|
1255
|
|
|
|
|
|
|
You can access all get-ok reply parameters as below: |
|
1256
|
|
|
|
|
|
|
|
|
1257
|
|
|
|
|
|
|
$get->on(message => sub { |
|
1258
|
|
|
|
|
|
|
my $get = shift; |
|
1259
|
|
|
|
|
|
|
my $get_ok = shift; |
|
1260
|
|
|
|
|
|
|
my $message = shift; |
|
1261
|
|
|
|
|
|
|
|
|
1262
|
|
|
|
|
|
|
say "Still got: " . $get_ok->method_frame->message_count; |
|
1263
|
|
|
|
|
|
|
}); |
|
1264
|
|
|
|
|
|
|
|
|
1265
|
|
|
|
|
|
|
=item empty |
|
1266
|
|
|
|
|
|
|
|
|
1267
|
|
|
|
|
|
|
Indicate no messages available. |
|
1268
|
|
|
|
|
|
|
|
|
1269
|
|
|
|
|
|
|
This method tells the client that the queue has no messages available for the |
|
1270
|
|
|
|
|
|
|
client. |
|
1271
|
|
|
|
|
|
|
|
|
1272
|
|
|
|
|
|
|
=back |
|
1273
|
|
|
|
|
|
|
|
|
1274
|
|
|
|
|
|
|
Following arguments are accepted: |
|
1275
|
|
|
|
|
|
|
|
|
1276
|
|
|
|
|
|
|
=over 2 |
|
1277
|
|
|
|
|
|
|
|
|
1278
|
|
|
|
|
|
|
=item queue |
|
1279
|
|
|
|
|
|
|
|
|
1280
|
|
|
|
|
|
|
Specifies the name of the queue to get a message from. |
|
1281
|
|
|
|
|
|
|
|
|
1282
|
|
|
|
|
|
|
=item no_ack |
|
1283
|
|
|
|
|
|
|
|
|
1284
|
|
|
|
|
|
|
If this field is set the server does not expect acknowledgements for messages. That is, when a message |
|
1285
|
|
|
|
|
|
|
is delivered to the client the server assumes the delivery will succeed and immediately dequeues it. |
|
1286
|
|
|
|
|
|
|
This functionality may increase performance but at the cost of reliability. Messages can get lost if |
|
1287
|
|
|
|
|
|
|
a client dies before they are delivered to the application. |
|
1288
|
|
|
|
|
|
|
|
|
1289
|
|
|
|
|
|
|
=back |
|
1290
|
|
|
|
|
|
|
|
|
1291
|
|
|
|
|
|
|
=head2 ack |
|
1292
|
|
|
|
|
|
|
|
|
1293
|
|
|
|
|
|
|
$channel->ack(delivery_tag => 1); |
|
1294
|
|
|
|
|
|
|
|
|
1295
|
|
|
|
|
|
|
Acknowledge one or more messages. |
|
1296
|
|
|
|
|
|
|
|
|
1297
|
|
|
|
|
|
|
When sent by the client, this method acknowledges one or more messages |
|
1298
|
|
|
|
|
|
|
delivered via the Deliver or Get-Ok methods. When sent by server, this |
|
1299
|
|
|
|
|
|
|
method acknowledges one or more messages published with the Publish |
|
1300
|
|
|
|
|
|
|
method on a channel in confirm mode. The acknowledgement can be for |
|
1301
|
|
|
|
|
|
|
a single message or a set of messages up to and including a specific |
|
1302
|
|
|
|
|
|
|
message. |
|
1303
|
|
|
|
|
|
|
|
|
1304
|
|
|
|
|
|
|
Following arguments are accepted: |
|
1305
|
|
|
|
|
|
|
|
|
1306
|
|
|
|
|
|
|
=over 2 |
|
1307
|
|
|
|
|
|
|
|
|
1308
|
|
|
|
|
|
|
=item delivery_tag |
|
1309
|
|
|
|
|
|
|
|
|
1310
|
|
|
|
|
|
|
Server assigned delivery tag that was received with a message. |
|
1311
|
|
|
|
|
|
|
|
|
1312
|
|
|
|
|
|
|
=item multiple |
|
1313
|
|
|
|
|
|
|
|
|
1314
|
|
|
|
|
|
|
If set to 1, the delivery tag is treated as "up to and including", so |
|
1315
|
|
|
|
|
|
|
that multiple messages can be acknowledged with a single method. If set |
|
1316
|
|
|
|
|
|
|
to zero, the delivery tag refers to a single message. If the multiple |
|
1317
|
|
|
|
|
|
|
field is 1, and the delivery tag is zero, this indicates acknowledgement |
|
1318
|
|
|
|
|
|
|
of all outstanding messages. |
|
1319
|
|
|
|
|
|
|
|
|
1320
|
|
|
|
|
|
|
=back |
|
1321
|
|
|
|
|
|
|
|
|
1322
|
|
|
|
|
|
|
=head2 qos |
|
1323
|
|
|
|
|
|
|
|
|
1324
|
|
|
|
|
|
|
$channel->qos(prefetch_count => 1)->deliver; |
|
1325
|
|
|
|
|
|
|
|
|
1326
|
|
|
|
|
|
|
Sets specified Quality of Service to channel, or entire connection. Accepts following arguments: |
|
1327
|
|
|
|
|
|
|
|
|
1328
|
|
|
|
|
|
|
=over 2 |
|
1329
|
|
|
|
|
|
|
|
|
1330
|
|
|
|
|
|
|
=item prefetch_size |
|
1331
|
|
|
|
|
|
|
|
|
1332
|
|
|
|
|
|
|
Prefetch window size in octets. |
|
1333
|
|
|
|
|
|
|
|
|
1334
|
|
|
|
|
|
|
=item prefetch_count |
|
1335
|
|
|
|
|
|
|
|
|
1336
|
|
|
|
|
|
|
Prefetch window in complete messages. |
|
1337
|
|
|
|
|
|
|
|
|
1338
|
|
|
|
|
|
|
=item global |
|
1339
|
|
|
|
|
|
|
|
|
1340
|
|
|
|
|
|
|
If set all settings will be applied connection wide. |
|
1341
|
|
|
|
|
|
|
|
|
1342
|
|
|
|
|
|
|
=back |
|
1343
|
|
|
|
|
|
|
|
|
1344
|
|
|
|
|
|
|
=head2 recover |
|
1345
|
|
|
|
|
|
|
|
|
1346
|
|
|
|
|
|
|
$channel->recover(requeue => 0)->deliver; |
|
1347
|
|
|
|
|
|
|
|
|
1348
|
|
|
|
|
|
|
Redeliver unacknowledged messages. |
|
1349
|
|
|
|
|
|
|
|
|
1350
|
|
|
|
|
|
|
This method asks the server to redeliver all unacknowledged messages |
|
1351
|
|
|
|
|
|
|
on a specified channel. Zero or more messages may be redelivered. |
|
1352
|
|
|
|
|
|
|
|
|
1353
|
|
|
|
|
|
|
=over 2 |
|
1354
|
|
|
|
|
|
|
|
|
1355
|
|
|
|
|
|
|
=item requeue |
|
1356
|
|
|
|
|
|
|
|
|
1357
|
|
|
|
|
|
|
If this field is zero, the message will be redelivered to the original |
|
1358
|
|
|
|
|
|
|
recipient. If this bit is 1, the server will attempt to requeue the |
|
1359
|
|
|
|
|
|
|
message, potentially then delivering it to an alternative subscriber. |
|
1360
|
|
|
|
|
|
|
|
|
1361
|
|
|
|
|
|
|
=back |
|
1362
|
|
|
|
|
|
|
|
|
1363
|
|
|
|
|
|
|
=head2 reject |
|
1364
|
|
|
|
|
|
|
|
|
1365
|
|
|
|
|
|
|
$channel->reject(delivery_tag => 1, requeue => 0)->deliver; |
|
1366
|
|
|
|
|
|
|
|
|
1367
|
|
|
|
|
|
|
Reject an incoming message. |
|
1368
|
|
|
|
|
|
|
|
|
1369
|
|
|
|
|
|
|
This method allows a client to reject a message. It can be |
|
1370
|
|
|
|
|
|
|
used to interrupt and cancel large incoming messages, or |
|
1371
|
|
|
|
|
|
|
return untreatable messages to their original queue. |
|
1372
|
|
|
|
|
|
|
|
|
1373
|
|
|
|
|
|
|
Following arguments are accepted: |
|
1374
|
|
|
|
|
|
|
|
|
1375
|
|
|
|
|
|
|
=over 2 |
|
1376
|
|
|
|
|
|
|
|
|
1377
|
|
|
|
|
|
|
=item delivery_tag |
|
1378
|
|
|
|
|
|
|
|
|
1379
|
|
|
|
|
|
|
Server assigned delivery tag that was received with a message. |
|
1380
|
|
|
|
|
|
|
|
|
1381
|
|
|
|
|
|
|
=item requeue |
|
1382
|
|
|
|
|
|
|
|
|
1383
|
|
|
|
|
|
|
If requeue is true, the server will attempt to requeue the message. |
|
1384
|
|
|
|
|
|
|
If requeue is false or the requeue attempt fails the messages are |
|
1385
|
|
|
|
|
|
|
discarded or dead-lettered. |
|
1386
|
|
|
|
|
|
|
|
|
1387
|
|
|
|
|
|
|
=back |
|
1388
|
|
|
|
|
|
|
|
|
1389
|
|
|
|
|
|
|
=head2 select_tx |
|
1390
|
|
|
|
|
|
|
|
|
1391
|
|
|
|
|
|
|
Work with transactions. |
|
1392
|
|
|
|
|
|
|
|
|
1393
|
|
|
|
|
|
|
The Tx class allows publish and ack operations to be batched into atomic units of work. |
|
1394
|
|
|
|
|
|
|
The intention is that all publish and ack requests issued within a transaction will |
|
1395
|
|
|
|
|
|
|
complete successfully or none of them will. Servers SHOULD implement atomic transactions |
|
1396
|
|
|
|
|
|
|
at least where all publish or ack requests affect a single queue. Transactions that cover |
|
1397
|
|
|
|
|
|
|
multiple queues may be non-atomic, given that queues can be created and destroyed |
|
1398
|
|
|
|
|
|
|
asynchronously, and such events do not form part of any transaction. |
|
1399
|
|
|
|
|
|
|
Further, the behaviour of transactions with respect to the immediate and mandatory flags |
|
1400
|
|
|
|
|
|
|
on Basic.Publish methods is not defined. |
|
1401
|
|
|
|
|
|
|
|
|
1402
|
|
|
|
|
|
|
$channel->select_tx()->deliver; |
|
1403
|
|
|
|
|
|
|
|
|
1404
|
|
|
|
|
|
|
Select standard transaction mode. |
|
1405
|
|
|
|
|
|
|
|
|
1406
|
|
|
|
|
|
|
This method sets the channel to use standard transactions. The client must use this method |
|
1407
|
|
|
|
|
|
|
at least once on a channel before using the Commit or Rollback methods. |
|
1408
|
|
|
|
|
|
|
|
|
1409
|
|
|
|
|
|
|
=head2 commit_tx |
|
1410
|
|
|
|
|
|
|
|
|
1411
|
|
|
|
|
|
|
$channel->commit_tx()->deliver; |
|
1412
|
|
|
|
|
|
|
|
|
1413
|
|
|
|
|
|
|
Commit the current transaction. |
|
1414
|
|
|
|
|
|
|
|
|
1415
|
|
|
|
|
|
|
This method commits all message publications and acknowledgments performed in the current |
|
1416
|
|
|
|
|
|
|
transaction. A new transaction starts immediately after a commit. |
|
1417
|
|
|
|
|
|
|
|
|
1418
|
|
|
|
|
|
|
=head2 rollback_tx |
|
1419
|
|
|
|
|
|
|
|
|
1420
|
|
|
|
|
|
|
$channel->rollback_tx()->deliver; |
|
1421
|
|
|
|
|
|
|
|
|
1422
|
|
|
|
|
|
|
Abandon the current transaction. |
|
1423
|
|
|
|
|
|
|
|
|
1424
|
|
|
|
|
|
|
This method abandons all message publications and acknowledgments performed in the current |
|
1425
|
|
|
|
|
|
|
transaction. A new transaction starts immediately after a rollback. Note that unacked messages |
|
1426
|
|
|
|
|
|
|
will not be automatically redelivered by rollback; if that is required an explicit recover |
|
1427
|
|
|
|
|
|
|
call should be issued. |
|
1428
|
|
|
|
|
|
|
|
|
1429
|
|
|
|
|
|
|
=head1 SEE ALSO |
|
1430
|
|
|
|
|
|
|
|
|
1431
|
|
|
|
|
|
|
L, L, L |
|
1432
|
|
|
|
|
|
|
|
|
1433
|
|
|
|
|
|
|
=head1 COPYRIGHT AND LICENSE |
|
1434
|
|
|
|
|
|
|
|
|
1435
|
|
|
|
|
|
|
Copyright (C) 2015-2017, Sebastian Podjasek and others |
|
1436
|
|
|
|
|
|
|
|
|
1437
|
|
|
|
|
|
|
Based on L - Copyright (C) 2010 Masahito Ikuta, maintained by C<< bobtfish@bobtfish.net >> |
|
1438
|
|
|
|
|
|
|
|
|
1439
|
|
|
|
|
|
|
This program is free software, you can redistribute it and/or modify it under the terms of the Artistic License version 2.0. |
|
1440
|
|
|
|
|
|
|
|
|
1441
|
|
|
|
|
|
|
=cut |