line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
1
|
|
|
|
|
|
|
package Mojo::RabbitMQ::Client; |
2
|
5
|
|
|
5
|
|
169549
|
use Mojo::Base 'Mojo::EventEmitter'; |
|
5
|
|
|
|
|
802118
|
|
|
5
|
|
|
|
|
46
|
|
3
|
|
|
|
|
|
|
|
4
|
5
|
|
|
5
|
|
8619
|
use Carp qw(croak confess); |
|
5
|
|
|
|
|
14
|
|
|
5
|
|
|
|
|
305
|
|
5
|
5
|
|
|
5
|
|
2336
|
use Mojo::URL; |
|
5
|
|
|
|
|
37751
|
|
|
5
|
|
|
|
|
40
|
|
6
|
5
|
|
|
5
|
|
2384
|
use Mojo::Home; |
|
5
|
|
|
|
|
155982
|
|
|
5
|
|
|
|
|
265
|
|
7
|
5
|
|
|
5
|
|
2743
|
use Mojo::IOLoop; |
|
5
|
|
|
|
|
556511
|
|
|
5
|
|
|
|
|
37
|
|
8
|
5
|
|
|
5
|
|
292
|
use Mojo::Parameters; |
|
5
|
|
|
|
|
13
|
|
|
5
|
|
|
|
|
53
|
|
9
|
5
|
|
|
5
|
|
159
|
use Mojo::Promise; |
|
5
|
|
|
|
|
11
|
|
|
5
|
|
|
|
|
48
|
|
10
|
5
|
|
|
5
|
|
176
|
use Mojo::Util qw(url_unescape dumper); |
|
5
|
|
|
|
|
13
|
|
|
5
|
|
|
|
|
272
|
|
11
|
5
|
|
|
5
|
|
39
|
use List::Util qw(none); |
|
5
|
|
|
|
|
11
|
|
|
5
|
|
|
|
|
276
|
|
12
|
5
|
|
|
5
|
|
74
|
use Scalar::Util qw(blessed weaken); |
|
5
|
|
|
|
|
12
|
|
|
5
|
|
|
|
|
223
|
|
13
|
5
|
|
|
5
|
|
33
|
use File::Basename 'dirname'; |
|
5
|
|
|
|
|
11
|
|
|
5
|
|
|
|
|
318
|
|
14
|
5
|
|
|
5
|
|
2551
|
use File::ShareDir qw(dist_file); |
|
5
|
|
|
|
|
119289
|
|
|
5
|
|
|
|
|
292
|
|
15
|
|
|
|
|
|
|
|
16
|
5
|
|
|
5
|
|
2246
|
use Net::AMQP; |
|
5
|
|
|
|
|
286856
|
|
|
5
|
|
|
|
|
189
|
|
17
|
5
|
|
|
5
|
|
46
|
use Net::AMQP::Common qw(:all); |
|
5
|
|
|
|
|
11
|
|
|
5
|
|
|
|
|
1114
|
|
18
|
|
|
|
|
|
|
|
19
|
5
|
|
|
5
|
|
3543
|
use Mojo::RabbitMQ::Client::Channel; |
|
5
|
|
|
|
|
19
|
|
|
5
|
|
|
|
|
43
|
|
20
|
5
|
|
|
5
|
|
240
|
use Mojo::RabbitMQ::Client::LocalQueue; |
|
5
|
|
|
|
|
11
|
|
|
5
|
|
|
|
|
30
|
|
21
|
|
|
|
|
|
|
require Mojo::RabbitMQ::Client::Consumer; |
22
|
|
|
|
|
|
|
require Mojo::RabbitMQ::Client::Publisher; |
23
|
|
|
|
|
|
|
|
24
|
|
|
|
|
|
|
our $VERSION = "0.2.4"; |
25
|
|
|
|
|
|
|
|
26
|
5
|
|
50
|
5
|
|
403
|
use constant DEBUG => $ENV{MOJO_RABBITMQ_DEBUG} // 0; |
|
5
|
|
|
|
|
11
|
|
|
5
|
|
|
|
|
22730
|
|
27
|
|
|
|
|
|
|
|
28
|
|
|
|
|
|
|
has is_open => 0; |
29
|
|
|
|
|
|
|
has url => undef; |
30
|
|
|
|
|
|
|
has tls => sub { shift->_uri_handler('tls') }; |
31
|
|
|
|
|
|
|
has user => sub { shift->_uri_handler('user') }; |
32
|
|
|
|
|
|
|
has pass => sub { shift->_uri_handler('pass') }; |
33
|
|
|
|
|
|
|
has host => sub { shift->_uri_handler('host') }; |
34
|
|
|
|
|
|
|
has port => sub { shift->_uri_handler('port') }; |
35
|
|
|
|
|
|
|
has vhost => sub { shift->_uri_handler('vhost') }; |
36
|
|
|
|
|
|
|
has params => sub { shift->_uri_handler('params') // Mojo::Parameters->new }; |
37
|
|
|
|
|
|
|
has connect_timeout => sub { $ENV{MOJO_CONNECT_TIMEOUT} // 10 }; |
38
|
|
|
|
|
|
|
has heartbeat_timeout => 60; |
39
|
|
|
|
|
|
|
has heartbeat_received => 0; # When did we receive last heartbeat |
40
|
|
|
|
|
|
|
has heartbeat_sent => 0; # When did we sent last heartbeat |
41
|
|
|
|
|
|
|
has ioloop => sub { Mojo::IOLoop->singleton }; |
42
|
|
|
|
|
|
|
has max_buffer_size => 16384; |
43
|
|
|
|
|
|
|
has max_channels => 0; |
44
|
|
|
|
|
|
|
has queue => sub { Mojo::RabbitMQ::Client::LocalQueue->new }; |
45
|
|
|
|
|
|
|
has channels => sub { {} }; |
46
|
|
|
|
|
|
|
has stream_id => undef; |
47
|
|
|
|
|
|
|
|
48
|
|
|
|
|
|
|
sub connect { |
49
|
0
|
|
|
0
|
1
|
0
|
my $self = shift; |
50
|
0
|
|
|
|
|
0
|
$self->{buffer} = ''; |
51
|
|
|
|
|
|
|
|
52
|
0
|
|
|
|
|
0
|
my $id; |
53
|
0
|
|
|
0
|
|
0
|
$id = $self->_connect(sub { $self->_connected($id) }); |
|
0
|
|
|
|
|
0
|
|
54
|
0
|
|
|
|
|
0
|
$self->stream_id($id); |
55
|
|
|
|
|
|
|
|
56
|
0
|
|
|
|
|
0
|
return $id; |
57
|
|
|
|
|
|
|
} |
58
|
|
|
|
|
|
|
|
59
|
|
|
|
|
|
|
sub connect_p { |
60
|
0
|
|
|
0
|
0
|
0
|
my $self = shift; |
61
|
0
|
|
|
|
|
0
|
my $promise = Mojo::Promise->new; |
62
|
|
|
|
|
|
|
|
63
|
0
|
|
|
|
|
0
|
my $id; |
64
|
|
|
|
|
|
|
|
65
|
0
|
|
|
|
|
0
|
weaken $self; |
66
|
|
|
|
|
|
|
my $handler = sub { |
67
|
0
|
|
|
0
|
|
0
|
my ($err) = @_; |
68
|
0
|
0
|
|
|
|
0
|
if (defined $err) { |
69
|
0
|
|
|
|
|
0
|
return $promise->reject($err); |
70
|
|
|
|
|
|
|
} |
71
|
|
|
|
|
|
|
|
72
|
0
|
|
|
|
|
0
|
return $promise->resolve($self); |
73
|
0
|
|
|
|
|
0
|
}; |
74
|
|
|
|
|
|
|
|
75
|
0
|
|
|
0
|
|
0
|
$id = $self->_connect(sub { $self->_connected($id, $handler) }); |
|
0
|
|
|
|
|
0
|
|
76
|
0
|
|
|
|
|
0
|
$self->stream_id($id); |
77
|
|
|
|
|
|
|
|
78
|
0
|
|
|
|
|
0
|
return $promise; |
79
|
|
|
|
|
|
|
} |
80
|
|
|
|
|
|
|
|
81
|
|
|
|
|
|
|
sub consumer { |
82
|
2
|
|
|
2
|
1
|
1420
|
my ($class, @params) = @_; |
83
|
2
|
100
|
|
|
|
21
|
croak "consumer is a static method" if ref $class; |
84
|
|
|
|
|
|
|
|
85
|
1
|
|
|
|
|
11
|
return Mojo::RabbitMQ::Client::Consumer->new(@params); |
86
|
|
|
|
|
|
|
} |
87
|
|
|
|
|
|
|
|
88
|
|
|
|
|
|
|
sub publisher { |
89
|
2
|
|
|
2
|
1
|
1386
|
my ($class, @params) = @_; |
90
|
2
|
100
|
|
|
|
15
|
croak "publisher is a static method" if ref $class; |
91
|
|
|
|
|
|
|
|
92
|
1
|
|
|
|
|
7
|
return Mojo::RabbitMQ::Client::Publisher->new(@params); |
93
|
|
|
|
|
|
|
} |
94
|
|
|
|
|
|
|
|
95
|
|
|
|
|
|
|
sub param { |
96
|
6
|
|
|
6
|
1
|
11989
|
my $self = shift; |
97
|
6
|
50
|
|
|
|
19
|
return undef unless defined $self->params; |
98
|
6
|
|
|
|
|
63
|
return $self->params->param(@_); |
99
|
|
|
|
|
|
|
} |
100
|
|
|
|
|
|
|
|
101
|
|
|
|
|
|
|
sub add_channel { |
102
|
0
|
|
|
0
|
1
|
0
|
my $self = shift; |
103
|
0
|
|
|
|
|
0
|
my $channel = shift; |
104
|
|
|
|
|
|
|
|
105
|
0
|
|
|
|
|
0
|
my $id = $channel->id; |
106
|
0
|
0
|
0
|
|
|
0
|
if ($id and $self->channels->{$id}) { |
107
|
0
|
|
|
|
|
0
|
return $channel->emit( |
108
|
|
|
|
|
|
|
error => 'Channel with id: ' . $id . ' already defined'); |
109
|
|
|
|
|
|
|
} |
110
|
|
|
|
|
|
|
|
111
|
0
|
0
|
0
|
|
|
0
|
if ($self->max_channels > 0 |
112
|
0
|
|
|
|
|
0
|
and scalar keys %{$self->channels} >= $self->max_channels) |
113
|
|
|
|
|
|
|
{ |
114
|
0
|
|
|
|
|
0
|
return $channel->emit(error => 'Maximum number of channels reached'); |
115
|
|
|
|
|
|
|
} |
116
|
|
|
|
|
|
|
|
117
|
0
|
0
|
|
|
|
0
|
if (not $id) { |
118
|
0
|
|
|
|
|
0
|
for my $candidate_id (1 .. (2**16 - 1)) { |
119
|
0
|
0
|
|
|
|
0
|
next if defined $self->channels->{$candidate_id}; |
120
|
0
|
|
|
|
|
0
|
$id = $candidate_id; |
121
|
0
|
|
|
|
|
0
|
last; |
122
|
|
|
|
|
|
|
} |
123
|
0
|
0
|
|
|
|
0
|
unless ($id) { |
124
|
0
|
|
|
|
|
0
|
return $channel->emit(error => 'Ran out of channel ids'); |
125
|
|
|
|
|
|
|
} |
126
|
|
|
|
|
|
|
} |
127
|
|
|
|
|
|
|
|
128
|
0
|
|
|
|
|
0
|
$self->channels->{$id} = $channel->id($id)->client($self); |
129
|
0
|
|
|
|
|
0
|
weaken $channel->{client}; |
130
|
|
|
|
|
|
|
|
131
|
0
|
|
|
|
|
0
|
return $channel; |
132
|
|
|
|
|
|
|
} |
133
|
|
|
|
|
|
|
|
134
|
|
|
|
|
|
|
sub acquire_channel_p { |
135
|
0
|
|
|
0
|
0
|
0
|
my $self = shift; |
136
|
|
|
|
|
|
|
|
137
|
0
|
|
|
|
|
0
|
my $promise = Mojo::Promise->new; |
138
|
|
|
|
|
|
|
|
139
|
0
|
|
|
|
|
0
|
my $channel = Mojo::RabbitMQ::Client::Channel->new(); |
140
|
0
|
|
|
0
|
|
0
|
$channel->catch(sub { $promise->reject(@_); undef $promise }); |
|
0
|
|
|
|
|
0
|
|
|
0
|
|
|
|
|
0
|
|
141
|
0
|
|
|
0
|
|
0
|
$channel->on(close => sub { warn "Channel closed" }); |
|
0
|
|
|
|
|
0
|
|
142
|
0
|
|
|
0
|
|
0
|
$channel->on(open => sub { $promise->resolve(@_); undef $promise }); |
|
0
|
|
|
|
|
0
|
|
|
0
|
|
|
|
|
0
|
|
143
|
|
|
|
|
|
|
|
144
|
0
|
|
|
|
|
0
|
$self->open_channel($channel); |
145
|
|
|
|
|
|
|
|
146
|
0
|
|
|
|
|
0
|
return $promise; |
147
|
|
|
|
|
|
|
} |
148
|
|
|
|
|
|
|
|
149
|
|
|
|
|
|
|
sub open_channel { |
150
|
0
|
|
|
0
|
1
|
0
|
my $self = shift; |
151
|
0
|
|
|
|
|
0
|
my $channel = shift; |
152
|
|
|
|
|
|
|
|
153
|
0
|
0
|
|
|
|
0
|
return $channel->emit(error => 'Client connection not opened') |
154
|
|
|
|
|
|
|
unless $self->is_open; |
155
|
|
|
|
|
|
|
|
156
|
0
|
|
|
|
|
0
|
$self->add_channel($channel)->open; |
157
|
|
|
|
|
|
|
|
158
|
0
|
|
|
|
|
0
|
return $self; |
159
|
|
|
|
|
|
|
} |
160
|
|
|
|
|
|
|
|
161
|
|
|
|
|
|
|
sub delete_channel { |
162
|
0
|
|
|
0
|
1
|
0
|
my $self = shift; |
163
|
0
|
|
|
|
|
0
|
return delete $self->channels->{shift}; |
164
|
|
|
|
|
|
|
} |
165
|
|
|
|
|
|
|
|
166
|
|
|
|
|
|
|
sub close { |
167
|
0
|
|
|
0
|
1
|
0
|
my $self = shift; |
168
|
|
|
|
|
|
|
|
169
|
0
|
|
|
|
|
0
|
weaken $self; |
170
|
|
|
|
|
|
|
$self->_write_expect( |
171
|
|
|
|
|
|
|
'Connection::Close' => {}, |
172
|
|
|
|
|
|
|
'Connection::CloseOk' => sub { |
173
|
0
|
|
|
0
|
|
0
|
warn "-- Connection::CloseOk\n" if DEBUG; |
174
|
0
|
|
|
|
|
0
|
$self->emit('close'); |
175
|
0
|
|
|
|
|
0
|
$self->_close; |
176
|
|
|
|
|
|
|
}, |
177
|
|
|
|
|
|
|
sub { |
178
|
0
|
|
|
0
|
|
0
|
$self->_close; |
179
|
|
|
|
|
|
|
} |
180
|
0
|
|
|
|
|
0
|
); |
181
|
|
|
|
|
|
|
} |
182
|
|
|
|
|
|
|
|
183
|
0
|
|
|
0
|
|
0
|
sub _loop { $_[0]->ioloop } |
184
|
|
|
|
|
|
|
|
185
|
|
|
|
|
|
|
sub _error { |
186
|
0
|
|
|
0
|
|
0
|
my ($self, $id, $err) = @_; |
187
|
|
|
|
|
|
|
|
188
|
0
|
|
|
|
|
0
|
$self->emit(error => $err); |
189
|
|
|
|
|
|
|
} |
190
|
|
|
|
|
|
|
|
191
|
|
|
|
|
|
|
sub _uri_handler { |
192
|
20
|
|
|
20
|
|
37
|
my $self = shift; |
193
|
20
|
|
|
|
|
35
|
my $attr = shift; |
194
|
|
|
|
|
|
|
|
195
|
20
|
100
|
|
|
|
49
|
return undef unless defined $self->url; |
196
|
|
|
|
|
|
|
|
197
|
18
|
50
|
33
|
|
|
107
|
$self->url(Mojo::URL->new($self->url)) |
198
|
|
|
|
|
|
|
unless blessed $self->url && $self->url->isa('Mojo::URL'); |
199
|
|
|
|
|
|
|
|
200
|
|
|
|
|
|
|
# Set some defaults |
201
|
18
|
|
|
|
|
3482
|
my %defaults = ( |
202
|
|
|
|
|
|
|
tls => 0, |
203
|
|
|
|
|
|
|
user => undef, |
204
|
|
|
|
|
|
|
pass => undef, |
205
|
|
|
|
|
|
|
host => 'localhost', |
206
|
|
|
|
|
|
|
port => 5672, |
207
|
|
|
|
|
|
|
vhost => '/', |
208
|
|
|
|
|
|
|
params => undef |
209
|
|
|
|
|
|
|
); |
210
|
|
|
|
|
|
|
|
211
|
|
|
|
|
|
|
# Check secure scheme in url |
212
|
18
|
100
|
|
|
|
41
|
$defaults{tls} = 1 |
213
|
|
|
|
|
|
|
if $self->url->scheme |
214
|
|
|
|
|
|
|
=~ /^(amqp|rabbitmq)s$/; # Fallback support for rabbitmq scheme name |
215
|
18
|
100
|
|
|
|
159
|
$defaults{port} = 5671 if $defaults{tls}; |
216
|
|
|
|
|
|
|
|
217
|
|
|
|
|
|
|
# Get host & port |
218
|
18
|
100
|
66
|
|
|
139
|
$defaults{host} = $self->url->host |
219
|
|
|
|
|
|
|
if defined $self->url->host && $self->url->host ne ''; |
220
|
18
|
100
|
|
|
|
315
|
$defaults{port} = $self->url->port if defined $self->url->port; |
221
|
|
|
|
|
|
|
|
222
|
|
|
|
|
|
|
# Get user & password |
223
|
18
|
|
|
|
|
137
|
my $userinfo = $self->url->userinfo; |
224
|
18
|
100
|
|
|
|
121
|
if (defined $userinfo) { |
225
|
5
|
|
|
|
|
24
|
my ($user, $pass) = split /:/, $userinfo; |
226
|
5
|
|
|
|
|
11
|
$defaults{user} = $user; |
227
|
5
|
|
|
|
|
10
|
$defaults{pass} = $pass; |
228
|
|
|
|
|
|
|
} |
229
|
|
|
|
|
|
|
|
230
|
18
|
|
|
|
|
36
|
my $vhost = url_unescape $self->url->path; |
231
|
18
|
|
|
|
|
1770
|
$vhost =~ s|^/(.+)$|$1|; |
232
|
18
|
100
|
66
|
|
|
808
|
$defaults{vhost} = $vhost if defined $vhost && $vhost ne ''; |
233
|
|
|
|
|
|
|
|
234
|
|
|
|
|
|
|
# Query params |
235
|
18
|
|
|
|
|
866
|
my $params = $defaults{params} = $self->url->query; |
236
|
|
|
|
|
|
|
|
237
|
|
|
|
|
|
|
# Handle common aliases to internal names |
238
|
18
|
|
|
|
|
393
|
my %aliases = ( |
239
|
|
|
|
|
|
|
cacertfile => 'ca', |
240
|
|
|
|
|
|
|
certfile => 'cert', |
241
|
|
|
|
|
|
|
keyfile => 'key', |
242
|
|
|
|
|
|
|
fail_if_no_peer_cert => 'verify', |
243
|
|
|
|
|
|
|
connection_timeout => 'timeout' |
244
|
|
|
|
|
|
|
); |
245
|
|
|
|
|
|
|
$params->param($aliases{$_}, $params->param($_)) |
246
|
18
|
|
|
|
|
59
|
foreach grep { defined $params->param($_) } keys %aliases; |
|
90
|
|
|
|
|
2114
|
|
247
|
|
|
|
|
|
|
|
248
|
|
|
|
|
|
|
# Some query parameters are translated to attribute values |
249
|
18
|
|
|
|
|
755
|
my %attributes = ( |
250
|
|
|
|
|
|
|
heartbeat_timeout => 'heartbeat', |
251
|
|
|
|
|
|
|
connect_timeout => 'timeout', |
252
|
|
|
|
|
|
|
max_channels => 'channel_max' |
253
|
|
|
|
|
|
|
); |
254
|
|
|
|
|
|
|
$self->$_($params->param($attributes{$_})) |
255
|
18
|
|
|
|
|
48
|
foreach grep { defined $params->param($attributes{$_}) } keys %attributes; |
|
54
|
|
|
|
|
682
|
|
256
|
|
|
|
|
|
|
|
257
|
|
|
|
|
|
|
# Set all |
258
|
18
|
|
|
|
|
499
|
$self->$_($defaults{$_}) foreach keys %defaults; |
259
|
|
|
|
|
|
|
|
260
|
18
|
|
|
|
|
702
|
return $self->$attr; |
261
|
|
|
|
|
|
|
} |
262
|
|
|
|
|
|
|
|
263
|
|
|
|
|
|
|
sub _close { |
264
|
0
|
|
|
0
|
|
0
|
my $self = shift; |
265
|
0
|
|
|
|
|
0
|
$self->_loop->stream($self->stream_id)->close_gracefully; |
266
|
|
|
|
|
|
|
} |
267
|
|
|
|
|
|
|
|
268
|
|
|
|
|
|
|
sub _handle { |
269
|
0
|
|
|
0
|
|
0
|
my ($self, $id, $close) = @_; |
270
|
|
|
|
|
|
|
|
271
|
0
|
|
|
|
|
0
|
$self->emit('disconnect'); |
272
|
|
|
|
|
|
|
|
273
|
0
|
|
|
|
|
0
|
$self->_loop->remove($id); |
274
|
|
|
|
|
|
|
} |
275
|
|
|
|
|
|
|
|
276
|
|
|
|
|
|
|
sub _read { |
277
|
0
|
|
|
0
|
|
0
|
my ($self, $id, $chunk) = @_; |
278
|
|
|
|
|
|
|
|
279
|
0
|
|
|
|
|
0
|
warn "<- @{[dumper $chunk]}" if DEBUG; |
280
|
0
|
|
|
|
|
0
|
$self->{buffer} .= $chunk; |
281
|
0
|
|
|
|
|
0
|
$self->_parse_frames; |
282
|
|
|
|
|
|
|
|
283
|
0
|
|
|
|
|
0
|
return; |
284
|
|
|
|
|
|
|
} |
285
|
|
|
|
|
|
|
|
286
|
|
|
|
|
|
|
sub _parse_frames { |
287
|
0
|
|
|
0
|
|
0
|
my $self = shift; |
288
|
|
|
|
|
|
|
|
289
|
0
|
|
|
|
|
0
|
for my $frame (Net::AMQP->parse_raw_frames(\$self->{buffer})) { |
290
|
|
|
|
|
|
|
|
291
|
0
|
0
|
0
|
|
|
0
|
if ($frame->isa('Net::AMQP::Frame::Heartbeat')) { |
|
|
0
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
292
|
0
|
|
|
|
|
0
|
$self->heartbeat_received(time()); |
293
|
|
|
|
|
|
|
} |
294
|
|
|
|
|
|
|
elsif ($frame->isa('Net::AMQP::Frame::Method') |
295
|
|
|
|
|
|
|
and $frame->method_frame->isa('Net::AMQP::Protocol::Connection::Close')) |
296
|
|
|
|
|
|
|
{ |
297
|
0
|
|
|
|
|
0
|
$self->is_open(0); |
298
|
|
|
|
|
|
|
|
299
|
0
|
|
|
|
|
0
|
$self->_write_frame(Net::AMQP::Protocol::Connection::CloseOk->new()); |
300
|
|
|
|
|
|
|
$self->emit(disconnect => "Server side disconnection: " |
301
|
0
|
|
|
|
|
0
|
. $frame->method_frame->{reply_text}); |
302
|
|
|
|
|
|
|
} |
303
|
|
|
|
|
|
|
elsif ($frame->channel == 0) { |
304
|
0
|
|
|
|
|
0
|
$self->queue->push($frame); |
305
|
|
|
|
|
|
|
} |
306
|
|
|
|
|
|
|
else { |
307
|
0
|
|
|
|
|
0
|
my $channel = $self->channels->{$frame->channel}; |
308
|
0
|
0
|
|
|
|
0
|
if (defined $channel) { |
309
|
0
|
|
|
|
|
0
|
$channel->_push_queue_or_consume($frame); |
310
|
|
|
|
|
|
|
} |
311
|
|
|
|
|
|
|
else { |
312
|
0
|
|
0
|
|
|
0
|
$self->emit( |
313
|
|
|
|
|
|
|
error => "Unknown channel id received: " |
314
|
|
|
|
|
|
|
. ($frame->channel // '(undef)'), |
315
|
|
|
|
|
|
|
$frame |
316
|
|
|
|
|
|
|
); |
317
|
|
|
|
|
|
|
} |
318
|
|
|
|
|
|
|
} |
319
|
|
|
|
|
|
|
} |
320
|
|
|
|
|
|
|
} |
321
|
|
|
|
|
|
|
|
322
|
|
|
|
|
|
|
sub _connect { |
323
|
0
|
|
|
0
|
|
0
|
my ($self, $cb) = @_; |
324
|
|
|
|
|
|
|
|
325
|
|
|
|
|
|
|
# Options |
326
|
|
|
|
|
|
|
# Parse according to (https://www.rabbitmq.com/uri-spec.html) |
327
|
0
|
|
|
|
|
0
|
my $options = { |
328
|
|
|
|
|
|
|
address => $self->host, |
329
|
|
|
|
|
|
|
port => $self->port, |
330
|
|
|
|
|
|
|
timeout => $self->connect_timeout, |
331
|
|
|
|
|
|
|
tls => $self->tls, |
332
|
|
|
|
|
|
|
tls_ca => scalar $self->param('ca'), |
333
|
|
|
|
|
|
|
tls_cert => scalar $self->param('cert'), |
334
|
|
|
|
|
|
|
tls_key => scalar $self->param('key') |
335
|
|
|
|
|
|
|
}; |
336
|
0
|
|
|
|
|
0
|
my $verify = $self->param('verify'); |
337
|
0
|
0
|
|
|
|
0
|
$options->{tls_verify} = hex $verify if defined $verify; |
338
|
|
|
|
|
|
|
|
339
|
|
|
|
|
|
|
# Connect |
340
|
0
|
|
|
|
|
0
|
weaken $self; |
341
|
0
|
|
|
|
|
0
|
my $id; |
342
|
|
|
|
|
|
|
return $id = $self->_loop->client( |
343
|
|
|
|
|
|
|
$options => sub { |
344
|
0
|
|
|
0
|
|
0
|
my ($loop, $err, $stream) = @_; |
345
|
|
|
|
|
|
|
|
346
|
|
|
|
|
|
|
# Connection error |
347
|
0
|
0
|
|
|
|
0
|
return unless $self; |
348
|
0
|
0
|
|
|
|
0
|
return $self->_error($id, $err) if $err; |
349
|
|
|
|
|
|
|
|
350
|
0
|
|
|
|
|
0
|
$self->emit(connect => $stream); |
351
|
|
|
|
|
|
|
|
352
|
|
|
|
|
|
|
# Connection established |
353
|
0
|
|
|
|
|
0
|
$stream->on(timeout => sub { $self->_error($id, 'Inactivity timeout') }); |
|
0
|
|
|
|
|
0
|
|
354
|
0
|
0
|
|
|
|
0
|
$stream->on(close => sub { $self && $self->_handle($id, 1) }); |
|
0
|
|
|
|
|
0
|
|
355
|
0
|
0
|
|
|
|
0
|
$stream->on(error => sub { $self && $self->_error($id, pop) }); |
|
0
|
|
|
|
|
0
|
|
356
|
0
|
0
|
|
|
|
0
|
$stream->on(read => sub { $self && $self->_read($id, pop) }); |
|
0
|
|
|
|
|
0
|
|
357
|
0
|
|
|
|
|
0
|
$cb->(); |
358
|
|
|
|
|
|
|
} |
359
|
0
|
|
|
|
|
0
|
); |
360
|
|
|
|
|
|
|
} |
361
|
|
|
|
|
|
|
|
362
|
|
|
|
|
|
|
sub _connected { |
363
|
0
|
|
|
0
|
|
0
|
my ($self, $id, $cb) = @_; |
364
|
|
|
|
|
|
|
|
365
|
|
|
|
|
|
|
# Inactivity timeout |
366
|
0
|
|
|
|
|
0
|
my $stream = $self->_loop->stream($id)->timeout(0); |
367
|
|
|
|
|
|
|
|
368
|
|
|
|
|
|
|
# Store connection information in transaction |
369
|
0
|
|
|
|
|
0
|
my $handle = $stream->handle; |
370
|
|
|
|
|
|
|
|
371
|
|
|
|
|
|
|
# Detect that xml spec was already loaded |
372
|
0
|
|
|
|
|
0
|
my $loaded = eval { Net::AMQP::Protocol::Connection::StartOk->new; 1 }; |
|
0
|
|
|
|
|
0
|
|
|
0
|
|
|
|
|
0
|
|
373
|
0
|
0
|
|
|
|
0
|
unless ($loaded) { # Load AMQP specs |
374
|
0
|
|
|
|
|
0
|
my $file = "amqp0-9-1.stripped.extended.xml"; |
375
|
|
|
|
|
|
|
|
376
|
|
|
|
|
|
|
# Original spec is in "fixed_amqp0-8.xml" |
377
|
0
|
|
|
|
|
0
|
my $share = dist_file('Mojo-RabbitMQ-Client', $file); |
378
|
0
|
|
|
|
|
0
|
Net::AMQP::Protocol->load_xml_spec($share); |
379
|
|
|
|
|
|
|
} |
380
|
|
|
|
|
|
|
|
381
|
0
|
|
|
|
|
0
|
$self->_write($id => Net::AMQP::Protocol->header); |
382
|
|
|
|
|
|
|
|
383
|
0
|
|
|
|
|
0
|
weaken $self; |
384
|
|
|
|
|
|
|
$self->_expect( |
385
|
|
|
|
|
|
|
'Connection::Start' => sub { |
386
|
0
|
|
|
0
|
|
0
|
my $frame = shift; |
387
|
|
|
|
|
|
|
|
388
|
0
|
|
|
|
|
0
|
my @mechanisms = split /\s/, $frame->method_frame->mechanisms; |
389
|
|
|
|
|
|
|
return $self->emit(error => 'AMQPLAIN is not found in mechanisms') |
390
|
0
|
0
|
|
|
|
0
|
if none { $_ eq 'AMQPLAIN' } @mechanisms; |
|
0
|
|
|
|
|
0
|
|
391
|
|
|
|
|
|
|
|
392
|
0
|
|
|
|
|
0
|
my @locales = split /\s/, $frame->method_frame->locales; |
393
|
|
|
|
|
|
|
return $self->emit(error => 'en_US is not found in locales') |
394
|
0
|
0
|
|
|
|
0
|
if none { $_ eq 'en_US' } @locales; |
|
0
|
|
|
|
|
0
|
|
395
|
|
|
|
|
|
|
|
396
|
0
|
|
|
|
|
0
|
$self->{_server_properties} = $frame->method_frame->server_properties; |
397
|
|
|
|
|
|
|
|
398
|
0
|
|
|
|
|
0
|
warn "-- Connection::Start {product: " . $self->{_server_properties}->{product} . ", version: " . $self->{_server_properties}->{version} . "}\n" if DEBUG; |
399
|
0
|
|
|
|
|
0
|
$self->_write_frame( |
400
|
|
|
|
|
|
|
Net::AMQP::Protocol::Connection::StartOk->new( |
401
|
|
|
|
|
|
|
client_properties => { |
402
|
|
|
|
|
|
|
platform => 'Perl', |
403
|
|
|
|
|
|
|
product => __PACKAGE__, |
404
|
|
|
|
|
|
|
information => 'https://github.com/inway/mojo-rabbitmq-client', |
405
|
|
|
|
|
|
|
version => __PACKAGE__->VERSION, |
406
|
|
|
|
|
|
|
}, |
407
|
|
|
|
|
|
|
mechanism => 'AMQPLAIN', |
408
|
|
|
|
|
|
|
response => {LOGIN => $self->user, PASSWORD => $self->pass}, |
409
|
|
|
|
|
|
|
locale => 'en_US', |
410
|
|
|
|
|
|
|
), |
411
|
|
|
|
|
|
|
); |
412
|
|
|
|
|
|
|
|
413
|
0
|
|
|
|
|
0
|
$self->_tune($id, $cb); |
414
|
|
|
|
|
|
|
}, |
415
|
|
|
|
|
|
|
sub { |
416
|
0
|
|
|
0
|
|
0
|
$self->emit(error => 'Unable to start connection: ' . shift); |
417
|
|
|
|
|
|
|
} |
418
|
0
|
|
|
|
|
0
|
); |
419
|
|
|
|
|
|
|
} |
420
|
|
|
|
|
|
|
|
421
|
|
|
|
|
|
|
sub _tune { |
422
|
0
|
|
|
0
|
|
0
|
my ($self, $id, $cb) = @_; |
423
|
|
|
|
|
|
|
|
424
|
0
|
|
|
|
|
0
|
weaken $self; |
425
|
|
|
|
|
|
|
$self->_expect( |
426
|
|
|
|
|
|
|
'Connection::Tune' => sub { |
427
|
0
|
|
|
0
|
|
0
|
my $frame = shift; |
428
|
|
|
|
|
|
|
|
429
|
0
|
|
|
|
|
0
|
my $method_frame = $frame->method_frame; |
430
|
0
|
|
|
|
|
0
|
$self->max_buffer_size($method_frame->frame_max); |
431
|
|
|
|
|
|
|
|
432
|
0
|
|
0
|
|
|
0
|
my $heartbeat = $self->heartbeat_timeout || $method_frame->heartbeat; |
433
|
|
|
|
|
|
|
|
434
|
0
|
|
|
|
|
0
|
warn "-- Connection::Tune {frame_max: " . $method_frame->frame_max . ", heartbeat: " . $method_frame->heartbeat . "}\n" if DEBUG; |
435
|
|
|
|
|
|
|
# Confirm |
436
|
0
|
|
|
|
|
0
|
$self->_write_frame( |
437
|
|
|
|
|
|
|
Net::AMQP::Protocol::Connection::TuneOk->new( |
438
|
|
|
|
|
|
|
channel_max => $method_frame->channel_max, |
439
|
|
|
|
|
|
|
frame_max => $method_frame->frame_max, |
440
|
|
|
|
|
|
|
heartbeat => $heartbeat, |
441
|
|
|
|
|
|
|
), |
442
|
|
|
|
|
|
|
); |
443
|
|
|
|
|
|
|
|
444
|
|
|
|
|
|
|
# According to https://www.rabbitmq.com/amqp-0-9-1-errata.html |
445
|
|
|
|
|
|
|
# The client should start sending heartbeats after receiving a Connection.Tune |
446
|
|
|
|
|
|
|
# method, and start monitoring heartbeats after sending Connection.Open. |
447
|
|
|
|
|
|
|
# -and- |
448
|
|
|
|
|
|
|
# Heartbeat frames are sent about every timeout / 2 seconds. After two missed |
449
|
|
|
|
|
|
|
# heartbeats, the peer is considered to be unreachable. |
450
|
|
|
|
|
|
|
$self->{heartbeat_tid} = $self->_loop->recurring( |
451
|
|
|
|
|
|
|
$heartbeat / 2 => sub { |
452
|
0
|
0
|
|
|
|
0
|
return unless time() - $self->heartbeat_sent > $heartbeat / 2; |
453
|
0
|
|
|
|
|
0
|
$self->_write_frame(Net::AMQP::Frame::Heartbeat->new()); |
454
|
0
|
|
|
|
|
0
|
$self->heartbeat_sent(time()); |
455
|
|
|
|
|
|
|
} |
456
|
0
|
0
|
|
|
|
0
|
) if $heartbeat; |
457
|
|
|
|
|
|
|
|
458
|
|
|
|
|
|
|
$self->_write_expect( |
459
|
|
|
|
|
|
|
'Connection::Open' => |
460
|
|
|
|
|
|
|
{virtual_host => $self->vhost, capabilities => '', insist => 1,}, |
461
|
|
|
|
|
|
|
'Connection::OpenOk' => sub { |
462
|
0
|
|
|
|
|
0
|
warn "-- Connection::OpenOk\n" if DEBUG; |
463
|
|
|
|
|
|
|
|
464
|
0
|
|
|
|
|
0
|
$self->is_open(1); |
465
|
0
|
|
|
|
|
0
|
$self->emit('open'); |
466
|
0
|
0
|
|
|
|
0
|
$cb->() if defined $cb; |
467
|
|
|
|
|
|
|
}, |
468
|
|
|
|
|
|
|
sub { |
469
|
0
|
|
|
|
|
0
|
my $err = shift; |
470
|
0
|
|
|
|
|
0
|
$self->emit(error => 'Unable to open connection: ' . $err); |
471
|
0
|
0
|
|
|
|
0
|
$cb->($err) if defined $cb; |
472
|
|
|
|
|
|
|
} |
473
|
0
|
|
|
|
|
0
|
); |
474
|
|
|
|
|
|
|
}, |
475
|
|
|
|
|
|
|
sub { |
476
|
0
|
|
|
0
|
|
0
|
$self->emit(error => 'Unable to tune connection: ' . shift); |
477
|
|
|
|
|
|
|
} |
478
|
0
|
|
|
|
|
0
|
); |
479
|
|
|
|
|
|
|
} |
480
|
|
|
|
|
|
|
|
481
|
|
|
|
|
|
|
sub _write_expect { |
482
|
0
|
|
|
0
|
|
0
|
my $self = shift; |
483
|
0
|
|
|
|
|
0
|
my ($method, $args, $exp, $cb, $failure_cb, $channel_id) = @_; |
484
|
0
|
|
|
|
|
0
|
$method = 'Net::AMQP::Protocol::' . $method; |
485
|
|
|
|
|
|
|
|
486
|
0
|
|
0
|
|
|
0
|
$channel_id ||= 0; |
487
|
|
|
|
|
|
|
|
488
|
0
|
|
|
|
|
0
|
my $method_frame = Net::AMQP::Frame::Method->new( |
489
|
|
|
|
|
|
|
method_frame => $method->new(%$args) |
490
|
|
|
|
|
|
|
); |
491
|
|
|
|
|
|
|
|
492
|
0
|
|
|
|
|
0
|
$self->_write_frame( |
493
|
|
|
|
|
|
|
$method_frame, |
494
|
|
|
|
|
|
|
$channel_id |
495
|
|
|
|
|
|
|
); |
496
|
|
|
|
|
|
|
|
497
|
0
|
|
|
|
|
0
|
return $self->_expect($exp, $cb, $failure_cb, $channel_id); |
498
|
|
|
|
|
|
|
} |
499
|
|
|
|
|
|
|
|
500
|
|
|
|
|
|
|
sub _expect { |
501
|
0
|
|
|
0
|
|
0
|
my $self = shift; |
502
|
0
|
|
|
|
|
0
|
my ($exp, $cb, $failure_cb, $channel_id) = @_; |
503
|
0
|
0
|
|
|
|
0
|
my @expected = ref($exp) eq 'ARRAY' ? @$exp : ($exp); |
504
|
|
|
|
|
|
|
|
505
|
0
|
|
0
|
|
|
0
|
$channel_id ||= 0; |
506
|
|
|
|
|
|
|
|
507
|
0
|
|
|
|
|
0
|
my $queue; |
508
|
0
|
0
|
|
|
|
0
|
if (!$channel_id) { |
509
|
0
|
|
|
|
|
0
|
$queue = $self->queue; |
510
|
|
|
|
|
|
|
} |
511
|
|
|
|
|
|
|
else { |
512
|
0
|
|
|
|
|
0
|
my $channel = $self->channels->{$channel_id}; |
513
|
0
|
0
|
|
|
|
0
|
if (defined $channel) { |
514
|
0
|
|
|
|
|
0
|
$queue = $channel->queue; |
515
|
|
|
|
|
|
|
} |
516
|
|
|
|
|
|
|
else { |
517
|
0
|
|
0
|
|
|
0
|
$failure_cb->( |
518
|
|
|
|
|
|
|
"Unknown channel id received: " . ($channel_id // '(undef)')); |
519
|
|
|
|
|
|
|
} |
520
|
|
|
|
|
|
|
} |
521
|
|
|
|
|
|
|
|
522
|
0
|
0
|
|
|
|
0
|
return unless $queue; |
523
|
|
|
|
|
|
|
|
524
|
|
|
|
|
|
|
$queue->get( |
525
|
|
|
|
|
|
|
sub { |
526
|
0
|
|
|
0
|
|
0
|
my $frame = shift; |
527
|
|
|
|
|
|
|
|
528
|
0
|
0
|
|
|
|
0
|
return $failure_cb->("Received data is not method frame") |
529
|
|
|
|
|
|
|
if not $frame->isa("Net::AMQP::Frame::Method"); |
530
|
|
|
|
|
|
|
|
531
|
0
|
|
|
|
|
0
|
my $method_frame = $frame->method_frame; |
532
|
0
|
|
|
|
|
0
|
for my $exp (@expected) { |
533
|
0
|
0
|
|
|
|
0
|
return $cb->($frame) |
534
|
|
|
|
|
|
|
if $method_frame->isa("Net::AMQP::Protocol::" . $exp); |
535
|
|
|
|
|
|
|
} |
536
|
|
|
|
|
|
|
|
537
|
0
|
|
|
|
|
0
|
$failure_cb->("Method is not " |
538
|
|
|
|
|
|
|
. join(', ', @expected) |
539
|
|
|
|
|
|
|
. ". It's " |
540
|
|
|
|
|
|
|
. ref($method_frame)); |
541
|
|
|
|
|
|
|
} |
542
|
0
|
|
|
|
|
0
|
); |
543
|
|
|
|
|
|
|
} |
544
|
|
|
|
|
|
|
|
545
|
|
|
|
|
|
|
sub _write_frame { |
546
|
0
|
|
|
0
|
|
0
|
my $self = shift; |
547
|
0
|
|
|
|
|
0
|
my $id = $self->stream_id; |
548
|
0
|
|
|
|
|
0
|
my ($out, $channel, $cb) = @_; |
549
|
|
|
|
|
|
|
|
550
|
0
|
0
|
|
|
|
0
|
if ($out->isa('Net::AMQP::Protocol::Base')) { |
551
|
0
|
|
|
|
|
0
|
$out = $out->frame_wrap; |
552
|
|
|
|
|
|
|
} |
553
|
0
|
|
0
|
|
|
0
|
$out->channel($channel // 0); |
554
|
|
|
|
|
|
|
|
555
|
0
|
|
|
|
|
0
|
return $self->_write($id, $out->to_raw_frame, $cb); |
556
|
|
|
|
|
|
|
} |
557
|
|
|
|
|
|
|
|
558
|
|
|
|
|
|
|
sub _write { |
559
|
0
|
|
|
0
|
|
0
|
my $self = shift @_; |
560
|
0
|
|
|
|
|
0
|
my $id = shift @_; |
561
|
0
|
|
|
|
|
0
|
my $frame = shift @_; |
562
|
0
|
|
|
|
|
0
|
my $cb = shift @_; |
563
|
|
|
|
|
|
|
|
564
|
0
|
|
|
|
|
0
|
warn "-> @{[dumper $frame]}" if DEBUG; |
565
|
|
|
|
|
|
|
|
566
|
0
|
|
|
|
|
0
|
utf8::downgrade($frame); |
567
|
0
|
0
|
|
|
|
0
|
$self->_loop->stream($id)->write($frame => $cb) |
568
|
|
|
|
|
|
|
if defined $self->_loop->stream($id); |
569
|
|
|
|
|
|
|
} |
570
|
|
|
|
|
|
|
|
571
|
|
|
|
|
|
|
sub DESTROY { |
572
|
20
|
|
|
20
|
|
52194
|
my $self = shift; |
573
|
20
|
50
|
|
|
|
67
|
my $ioloop = $self->ioloop or return; |
574
|
20
|
|
|
|
|
137
|
my $heartbeat_tid = $self->{heartbeat_tid}; |
575
|
|
|
|
|
|
|
|
576
|
20
|
50
|
|
|
|
302
|
$ioloop->remove($heartbeat_tid) if $heartbeat_tid; |
577
|
|
|
|
|
|
|
} |
578
|
|
|
|
|
|
|
|
579
|
|
|
|
|
|
|
1; |
580
|
|
|
|
|
|
|
|
581
|
|
|
|
|
|
|
=encoding utf8 |
582
|
|
|
|
|
|
|
|
583
|
|
|
|
|
|
|
=head1 NAME |
584
|
|
|
|
|
|
|
|
585
|
|
|
|
|
|
|
Mojo::RabbitMQ::Client - Mojo::IOLoop based RabbitMQ client |
586
|
|
|
|
|
|
|
|
587
|
|
|
|
|
|
|
=head1 SYNOPSIS |
588
|
|
|
|
|
|
|
|
589
|
|
|
|
|
|
|
use Mojo::RabbitMQ::Client; |
590
|
|
|
|
|
|
|
|
591
|
|
|
|
|
|
|
# Supply URL according to (https://www.rabbitmq.com/uri-spec.html) |
592
|
|
|
|
|
|
|
my $client = Mojo::RabbitMQ::Client->new( |
593
|
|
|
|
|
|
|
url => 'amqp://guest:guest@127.0.0.1:5672/'); |
594
|
|
|
|
|
|
|
|
595
|
|
|
|
|
|
|
# Catch all client related errors |
596
|
|
|
|
|
|
|
$client->catch(sub { warn "Some error caught in client"; }); |
597
|
|
|
|
|
|
|
|
598
|
|
|
|
|
|
|
# When connection is in Open state, open new channel |
599
|
|
|
|
|
|
|
$client->on( |
600
|
|
|
|
|
|
|
open => sub { |
601
|
|
|
|
|
|
|
my ($client) = @_; |
602
|
|
|
|
|
|
|
|
603
|
|
|
|
|
|
|
# Create a new channel with auto-assigned id |
604
|
|
|
|
|
|
|
my $channel = Mojo::RabbitMQ::Client::Channel->new(); |
605
|
|
|
|
|
|
|
|
606
|
|
|
|
|
|
|
$channel->catch(sub { warn "Error on channel received"; }); |
607
|
|
|
|
|
|
|
|
608
|
|
|
|
|
|
|
$channel->on( |
609
|
|
|
|
|
|
|
open => sub { |
610
|
|
|
|
|
|
|
my ($channel) = @_; |
611
|
|
|
|
|
|
|
$channel->qos(prefetch_count => 1)->deliver; |
612
|
|
|
|
|
|
|
|
613
|
|
|
|
|
|
|
# Publish some example message to test_queue |
614
|
|
|
|
|
|
|
my $publish = $channel->publish( |
615
|
|
|
|
|
|
|
exchange => 'test', |
616
|
|
|
|
|
|
|
routing_key => 'test_queue', |
617
|
|
|
|
|
|
|
body => 'Test message', |
618
|
|
|
|
|
|
|
mandatory => 0, |
619
|
|
|
|
|
|
|
immediate => 0, |
620
|
|
|
|
|
|
|
header => {} |
621
|
|
|
|
|
|
|
); |
622
|
|
|
|
|
|
|
# Deliver this message to server |
623
|
|
|
|
|
|
|
$publish->deliver; |
624
|
|
|
|
|
|
|
|
625
|
|
|
|
|
|
|
# Start consuming messages from test_queue |
626
|
|
|
|
|
|
|
my $consumer = $channel->consume(queue => 'test_queue'); |
627
|
|
|
|
|
|
|
$consumer->on(message => sub { say "Got a message" }); |
628
|
|
|
|
|
|
|
$consumer->deliver; |
629
|
|
|
|
|
|
|
} |
630
|
|
|
|
|
|
|
); |
631
|
|
|
|
|
|
|
$channel->on(close => sub { $log->error('Channel closed') }); |
632
|
|
|
|
|
|
|
|
633
|
|
|
|
|
|
|
$client->open_channel($channel); |
634
|
|
|
|
|
|
|
} |
635
|
|
|
|
|
|
|
); |
636
|
|
|
|
|
|
|
|
637
|
|
|
|
|
|
|
# Start connection |
638
|
|
|
|
|
|
|
$client->connect(); |
639
|
|
|
|
|
|
|
|
640
|
|
|
|
|
|
|
# Start Mojo::IOLoop if not running already |
641
|
|
|
|
|
|
|
Mojo::IOLoop->start unless Mojo::IOLoop->is_running; |
642
|
|
|
|
|
|
|
|
643
|
|
|
|
|
|
|
=head2 CONSUMER |
644
|
|
|
|
|
|
|
|
645
|
|
|
|
|
|
|
use Mojo::RabbitMQ::Client; |
646
|
|
|
|
|
|
|
my $consumer = Mojo::RabbitMQ::Client->consumer( |
647
|
|
|
|
|
|
|
url => 'amqp://guest:guest@127.0.0.1:5672/?exchange=mojo&queue=mojo', |
648
|
|
|
|
|
|
|
defaults => { |
649
|
|
|
|
|
|
|
qos => {prefetch_count => 1}, |
650
|
|
|
|
|
|
|
queue => {durable => 1}, |
651
|
|
|
|
|
|
|
consumer => {no_ack => 0}, |
652
|
|
|
|
|
|
|
} |
653
|
|
|
|
|
|
|
); |
654
|
|
|
|
|
|
|
|
655
|
|
|
|
|
|
|
$consumer->catch(sub { die "Some error caught in Consumer" } ); |
656
|
|
|
|
|
|
|
$consumer->on('success' => sub { say "Consumer ready" }); |
657
|
|
|
|
|
|
|
$consumer->on( |
658
|
|
|
|
|
|
|
'message' => sub { |
659
|
|
|
|
|
|
|
my ($consumer, $message) = @_; |
660
|
|
|
|
|
|
|
|
661
|
|
|
|
|
|
|
$consumer->channel->ack($message)->deliver; |
662
|
|
|
|
|
|
|
} |
663
|
|
|
|
|
|
|
); |
664
|
|
|
|
|
|
|
$consumer->start(); |
665
|
|
|
|
|
|
|
|
666
|
|
|
|
|
|
|
Mojo::IOLoop->start unless Mojo::IOLoop->is_running; |
667
|
|
|
|
|
|
|
|
668
|
|
|
|
|
|
|
=head2 PUBLISHER |
669
|
|
|
|
|
|
|
|
670
|
|
|
|
|
|
|
use Mojo::RabbitMQ::Client; |
671
|
|
|
|
|
|
|
my $publisher = Mojo::RabbitMQ::Client->publisher( |
672
|
|
|
|
|
|
|
url => 'amqp://guest:guest@127.0.0.1:5672/?exchange=mojo&routing_key=mojo' |
673
|
|
|
|
|
|
|
); |
674
|
|
|
|
|
|
|
|
675
|
|
|
|
|
|
|
$publisher->publish('plain text'); |
676
|
|
|
|
|
|
|
|
677
|
|
|
|
|
|
|
$publisher->publish( |
678
|
|
|
|
|
|
|
{encode => { to => 'json'}}, |
679
|
|
|
|
|
|
|
routing_key => 'mojo_mq' |
680
|
|
|
|
|
|
|
)->then(sub { |
681
|
|
|
|
|
|
|
say "Message published"; |
682
|
|
|
|
|
|
|
})->catch(sub { |
683
|
|
|
|
|
|
|
die "Publishing failed" |
684
|
|
|
|
|
|
|
})->wait; |
685
|
|
|
|
|
|
|
|
686
|
|
|
|
|
|
|
=head1 DESCRIPTION |
687
|
|
|
|
|
|
|
|
688
|
|
|
|
|
|
|
L is a rewrite of L to work on top of L. |
689
|
|
|
|
|
|
|
|
690
|
|
|
|
|
|
|
=head1 EVENTS |
691
|
|
|
|
|
|
|
|
692
|
|
|
|
|
|
|
L inherits all events from L and can emit the |
693
|
|
|
|
|
|
|
following new ones. |
694
|
|
|
|
|
|
|
|
695
|
|
|
|
|
|
|
=head2 connect |
696
|
|
|
|
|
|
|
|
697
|
|
|
|
|
|
|
$client->on(connect => sub { |
698
|
|
|
|
|
|
|
my ($client, $stream) = @_; |
699
|
|
|
|
|
|
|
... |
700
|
|
|
|
|
|
|
}); |
701
|
|
|
|
|
|
|
|
702
|
|
|
|
|
|
|
Emitted when TCP/IP connection with RabbitMQ server is established. |
703
|
|
|
|
|
|
|
|
704
|
|
|
|
|
|
|
=head2 open |
705
|
|
|
|
|
|
|
|
706
|
|
|
|
|
|
|
$client->on(open => sub { |
707
|
|
|
|
|
|
|
my ($client) = @_; |
708
|
|
|
|
|
|
|
... |
709
|
|
|
|
|
|
|
}); |
710
|
|
|
|
|
|
|
|
711
|
|
|
|
|
|
|
Emitted AMQP protocol Connection.Open-Ok method is received. |
712
|
|
|
|
|
|
|
|
713
|
|
|
|
|
|
|
=head2 close |
714
|
|
|
|
|
|
|
|
715
|
|
|
|
|
|
|
$client->on(close => sub { |
716
|
|
|
|
|
|
|
my ($client) = @_; |
717
|
|
|
|
|
|
|
... |
718
|
|
|
|
|
|
|
}); |
719
|
|
|
|
|
|
|
|
720
|
|
|
|
|
|
|
Emitted on reception of Connection.Close-Ok method. |
721
|
|
|
|
|
|
|
|
722
|
|
|
|
|
|
|
=head2 disconnect |
723
|
|
|
|
|
|
|
|
724
|
|
|
|
|
|
|
$client->on(close => sub { |
725
|
|
|
|
|
|
|
my ($client) = @_; |
726
|
|
|
|
|
|
|
... |
727
|
|
|
|
|
|
|
}); |
728
|
|
|
|
|
|
|
|
729
|
|
|
|
|
|
|
Emitted when TCP/IP connection gets disconnected. |
730
|
|
|
|
|
|
|
|
731
|
|
|
|
|
|
|
=head1 ATTRIBUTES |
732
|
|
|
|
|
|
|
|
733
|
|
|
|
|
|
|
L has following attributes. |
734
|
|
|
|
|
|
|
|
735
|
|
|
|
|
|
|
=head2 tls |
736
|
|
|
|
|
|
|
|
737
|
|
|
|
|
|
|
my $tls = $client->tls; |
738
|
|
|
|
|
|
|
$client = $client->tls(1) |
739
|
|
|
|
|
|
|
|
740
|
|
|
|
|
|
|
Force secure connection. Default is disabled (C<0>). |
741
|
|
|
|
|
|
|
|
742
|
|
|
|
|
|
|
=head2 user |
743
|
|
|
|
|
|
|
|
744
|
|
|
|
|
|
|
my $user = $client->user; |
745
|
|
|
|
|
|
|
$client = $client->user('guest') |
746
|
|
|
|
|
|
|
|
747
|
|
|
|
|
|
|
Sets username for authorization, by default it's not defined. |
748
|
|
|
|
|
|
|
|
749
|
|
|
|
|
|
|
=head2 pass |
750
|
|
|
|
|
|
|
|
751
|
|
|
|
|
|
|
my $pass = $client->pass; |
752
|
|
|
|
|
|
|
$client = $client->pass('secret') |
753
|
|
|
|
|
|
|
|
754
|
|
|
|
|
|
|
Sets user password for authorization, by default it's not defined. |
755
|
|
|
|
|
|
|
|
756
|
|
|
|
|
|
|
=head2 host |
757
|
|
|
|
|
|
|
|
758
|
|
|
|
|
|
|
my $host = $client->host; |
759
|
|
|
|
|
|
|
$client = $client->host('localhost') |
760
|
|
|
|
|
|
|
|
761
|
|
|
|
|
|
|
Hostname or IP address of RabbitMQ server. Defaults to C. |
762
|
|
|
|
|
|
|
|
763
|
|
|
|
|
|
|
=head2 port |
764
|
|
|
|
|
|
|
|
765
|
|
|
|
|
|
|
my $port = $client->port; |
766
|
|
|
|
|
|
|
$client = $client->port(1234) |
767
|
|
|
|
|
|
|
|
768
|
|
|
|
|
|
|
Port on which RabbitMQ server listens for new connections. |
769
|
|
|
|
|
|
|
Defaults to C<5672>, which is standard RabbitMQ server listen port. |
770
|
|
|
|
|
|
|
|
771
|
|
|
|
|
|
|
=head2 vhost |
772
|
|
|
|
|
|
|
|
773
|
|
|
|
|
|
|
my $vhost = $client->vhost; |
774
|
|
|
|
|
|
|
$client = $client->vhost('/') |
775
|
|
|
|
|
|
|
|
776
|
|
|
|
|
|
|
RabbitMQ virtual server to user. Default is C>. |
777
|
|
|
|
|
|
|
|
778
|
|
|
|
|
|
|
=head2 params |
779
|
|
|
|
|
|
|
|
780
|
|
|
|
|
|
|
my $params = $client->params; |
781
|
|
|
|
|
|
|
$client = $client->params(Mojo::Parameters->new('verify=1')) |
782
|
|
|
|
|
|
|
|
783
|
|
|
|
|
|
|
Sets additional parameters for connection. Default is not defined. |
784
|
|
|
|
|
|
|
|
785
|
|
|
|
|
|
|
For list of supported parameters see L"SUPPORTED QUERY PARAMETERS">. |
786
|
|
|
|
|
|
|
|
787
|
|
|
|
|
|
|
=head2 url |
788
|
|
|
|
|
|
|
|
789
|
|
|
|
|
|
|
my $url = $client->url; |
790
|
|
|
|
|
|
|
$client = $client->url('amqp://...'); |
791
|
|
|
|
|
|
|
|
792
|
|
|
|
|
|
|
Sets all connection parameters in one string, according to specification from |
793
|
|
|
|
|
|
|
L. |
794
|
|
|
|
|
|
|
|
795
|
|
|
|
|
|
|
amqp_URI = "amqp[s]://" amqp_authority [ "/" vhost ] [ "?" query ] |
796
|
|
|
|
|
|
|
|
797
|
|
|
|
|
|
|
amqp_authority = [ amqp_userinfo "@" ] host [ ":" port ] |
798
|
|
|
|
|
|
|
|
799
|
|
|
|
|
|
|
amqp_userinfo = username [ ":" password ] |
800
|
|
|
|
|
|
|
|
801
|
|
|
|
|
|
|
username = *( unreserved / pct-encoded / sub-delims ) |
802
|
|
|
|
|
|
|
|
803
|
|
|
|
|
|
|
password = *( unreserved / pct-encoded / sub-delims ) |
804
|
|
|
|
|
|
|
|
805
|
|
|
|
|
|
|
vhost = segment |
806
|
|
|
|
|
|
|
|
807
|
|
|
|
|
|
|
=head2 heartbeat_timeout |
808
|
|
|
|
|
|
|
|
809
|
|
|
|
|
|
|
my $timeout = $client->heartbeat_timeout; |
810
|
|
|
|
|
|
|
$client = $client->heartbeat_timeout(180); |
811
|
|
|
|
|
|
|
|
812
|
|
|
|
|
|
|
Heartbeats are use to monitor peer reachability in AMQP. |
813
|
|
|
|
|
|
|
Default value is C<60> seconds, if set to C<0> no heartbeats will be sent. |
814
|
|
|
|
|
|
|
|
815
|
|
|
|
|
|
|
=head2 connect_timeout |
816
|
|
|
|
|
|
|
|
817
|
|
|
|
|
|
|
my $timeout = $client->connect_timeout; |
818
|
|
|
|
|
|
|
$client = $client->connect_timeout(5); |
819
|
|
|
|
|
|
|
|
820
|
|
|
|
|
|
|
Connection timeout used by L. |
821
|
|
|
|
|
|
|
Defaults to environment variable C or C<10> seconds |
822
|
|
|
|
|
|
|
if nothing else is set. |
823
|
|
|
|
|
|
|
|
824
|
|
|
|
|
|
|
=head2 max_channels |
825
|
|
|
|
|
|
|
|
826
|
|
|
|
|
|
|
my $max_channels = $client->max_channels; |
827
|
|
|
|
|
|
|
$client = $client->max_channels(10); |
828
|
|
|
|
|
|
|
|
829
|
|
|
|
|
|
|
Maximum number of channels allowed to be active. Defaults to C<0> which |
830
|
|
|
|
|
|
|
means no implicit limit. |
831
|
|
|
|
|
|
|
|
832
|
|
|
|
|
|
|
When you try to call C over limit an C will be |
833
|
|
|
|
|
|
|
emitted on channel saying that: I. |
834
|
|
|
|
|
|
|
|
835
|
|
|
|
|
|
|
=head1 STATIC METHODS |
836
|
|
|
|
|
|
|
|
837
|
|
|
|
|
|
|
=head2 consumer |
838
|
|
|
|
|
|
|
|
839
|
|
|
|
|
|
|
my $client = Mojo::RabbitMQ::Client->consumer(...) |
840
|
|
|
|
|
|
|
|
841
|
|
|
|
|
|
|
Shortcut for creating L. |
842
|
|
|
|
|
|
|
|
843
|
|
|
|
|
|
|
=head2 publisher |
844
|
|
|
|
|
|
|
|
845
|
|
|
|
|
|
|
my $client = Mojo::RabbitMQ::Client->publisher(...) |
846
|
|
|
|
|
|
|
|
847
|
|
|
|
|
|
|
Shortcut for creating L. |
848
|
|
|
|
|
|
|
|
849
|
|
|
|
|
|
|
=head1 METHODS |
850
|
|
|
|
|
|
|
|
851
|
|
|
|
|
|
|
L inherits all methods from L and implements |
852
|
|
|
|
|
|
|
the following new ones. |
853
|
|
|
|
|
|
|
|
854
|
|
|
|
|
|
|
=head2 connect |
855
|
|
|
|
|
|
|
|
856
|
|
|
|
|
|
|
$client->connect(); |
857
|
|
|
|
|
|
|
|
858
|
|
|
|
|
|
|
Tries to connect to RabbitMQ server and negotiate AMQP protocol. |
859
|
|
|
|
|
|
|
|
860
|
|
|
|
|
|
|
=head2 close |
861
|
|
|
|
|
|
|
|
862
|
|
|
|
|
|
|
$client->close(); |
863
|
|
|
|
|
|
|
|
864
|
|
|
|
|
|
|
=head2 param |
865
|
|
|
|
|
|
|
|
866
|
|
|
|
|
|
|
my $param = $client->param('name'); |
867
|
|
|
|
|
|
|
$client = $client->param(name => 'value'); |
868
|
|
|
|
|
|
|
|
869
|
|
|
|
|
|
|
=head2 add_channel |
870
|
|
|
|
|
|
|
|
871
|
|
|
|
|
|
|
my $channel = Mojo::RabbitMQ::Client::Channel->new(); |
872
|
|
|
|
|
|
|
... |
873
|
|
|
|
|
|
|
$channel = $client->add_channel($channel); |
874
|
|
|
|
|
|
|
$channel->open; |
875
|
|
|
|
|
|
|
|
876
|
|
|
|
|
|
|
=head2 open_channel |
877
|
|
|
|
|
|
|
|
878
|
|
|
|
|
|
|
my $channel = Mojo::RabbitMQ::Client::Channel->new(); |
879
|
|
|
|
|
|
|
... |
880
|
|
|
|
|
|
|
$client->open_channel($channel); |
881
|
|
|
|
|
|
|
|
882
|
|
|
|
|
|
|
=head2 delete_channel |
883
|
|
|
|
|
|
|
|
884
|
|
|
|
|
|
|
my $removed = $client->delete_channel($channel->id); |
885
|
|
|
|
|
|
|
|
886
|
|
|
|
|
|
|
=head1 SUPPORTED QUERY PARAMETERS |
887
|
|
|
|
|
|
|
|
888
|
|
|
|
|
|
|
There's no formal specification, nevertheless a list of common parameters |
889
|
|
|
|
|
|
|
recognized by officially supported RabbitMQ clients is maintained here: |
890
|
|
|
|
|
|
|
L. |
891
|
|
|
|
|
|
|
|
892
|
|
|
|
|
|
|
Some shortcuts are also supported, you'll find them in parenthesis. |
893
|
|
|
|
|
|
|
|
894
|
|
|
|
|
|
|
Aliases are less significant, so when both are specified only primary |
895
|
|
|
|
|
|
|
value will be used. |
896
|
|
|
|
|
|
|
|
897
|
|
|
|
|
|
|
=head2 cacertfile (I) |
898
|
|
|
|
|
|
|
|
899
|
|
|
|
|
|
|
Path to Certificate Authority file for TLS. |
900
|
|
|
|
|
|
|
|
901
|
|
|
|
|
|
|
=head2 certfile (I) |
902
|
|
|
|
|
|
|
|
903
|
|
|
|
|
|
|
Path to the client certificate file for TLS. |
904
|
|
|
|
|
|
|
|
905
|
|
|
|
|
|
|
=head2 keyfile (I) |
906
|
|
|
|
|
|
|
|
907
|
|
|
|
|
|
|
Path to the client certificate private key file for TLS. |
908
|
|
|
|
|
|
|
|
909
|
|
|
|
|
|
|
=head2 fail_if_no_peer_cert (I) |
910
|
|
|
|
|
|
|
|
911
|
|
|
|
|
|
|
TLS verification mode, defaults to 0x01 on the client-side if a certificate |
912
|
|
|
|
|
|
|
authority file has been provided, or 0x00 otherwise. |
913
|
|
|
|
|
|
|
|
914
|
|
|
|
|
|
|
=head2 auth_mechanism |
915
|
|
|
|
|
|
|
|
916
|
|
|
|
|
|
|
Currently only AMQPLAIN is supported, B. |
917
|
|
|
|
|
|
|
|
918
|
|
|
|
|
|
|
=head2 heartbeat |
919
|
|
|
|
|
|
|
|
920
|
|
|
|
|
|
|
Sets requested heartbeat timeout, just like C attribute. |
921
|
|
|
|
|
|
|
|
922
|
|
|
|
|
|
|
=head2 connection_timeout (I) |
923
|
|
|
|
|
|
|
|
924
|
|
|
|
|
|
|
Sets connection timeout - see L attribute. |
925
|
|
|
|
|
|
|
|
926
|
|
|
|
|
|
|
=head2 channel_max |
927
|
|
|
|
|
|
|
|
928
|
|
|
|
|
|
|
Sets maximum number of channels - see L attribute. |
929
|
|
|
|
|
|
|
|
930
|
|
|
|
|
|
|
=head1 SEE ALSO |
931
|
|
|
|
|
|
|
|
932
|
|
|
|
|
|
|
L, L, L |
933
|
|
|
|
|
|
|
|
934
|
|
|
|
|
|
|
=head1 COPYRIGHT AND LICENSE |
935
|
|
|
|
|
|
|
|
936
|
|
|
|
|
|
|
Copyright (C) 2015-2017, Sebastian Podjasek and others |
937
|
|
|
|
|
|
|
|
938
|
|
|
|
|
|
|
Based on L - Copyright (C) 2010 Masahito Ikuta, maintained by C<< bobtfish@bobtfish.net >> |
939
|
|
|
|
|
|
|
|
940
|
|
|
|
|
|
|
This program is free software, you can redistribute it and/or modify it under the terms of the Artistic License version 2.0. |
941
|
|
|
|
|
|
|
|
942
|
|
|
|
|
|
|
=cut |