line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
1
|
|
|
|
|
|
|
package Mojo::RabbitMQ::Client; |
2
|
5
|
|
|
5
|
|
171462
|
use Mojo::Base 'Mojo::EventEmitter'; |
|
5
|
|
|
|
|
798642
|
|
|
5
|
|
|
|
|
47
|
|
3
|
|
|
|
|
|
|
|
4
|
5
|
|
|
5
|
|
8630
|
use Carp qw(croak confess); |
|
5
|
|
|
|
|
11
|
|
|
5
|
|
|
|
|
259
|
|
5
|
5
|
|
|
5
|
|
2268
|
use Mojo::URL; |
|
5
|
|
|
|
|
37041
|
|
|
5
|
|
|
|
|
41
|
|
6
|
5
|
|
|
5
|
|
2304
|
use Mojo::Home; |
|
5
|
|
|
|
|
154564
|
|
|
5
|
|
|
|
|
276
|
|
7
|
5
|
|
|
5
|
|
2571
|
use Mojo::IOLoop; |
|
5
|
|
|
|
|
551333
|
|
|
5
|
|
|
|
|
38
|
|
8
|
5
|
|
|
5
|
|
292
|
use Mojo::Parameters; |
|
5
|
|
|
|
|
13
|
|
|
5
|
|
|
|
|
38
|
|
9
|
5
|
|
|
5
|
|
169
|
use Mojo::Promise; |
|
5
|
|
|
|
|
11
|
|
|
5
|
|
|
|
|
50
|
|
10
|
5
|
|
|
5
|
|
170
|
use Mojo::Util qw(url_unescape dumper); |
|
5
|
|
|
|
|
11
|
|
|
5
|
|
|
|
|
260
|
|
11
|
5
|
|
|
5
|
|
31
|
use List::Util qw(none); |
|
5
|
|
|
|
|
12
|
|
|
5
|
|
|
|
|
325
|
|
12
|
5
|
|
|
5
|
|
73
|
use Scalar::Util qw(blessed weaken); |
|
5
|
|
|
|
|
11
|
|
|
5
|
|
|
|
|
219
|
|
13
|
5
|
|
|
5
|
|
37
|
use File::Basename 'dirname'; |
|
5
|
|
|
|
|
14
|
|
|
5
|
|
|
|
|
353
|
|
14
|
5
|
|
|
5
|
|
2504
|
use File::ShareDir qw(dist_file); |
|
5
|
|
|
|
|
118722
|
|
|
5
|
|
|
|
|
314
|
|
15
|
|
|
|
|
|
|
|
16
|
5
|
|
|
5
|
|
2335
|
use Net::AMQP; |
|
5
|
|
|
|
|
287196
|
|
|
5
|
|
|
|
|
204
|
|
17
|
5
|
|
|
5
|
|
45
|
use Net::AMQP::Common qw(:all); |
|
5
|
|
|
|
|
13
|
|
|
5
|
|
|
|
|
1150
|
|
18
|
|
|
|
|
|
|
|
19
|
5
|
|
|
5
|
|
3449
|
use Mojo::RabbitMQ::Client::Channel; |
|
5
|
|
|
|
|
14
|
|
|
5
|
|
|
|
|
38
|
|
20
|
5
|
|
|
5
|
|
231
|
use Mojo::RabbitMQ::Client::LocalQueue; |
|
5
|
|
|
|
|
11
|
|
|
5
|
|
|
|
|
33
|
|
21
|
|
|
|
|
|
|
require Mojo::RabbitMQ::Client::Consumer; |
22
|
|
|
|
|
|
|
require Mojo::RabbitMQ::Client::Publisher; |
23
|
|
|
|
|
|
|
|
24
|
|
|
|
|
|
|
our $VERSION = "0.3.0"; |
25
|
|
|
|
|
|
|
|
26
|
5
|
|
50
|
5
|
|
468
|
use constant DEBUG => $ENV{MOJO_RABBITMQ_DEBUG} // 0; |
|
5
|
|
|
|
|
13
|
|
|
5
|
|
|
|
|
23477
|
|
27
|
|
|
|
|
|
|
|
28
|
|
|
|
|
|
|
has is_open => 0; |
29
|
|
|
|
|
|
|
has url => undef; |
30
|
|
|
|
|
|
|
has tls => sub { shift->_uri_handler('tls') }; |
31
|
|
|
|
|
|
|
has user => sub { shift->_uri_handler('user') }; |
32
|
|
|
|
|
|
|
has pass => sub { shift->_uri_handler('pass') }; |
33
|
|
|
|
|
|
|
has host => sub { shift->_uri_handler('host') }; |
34
|
|
|
|
|
|
|
has port => sub { shift->_uri_handler('port') }; |
35
|
|
|
|
|
|
|
has vhost => sub { shift->_uri_handler('vhost') }; |
36
|
|
|
|
|
|
|
has params => sub { shift->_uri_handler('params') // Mojo::Parameters->new }; |
37
|
|
|
|
|
|
|
has connect_timeout => sub { $ENV{MOJO_CONNECT_TIMEOUT} // 10 }; |
38
|
|
|
|
|
|
|
has heartbeat_timeout => 60; |
39
|
|
|
|
|
|
|
has heartbeat_received => 0; # When did we receive last heartbeat |
40
|
|
|
|
|
|
|
has heartbeat_sent => 0; # When did we sent last heartbeat |
41
|
|
|
|
|
|
|
has ioloop => sub { Mojo::IOLoop->singleton }; |
42
|
|
|
|
|
|
|
has max_buffer_size => 16384; |
43
|
|
|
|
|
|
|
has max_channels => 0; |
44
|
|
|
|
|
|
|
has queue => sub { Mojo::RabbitMQ::Client::LocalQueue->new }; |
45
|
|
|
|
|
|
|
has channels => sub { {} }; |
46
|
|
|
|
|
|
|
has stream_id => undef; |
47
|
|
|
|
|
|
|
|
48
|
|
|
|
|
|
|
sub connect { |
49
|
0
|
|
|
0
|
1
|
0
|
my $self = shift; |
50
|
0
|
|
|
|
|
0
|
$self->{buffer} = ''; |
51
|
|
|
|
|
|
|
|
52
|
0
|
|
|
|
|
0
|
my $id; |
53
|
0
|
|
|
0
|
|
0
|
$id = $self->_connect(sub { $self->_connected($id) }); |
|
0
|
|
|
|
|
0
|
|
54
|
0
|
|
|
|
|
0
|
$self->stream_id($id); |
55
|
|
|
|
|
|
|
|
56
|
0
|
|
|
|
|
0
|
return $id; |
57
|
|
|
|
|
|
|
} |
58
|
|
|
|
|
|
|
|
59
|
|
|
|
|
|
|
sub connect_p { |
60
|
0
|
|
|
0
|
0
|
0
|
my $self = shift; |
61
|
0
|
|
|
|
|
0
|
my $promise = Mojo::Promise->new; |
62
|
|
|
|
|
|
|
|
63
|
0
|
|
|
|
|
0
|
my $id; |
64
|
|
|
|
|
|
|
|
65
|
0
|
|
|
|
|
0
|
weaken $self; |
66
|
|
|
|
|
|
|
my $handler = sub { |
67
|
0
|
|
|
0
|
|
0
|
my ($err) = @_; |
68
|
0
|
0
|
|
|
|
0
|
if (defined $err) { |
69
|
0
|
|
|
|
|
0
|
return $promise->reject($err); |
70
|
|
|
|
|
|
|
} |
71
|
|
|
|
|
|
|
|
72
|
0
|
|
|
|
|
0
|
return $promise->resolve($self); |
73
|
0
|
|
|
|
|
0
|
}; |
74
|
|
|
|
|
|
|
|
75
|
0
|
|
|
0
|
|
0
|
$id = $self->_connect(sub { $self->_connected($id, $handler) }); |
|
0
|
|
|
|
|
0
|
|
76
|
0
|
|
|
|
|
0
|
$self->stream_id($id); |
77
|
|
|
|
|
|
|
|
78
|
0
|
|
|
|
|
0
|
return $promise; |
79
|
|
|
|
|
|
|
} |
80
|
|
|
|
|
|
|
|
81
|
|
|
|
|
|
|
sub consumer { |
82
|
2
|
|
|
2
|
1
|
1451
|
my ($class, @params) = @_; |
83
|
2
|
100
|
|
|
|
24
|
croak "consumer is a static method" if ref $class; |
84
|
|
|
|
|
|
|
|
85
|
1
|
|
|
|
|
13
|
return Mojo::RabbitMQ::Client::Consumer->new(@params); |
86
|
|
|
|
|
|
|
} |
87
|
|
|
|
|
|
|
|
88
|
|
|
|
|
|
|
sub publisher { |
89
|
2
|
|
|
2
|
1
|
1390
|
my ($class, @params) = @_; |
90
|
2
|
100
|
|
|
|
17
|
croak "publisher is a static method" if ref $class; |
91
|
|
|
|
|
|
|
|
92
|
1
|
|
|
|
|
8
|
return Mojo::RabbitMQ::Client::Publisher->new(@params); |
93
|
|
|
|
|
|
|
} |
94
|
|
|
|
|
|
|
|
95
|
|
|
|
|
|
|
sub param { |
96
|
6
|
|
|
6
|
1
|
10635
|
my $self = shift; |
97
|
6
|
50
|
|
|
|
18
|
return undef unless defined $self->params; |
98
|
6
|
|
|
|
|
57
|
return $self->params->param(@_); |
99
|
|
|
|
|
|
|
} |
100
|
|
|
|
|
|
|
|
101
|
|
|
|
|
|
|
sub add_channel { |
102
|
0
|
|
|
0
|
1
|
0
|
my $self = shift; |
103
|
0
|
|
|
|
|
0
|
my $channel = shift; |
104
|
|
|
|
|
|
|
|
105
|
0
|
|
|
|
|
0
|
my $id = $channel->id; |
106
|
0
|
0
|
0
|
|
|
0
|
if ($id and $self->channels->{$id}) { |
107
|
0
|
|
|
|
|
0
|
return $channel->emit( |
108
|
|
|
|
|
|
|
error => 'Channel with id: ' . $id . ' already defined'); |
109
|
|
|
|
|
|
|
} |
110
|
|
|
|
|
|
|
|
111
|
0
|
0
|
0
|
|
|
0
|
if ($self->max_channels > 0 |
112
|
0
|
|
|
|
|
0
|
and scalar keys %{$self->channels} >= $self->max_channels) |
113
|
|
|
|
|
|
|
{ |
114
|
0
|
|
|
|
|
0
|
return $channel->emit(error => 'Maximum number of channels reached'); |
115
|
|
|
|
|
|
|
} |
116
|
|
|
|
|
|
|
|
117
|
0
|
0
|
|
|
|
0
|
if (not $id) { |
118
|
0
|
|
|
|
|
0
|
for my $candidate_id (1 .. (2**16 - 1)) { |
119
|
0
|
0
|
|
|
|
0
|
next if defined $self->channels->{$candidate_id}; |
120
|
0
|
|
|
|
|
0
|
$id = $candidate_id; |
121
|
0
|
|
|
|
|
0
|
last; |
122
|
|
|
|
|
|
|
} |
123
|
0
|
0
|
|
|
|
0
|
unless ($id) { |
124
|
0
|
|
|
|
|
0
|
return $channel->emit(error => 'Ran out of channel ids'); |
125
|
|
|
|
|
|
|
} |
126
|
|
|
|
|
|
|
} |
127
|
|
|
|
|
|
|
|
128
|
0
|
|
|
|
|
0
|
$self->channels->{$id} = $channel->id($id)->client($self); |
129
|
0
|
|
|
|
|
0
|
weaken $channel->{client}; |
130
|
|
|
|
|
|
|
|
131
|
0
|
|
|
|
|
0
|
return $channel; |
132
|
|
|
|
|
|
|
} |
133
|
|
|
|
|
|
|
|
134
|
|
|
|
|
|
|
sub acquire_channel_p { |
135
|
0
|
|
|
0
|
0
|
0
|
my $self = shift; |
136
|
|
|
|
|
|
|
|
137
|
0
|
|
|
|
|
0
|
my $promise = Mojo::Promise->new; |
138
|
|
|
|
|
|
|
|
139
|
0
|
|
|
|
|
0
|
my $channel = Mojo::RabbitMQ::Client::Channel->new(); |
140
|
0
|
|
|
0
|
|
0
|
$channel->catch(sub { $promise->reject(@_); undef $promise }); |
|
0
|
|
|
|
|
0
|
|
|
0
|
|
|
|
|
0
|
|
141
|
0
|
|
|
0
|
|
0
|
$channel->on(close => sub { warn "Channel closed" }); |
|
0
|
|
|
|
|
0
|
|
142
|
0
|
|
|
0
|
|
0
|
$channel->on(open => sub { $promise->resolve(@_); undef $promise }); |
|
0
|
|
|
|
|
0
|
|
|
0
|
|
|
|
|
0
|
|
143
|
|
|
|
|
|
|
|
144
|
0
|
|
|
|
|
0
|
$self->open_channel($channel); |
145
|
|
|
|
|
|
|
|
146
|
0
|
|
|
|
|
0
|
return $promise; |
147
|
|
|
|
|
|
|
} |
148
|
|
|
|
|
|
|
|
149
|
|
|
|
|
|
|
sub open_channel { |
150
|
0
|
|
|
0
|
1
|
0
|
my $self = shift; |
151
|
0
|
|
|
|
|
0
|
my $channel = shift; |
152
|
|
|
|
|
|
|
|
153
|
0
|
0
|
|
|
|
0
|
return $channel->emit(error => 'Client connection not opened') |
154
|
|
|
|
|
|
|
unless $self->is_open; |
155
|
|
|
|
|
|
|
|
156
|
0
|
|
|
|
|
0
|
$self->add_channel($channel)->open; |
157
|
|
|
|
|
|
|
|
158
|
0
|
|
|
|
|
0
|
return $self; |
159
|
|
|
|
|
|
|
} |
160
|
|
|
|
|
|
|
|
161
|
|
|
|
|
|
|
sub delete_channel { |
162
|
0
|
|
|
0
|
1
|
0
|
my $self = shift; |
163
|
0
|
|
|
|
|
0
|
return delete $self->channels->{shift}; |
164
|
|
|
|
|
|
|
} |
165
|
|
|
|
|
|
|
|
166
|
|
|
|
|
|
|
sub close { |
167
|
0
|
|
|
0
|
1
|
0
|
my $self = shift; |
168
|
|
|
|
|
|
|
|
169
|
0
|
|
|
|
|
0
|
weaken $self; |
170
|
|
|
|
|
|
|
$self->_write_expect( |
171
|
|
|
|
|
|
|
'Connection::Close' => {}, |
172
|
|
|
|
|
|
|
'Connection::CloseOk' => sub { |
173
|
0
|
|
|
0
|
|
0
|
warn "-- Connection::CloseOk\n" if DEBUG; |
174
|
0
|
|
|
|
|
0
|
$self->emit('close'); |
175
|
0
|
|
|
|
|
0
|
$self->_close; |
176
|
|
|
|
|
|
|
}, |
177
|
|
|
|
|
|
|
sub { |
178
|
0
|
|
|
0
|
|
0
|
$self->_close; |
179
|
|
|
|
|
|
|
} |
180
|
0
|
|
|
|
|
0
|
); |
181
|
|
|
|
|
|
|
} |
182
|
|
|
|
|
|
|
|
183
|
0
|
|
|
0
|
|
0
|
sub _loop { $_[0]->ioloop } |
184
|
|
|
|
|
|
|
|
185
|
|
|
|
|
|
|
sub _error { |
186
|
0
|
|
|
0
|
|
0
|
my ($self, $id, $err) = @_; |
187
|
|
|
|
|
|
|
|
188
|
0
|
|
|
|
|
0
|
$self->emit(error => $err); |
189
|
|
|
|
|
|
|
} |
190
|
|
|
|
|
|
|
|
191
|
|
|
|
|
|
|
sub _uri_handler { |
192
|
20
|
|
|
20
|
|
48
|
my $self = shift; |
193
|
20
|
|
|
|
|
36
|
my $attr = shift; |
194
|
|
|
|
|
|
|
|
195
|
20
|
100
|
|
|
|
44
|
return undef unless defined $self->url; |
196
|
|
|
|
|
|
|
|
197
|
18
|
50
|
33
|
|
|
100
|
$self->url(Mojo::URL->new($self->url)) |
198
|
|
|
|
|
|
|
unless blessed $self->url && $self->url->isa('Mojo::URL'); |
199
|
|
|
|
|
|
|
|
200
|
|
|
|
|
|
|
# Set some defaults |
201
|
18
|
|
|
|
|
3562
|
my %defaults = ( |
202
|
|
|
|
|
|
|
tls => 0, |
203
|
|
|
|
|
|
|
user => undef, |
204
|
|
|
|
|
|
|
pass => undef, |
205
|
|
|
|
|
|
|
host => 'localhost', |
206
|
|
|
|
|
|
|
port => 5672, |
207
|
|
|
|
|
|
|
vhost => '/', |
208
|
|
|
|
|
|
|
params => undef |
209
|
|
|
|
|
|
|
); |
210
|
|
|
|
|
|
|
|
211
|
|
|
|
|
|
|
# Check secure scheme in url |
212
|
18
|
100
|
|
|
|
42
|
$defaults{tls} = 1 |
213
|
|
|
|
|
|
|
if $self->url->scheme |
214
|
|
|
|
|
|
|
=~ /^(amqp|rabbitmq)s$/; # Fallback support for rabbitmq scheme name |
215
|
18
|
100
|
|
|
|
157
|
$defaults{port} = 5671 if $defaults{tls}; |
216
|
|
|
|
|
|
|
|
217
|
|
|
|
|
|
|
# Get host & port |
218
|
18
|
100
|
66
|
|
|
38
|
$defaults{host} = $self->url->host |
219
|
|
|
|
|
|
|
if defined $self->url->host && $self->url->host ne ''; |
220
|
18
|
100
|
|
|
|
279
|
$defaults{port} = $self->url->port if defined $self->url->port; |
221
|
|
|
|
|
|
|
|
222
|
|
|
|
|
|
|
# Get user & password |
223
|
18
|
|
|
|
|
137
|
my $userinfo = $self->url->userinfo; |
224
|
18
|
100
|
|
|
|
111
|
if (defined $userinfo) { |
225
|
5
|
|
|
|
|
23
|
my ($user, $pass) = split /:/, $userinfo; |
226
|
5
|
|
|
|
|
11
|
$defaults{user} = $user; |
227
|
5
|
|
|
|
|
11
|
$defaults{pass} = $pass; |
228
|
|
|
|
|
|
|
} |
229
|
|
|
|
|
|
|
|
230
|
18
|
|
|
|
|
40
|
my $vhost = url_unescape $self->url->path; |
231
|
18
|
|
|
|
|
1661
|
$vhost =~ s|^/(.+)$|$1|; |
232
|
18
|
100
|
66
|
|
|
824
|
$defaults{vhost} = $vhost if defined $vhost && $vhost ne ''; |
233
|
|
|
|
|
|
|
|
234
|
|
|
|
|
|
|
# Query params |
235
|
18
|
|
|
|
|
543
|
my $params = $defaults{params} = $self->url->query; |
236
|
|
|
|
|
|
|
|
237
|
|
|
|
|
|
|
# Handle common aliases to internal names |
238
|
18
|
|
|
|
|
370
|
my %aliases = ( |
239
|
|
|
|
|
|
|
cacertfile => 'ca', |
240
|
|
|
|
|
|
|
certfile => 'cert', |
241
|
|
|
|
|
|
|
keyfile => 'key', |
242
|
|
|
|
|
|
|
fail_if_no_peer_cert => 'verify', |
243
|
|
|
|
|
|
|
connection_timeout => 'timeout' |
244
|
|
|
|
|
|
|
); |
245
|
|
|
|
|
|
|
$params->param($aliases{$_}, $params->param($_)) |
246
|
18
|
|
|
|
|
60
|
foreach grep { defined $params->param($_) } keys %aliases; |
|
90
|
|
|
|
|
2027
|
|
247
|
|
|
|
|
|
|
|
248
|
|
|
|
|
|
|
# Some query parameters are translated to attribute values |
249
|
18
|
|
|
|
|
751
|
my %attributes = ( |
250
|
|
|
|
|
|
|
heartbeat_timeout => 'heartbeat', |
251
|
|
|
|
|
|
|
connect_timeout => 'timeout', |
252
|
|
|
|
|
|
|
max_channels => 'channel_max' |
253
|
|
|
|
|
|
|
); |
254
|
|
|
|
|
|
|
$self->$_($params->param($attributes{$_})) |
255
|
18
|
|
|
|
|
57
|
foreach grep { defined $params->param($attributes{$_}) } keys %attributes; |
|
54
|
|
|
|
|
704
|
|
256
|
|
|
|
|
|
|
|
257
|
|
|
|
|
|
|
# Set all |
258
|
18
|
|
|
|
|
496
|
$self->$_($defaults{$_}) foreach keys %defaults; |
259
|
|
|
|
|
|
|
|
260
|
18
|
|
|
|
|
706
|
return $self->$attr; |
261
|
|
|
|
|
|
|
} |
262
|
|
|
|
|
|
|
|
263
|
|
|
|
|
|
|
sub _close { |
264
|
0
|
|
|
0
|
|
0
|
my $self = shift; |
265
|
0
|
|
|
|
|
0
|
$self->_loop->stream($self->stream_id)->close_gracefully; |
266
|
|
|
|
|
|
|
} |
267
|
|
|
|
|
|
|
|
268
|
|
|
|
|
|
|
sub _handle { |
269
|
0
|
|
|
0
|
|
0
|
my ($self, $id, $close) = @_; |
270
|
|
|
|
|
|
|
|
271
|
0
|
|
|
|
|
0
|
$self->emit('disconnect'); |
272
|
|
|
|
|
|
|
|
273
|
0
|
|
|
|
|
0
|
$self->_loop->remove($id); |
274
|
|
|
|
|
|
|
} |
275
|
|
|
|
|
|
|
|
276
|
|
|
|
|
|
|
sub _read { |
277
|
0
|
|
|
0
|
|
0
|
my ($self, $id, $chunk) = @_; |
278
|
|
|
|
|
|
|
|
279
|
0
|
|
|
|
|
0
|
warn "<- @{[dumper $chunk]}" if DEBUG; |
280
|
0
|
|
|
|
|
0
|
$self->{buffer} .= $chunk; |
281
|
0
|
|
|
|
|
0
|
$self->_parse_frames; |
282
|
|
|
|
|
|
|
|
283
|
0
|
|
|
|
|
0
|
return; |
284
|
|
|
|
|
|
|
} |
285
|
|
|
|
|
|
|
|
286
|
|
|
|
|
|
|
sub _parse_frames { |
287
|
0
|
|
|
0
|
|
0
|
my $self = shift; |
288
|
|
|
|
|
|
|
|
289
|
0
|
|
|
|
|
0
|
for my $frame (Net::AMQP->parse_raw_frames(\$self->{buffer})) { |
290
|
|
|
|
|
|
|
|
291
|
0
|
0
|
0
|
|
|
0
|
if ($frame->isa('Net::AMQP::Frame::Heartbeat')) { |
|
|
0
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
292
|
0
|
|
|
|
|
0
|
$self->heartbeat_received(time()); |
293
|
|
|
|
|
|
|
} |
294
|
|
|
|
|
|
|
elsif ($frame->isa('Net::AMQP::Frame::Method') |
295
|
|
|
|
|
|
|
and $frame->method_frame->isa('Net::AMQP::Protocol::Connection::Close')) |
296
|
|
|
|
|
|
|
{ |
297
|
0
|
|
|
|
|
0
|
$self->is_open(0); |
298
|
|
|
|
|
|
|
|
299
|
0
|
|
|
|
|
0
|
$self->_write_frame(Net::AMQP::Protocol::Connection::CloseOk->new()); |
300
|
|
|
|
|
|
|
$self->emit(disconnect => "Server side disconnection: " |
301
|
0
|
|
|
|
|
0
|
. $frame->method_frame->{reply_text}); |
302
|
|
|
|
|
|
|
} |
303
|
|
|
|
|
|
|
elsif ($frame->channel == 0) { |
304
|
0
|
|
|
|
|
0
|
$self->queue->push($frame); |
305
|
|
|
|
|
|
|
} |
306
|
|
|
|
|
|
|
else { |
307
|
0
|
|
|
|
|
0
|
my $channel = $self->channels->{$frame->channel}; |
308
|
0
|
0
|
|
|
|
0
|
if (defined $channel) { |
309
|
0
|
|
|
|
|
0
|
$channel->_push_queue_or_consume($frame); |
310
|
|
|
|
|
|
|
} |
311
|
|
|
|
|
|
|
else { |
312
|
0
|
|
0
|
|
|
0
|
$self->emit( |
313
|
|
|
|
|
|
|
error => "Unknown channel id received: " |
314
|
|
|
|
|
|
|
. ($frame->channel // '(undef)'), |
315
|
|
|
|
|
|
|
$frame |
316
|
|
|
|
|
|
|
); |
317
|
|
|
|
|
|
|
} |
318
|
|
|
|
|
|
|
} |
319
|
|
|
|
|
|
|
} |
320
|
|
|
|
|
|
|
} |
321
|
|
|
|
|
|
|
|
322
|
|
|
|
|
|
|
sub _connect { |
323
|
0
|
|
|
0
|
|
0
|
my ($self, $cb) = @_; |
324
|
|
|
|
|
|
|
|
325
|
|
|
|
|
|
|
# Options |
326
|
|
|
|
|
|
|
# Parse according to (https://www.rabbitmq.com/uri-spec.html) |
327
|
0
|
|
|
|
|
0
|
my $options = { |
328
|
|
|
|
|
|
|
address => $self->host, |
329
|
|
|
|
|
|
|
port => $self->port, |
330
|
|
|
|
|
|
|
timeout => $self->connect_timeout, |
331
|
|
|
|
|
|
|
tls => $self->tls, |
332
|
|
|
|
|
|
|
tls_ca => scalar $self->param('ca'), |
333
|
|
|
|
|
|
|
tls_cert => scalar $self->param('cert'), |
334
|
|
|
|
|
|
|
tls_key => scalar $self->param('key') |
335
|
|
|
|
|
|
|
}; |
336
|
0
|
|
|
|
|
0
|
my $verify = $self->param('verify'); |
337
|
0
|
0
|
|
|
|
0
|
$options->{tls_verify} = hex $verify if defined $verify; |
338
|
|
|
|
|
|
|
|
339
|
|
|
|
|
|
|
# Connect |
340
|
0
|
|
|
|
|
0
|
weaken $self; |
341
|
0
|
|
|
|
|
0
|
my $id; |
342
|
|
|
|
|
|
|
return $id = $self->_loop->client( |
343
|
|
|
|
|
|
|
$options => sub { |
344
|
0
|
|
|
0
|
|
0
|
my ($loop, $err, $stream) = @_; |
345
|
|
|
|
|
|
|
|
346
|
|
|
|
|
|
|
# Connection error |
347
|
0
|
0
|
|
|
|
0
|
return unless $self; |
348
|
0
|
0
|
|
|
|
0
|
return $self->_error($id, $err) if $err; |
349
|
|
|
|
|
|
|
|
350
|
0
|
|
|
|
|
0
|
$self->emit(connect => $stream); |
351
|
|
|
|
|
|
|
|
352
|
|
|
|
|
|
|
# Connection established |
353
|
0
|
|
|
|
|
0
|
$stream->on(timeout => sub { $self->_error($id, 'Inactivity timeout') }); |
|
0
|
|
|
|
|
0
|
|
354
|
0
|
0
|
|
|
|
0
|
$stream->on(close => sub { $self && $self->_handle($id, 1) }); |
|
0
|
|
|
|
|
0
|
|
355
|
0
|
0
|
|
|
|
0
|
$stream->on(error => sub { $self && $self->_error($id, pop) }); |
|
0
|
|
|
|
|
0
|
|
356
|
0
|
0
|
|
|
|
0
|
$stream->on(read => sub { $self && $self->_read($id, pop) }); |
|
0
|
|
|
|
|
0
|
|
357
|
0
|
|
|
|
|
0
|
$cb->(); |
358
|
|
|
|
|
|
|
} |
359
|
0
|
|
|
|
|
0
|
); |
360
|
|
|
|
|
|
|
} |
361
|
|
|
|
|
|
|
|
362
|
|
|
|
|
|
|
sub _connected { |
363
|
0
|
|
|
0
|
|
0
|
my ($self, $id, $cb) = @_; |
364
|
|
|
|
|
|
|
|
365
|
|
|
|
|
|
|
# Inactivity timeout |
366
|
0
|
|
|
|
|
0
|
my $stream = $self->_loop->stream($id)->timeout(0); |
367
|
|
|
|
|
|
|
|
368
|
|
|
|
|
|
|
# Store connection information in transaction |
369
|
0
|
|
|
|
|
0
|
my $handle = $stream->handle; |
370
|
|
|
|
|
|
|
|
371
|
|
|
|
|
|
|
# Detect that xml spec was already loaded |
372
|
0
|
|
|
|
|
0
|
my $loaded = eval { Net::AMQP::Protocol::Connection::StartOk->new; 1 }; |
|
0
|
|
|
|
|
0
|
|
|
0
|
|
|
|
|
0
|
|
373
|
0
|
0
|
|
|
|
0
|
unless ($loaded) { # Load AMQP specs |
374
|
0
|
|
|
|
|
0
|
my $file = "amqp0-9-1.stripped.extended.xml"; |
375
|
|
|
|
|
|
|
|
376
|
|
|
|
|
|
|
# Original spec is in "fixed_amqp0-8.xml" |
377
|
0
|
|
|
|
|
0
|
my $share = dist_file('Mojo-RabbitMQ-Client', $file); |
378
|
0
|
|
|
|
|
0
|
Net::AMQP::Protocol->load_xml_spec($share); |
379
|
|
|
|
|
|
|
} |
380
|
|
|
|
|
|
|
|
381
|
0
|
|
|
|
|
0
|
$self->_write($id => Net::AMQP::Protocol->header); |
382
|
|
|
|
|
|
|
|
383
|
0
|
|
|
|
|
0
|
weaken $self; |
384
|
|
|
|
|
|
|
$self->_expect( |
385
|
|
|
|
|
|
|
'Connection::Start' => sub { |
386
|
0
|
|
|
0
|
|
0
|
my $frame = shift; |
387
|
|
|
|
|
|
|
|
388
|
0
|
|
|
|
|
0
|
my @server_mechanisms = split /\s/, $frame->method_frame->mechanisms; |
389
|
0
|
|
0
|
|
|
0
|
my $param_mechanism = $self->param('auth_mechanism') // ''; |
390
|
0
|
|
|
|
|
0
|
my @client_mechanisms = ('AMQPLAIN', 'EXTERNAL'); |
391
|
0
|
0
|
|
|
|
0
|
@client_mechanisms = ($param_mechanism) if ($param_mechanism); |
392
|
0
|
|
|
|
|
0
|
warn "-- Server mechanisms: @server_mechanisms\n" if DEBUG; |
393
|
0
|
|
|
|
|
0
|
warn "-- Client mechanisms: @client_mechanisms\n" if DEBUG; |
394
|
0
|
|
|
|
|
0
|
my $mechanism; |
395
|
0
|
|
|
|
|
0
|
for my $cand (@client_mechanisms) { |
396
|
0
|
0
|
|
|
|
0
|
if (grep { $_ eq $cand } @server_mechanisms) { |
|
0
|
|
|
|
|
0
|
|
397
|
0
|
|
|
|
|
0
|
$mechanism = $cand; |
398
|
0
|
|
|
|
|
0
|
last; |
399
|
|
|
|
|
|
|
} |
400
|
|
|
|
|
|
|
} |
401
|
0
|
0
|
|
|
|
0
|
return $self->emit(error => 'No authentication mechanism could be negotiated') |
402
|
|
|
|
|
|
|
unless $mechanism; |
403
|
|
|
|
|
|
|
|
404
|
0
|
|
|
|
|
0
|
my @locales = split /\s/, $frame->method_frame->locales; |
405
|
|
|
|
|
|
|
return $self->emit(error => 'en_US is not found in locales') |
406
|
0
|
0
|
|
|
|
0
|
if none { $_ eq 'en_US' } @locales; |
|
0
|
|
|
|
|
0
|
|
407
|
|
|
|
|
|
|
|
408
|
0
|
|
|
|
|
0
|
$self->{_server_properties} = $frame->method_frame->server_properties; |
409
|
|
|
|
|
|
|
|
410
|
0
|
|
|
|
|
0
|
warn "-- Connection::Start {product: " . $self->{_server_properties}->{product} . ", version: " . $self->{_server_properties}->{version} . "}\n" if DEBUG; |
411
|
0
|
|
|
|
|
0
|
$self->_write_frame( |
412
|
|
|
|
|
|
|
Net::AMQP::Protocol::Connection::StartOk->new( |
413
|
|
|
|
|
|
|
client_properties => { |
414
|
|
|
|
|
|
|
platform => 'Perl', |
415
|
|
|
|
|
|
|
product => __PACKAGE__, |
416
|
|
|
|
|
|
|
information => 'https://github.com/inway/mojo-rabbitmq-client', |
417
|
|
|
|
|
|
|
version => __PACKAGE__->VERSION, |
418
|
|
|
|
|
|
|
}, |
419
|
|
|
|
|
|
|
mechanism => $mechanism, |
420
|
|
|
|
|
|
|
response => {LOGIN => $self->user, PASSWORD => $self->pass}, |
421
|
|
|
|
|
|
|
locale => 'en_US', |
422
|
|
|
|
|
|
|
), |
423
|
|
|
|
|
|
|
); |
424
|
|
|
|
|
|
|
|
425
|
0
|
|
|
|
|
0
|
$self->_tune($id, $cb); |
426
|
|
|
|
|
|
|
}, |
427
|
|
|
|
|
|
|
sub { |
428
|
0
|
|
|
0
|
|
0
|
$self->emit(error => 'Unable to start connection: ' . shift); |
429
|
|
|
|
|
|
|
} |
430
|
0
|
|
|
|
|
0
|
); |
431
|
|
|
|
|
|
|
} |
432
|
|
|
|
|
|
|
|
433
|
|
|
|
|
|
|
sub _tune { |
434
|
0
|
|
|
0
|
|
0
|
my ($self, $id, $cb) = @_; |
435
|
|
|
|
|
|
|
|
436
|
0
|
|
|
|
|
0
|
weaken $self; |
437
|
|
|
|
|
|
|
$self->_expect( |
438
|
|
|
|
|
|
|
'Connection::Tune' => sub { |
439
|
0
|
|
|
0
|
|
0
|
my $frame = shift; |
440
|
|
|
|
|
|
|
|
441
|
0
|
|
|
|
|
0
|
my $method_frame = $frame->method_frame; |
442
|
0
|
|
|
|
|
0
|
$self->max_buffer_size($method_frame->frame_max); |
443
|
|
|
|
|
|
|
|
444
|
0
|
|
0
|
|
|
0
|
my $heartbeat = $self->heartbeat_timeout || $method_frame->heartbeat; |
445
|
|
|
|
|
|
|
|
446
|
0
|
|
|
|
|
0
|
warn "-- Connection::Tune {frame_max: " . $method_frame->frame_max . ", heartbeat: " . $method_frame->heartbeat . "}\n" if DEBUG; |
447
|
|
|
|
|
|
|
# Confirm |
448
|
0
|
|
|
|
|
0
|
$self->_write_frame( |
449
|
|
|
|
|
|
|
Net::AMQP::Protocol::Connection::TuneOk->new( |
450
|
|
|
|
|
|
|
channel_max => $method_frame->channel_max, |
451
|
|
|
|
|
|
|
frame_max => $method_frame->frame_max, |
452
|
|
|
|
|
|
|
heartbeat => $heartbeat, |
453
|
|
|
|
|
|
|
), |
454
|
|
|
|
|
|
|
); |
455
|
|
|
|
|
|
|
|
456
|
|
|
|
|
|
|
# According to https://www.rabbitmq.com/amqp-0-9-1-errata.html |
457
|
|
|
|
|
|
|
# The client should start sending heartbeats after receiving a Connection.Tune |
458
|
|
|
|
|
|
|
# method, and start monitoring heartbeats after sending Connection.Open. |
459
|
|
|
|
|
|
|
# -and- |
460
|
|
|
|
|
|
|
# Heartbeat frames are sent about every timeout / 2 seconds. After two missed |
461
|
|
|
|
|
|
|
# heartbeats, the peer is considered to be unreachable. |
462
|
|
|
|
|
|
|
$self->{heartbeat_tid} = $self->_loop->recurring( |
463
|
|
|
|
|
|
|
$heartbeat / 2 => sub { |
464
|
0
|
0
|
|
|
|
0
|
return unless time() - $self->heartbeat_sent > $heartbeat / 2; |
465
|
0
|
|
|
|
|
0
|
$self->_write_frame(Net::AMQP::Frame::Heartbeat->new()); |
466
|
0
|
|
|
|
|
0
|
$self->heartbeat_sent(time()); |
467
|
|
|
|
|
|
|
} |
468
|
0
|
0
|
|
|
|
0
|
) if $heartbeat; |
469
|
|
|
|
|
|
|
|
470
|
|
|
|
|
|
|
$self->_write_expect( |
471
|
|
|
|
|
|
|
'Connection::Open' => |
472
|
|
|
|
|
|
|
{virtual_host => $self->vhost, capabilities => '', insist => 1,}, |
473
|
|
|
|
|
|
|
'Connection::OpenOk' => sub { |
474
|
0
|
|
|
|
|
0
|
warn "-- Connection::OpenOk\n" if DEBUG; |
475
|
|
|
|
|
|
|
|
476
|
0
|
|
|
|
|
0
|
$self->is_open(1); |
477
|
0
|
|
|
|
|
0
|
$self->emit('open'); |
478
|
0
|
0
|
|
|
|
0
|
$cb->() if defined $cb; |
479
|
|
|
|
|
|
|
}, |
480
|
|
|
|
|
|
|
sub { |
481
|
0
|
|
|
|
|
0
|
my $err = shift; |
482
|
0
|
|
|
|
|
0
|
$self->emit(error => 'Unable to open connection: ' . $err); |
483
|
0
|
0
|
|
|
|
0
|
$cb->($err) if defined $cb; |
484
|
|
|
|
|
|
|
} |
485
|
0
|
|
|
|
|
0
|
); |
486
|
|
|
|
|
|
|
}, |
487
|
|
|
|
|
|
|
sub { |
488
|
0
|
|
|
0
|
|
0
|
$self->emit(error => 'Unable to tune connection: ' . shift); |
489
|
|
|
|
|
|
|
} |
490
|
0
|
|
|
|
|
0
|
); |
491
|
|
|
|
|
|
|
} |
492
|
|
|
|
|
|
|
|
493
|
|
|
|
|
|
|
sub _write_expect { |
494
|
0
|
|
|
0
|
|
0
|
my $self = shift; |
495
|
0
|
|
|
|
|
0
|
my ($method, $args, $exp, $cb, $failure_cb, $channel_id) = @_; |
496
|
0
|
|
|
|
|
0
|
$method = 'Net::AMQP::Protocol::' . $method; |
497
|
|
|
|
|
|
|
|
498
|
0
|
|
0
|
|
|
0
|
$channel_id ||= 0; |
499
|
|
|
|
|
|
|
|
500
|
0
|
|
|
|
|
0
|
my $method_frame = Net::AMQP::Frame::Method->new( |
501
|
|
|
|
|
|
|
method_frame => $method->new(%$args) |
502
|
|
|
|
|
|
|
); |
503
|
|
|
|
|
|
|
|
504
|
0
|
|
|
|
|
0
|
$self->_write_frame( |
505
|
|
|
|
|
|
|
$method_frame, |
506
|
|
|
|
|
|
|
$channel_id |
507
|
|
|
|
|
|
|
); |
508
|
|
|
|
|
|
|
|
509
|
0
|
|
|
|
|
0
|
return $self->_expect($exp, $cb, $failure_cb, $channel_id); |
510
|
|
|
|
|
|
|
} |
511
|
|
|
|
|
|
|
|
512
|
|
|
|
|
|
|
sub _expect { |
513
|
0
|
|
|
0
|
|
0
|
my $self = shift; |
514
|
0
|
|
|
|
|
0
|
my ($exp, $cb, $failure_cb, $channel_id) = @_; |
515
|
0
|
0
|
|
|
|
0
|
my @expected = ref($exp) eq 'ARRAY' ? @$exp : ($exp); |
516
|
|
|
|
|
|
|
|
517
|
0
|
|
0
|
|
|
0
|
$channel_id ||= 0; |
518
|
|
|
|
|
|
|
|
519
|
0
|
|
|
|
|
0
|
my $queue; |
520
|
0
|
0
|
|
|
|
0
|
if (!$channel_id) { |
521
|
0
|
|
|
|
|
0
|
$queue = $self->queue; |
522
|
|
|
|
|
|
|
} |
523
|
|
|
|
|
|
|
else { |
524
|
0
|
|
|
|
|
0
|
my $channel = $self->channels->{$channel_id}; |
525
|
0
|
0
|
|
|
|
0
|
if (defined $channel) { |
526
|
0
|
|
|
|
|
0
|
$queue = $channel->queue; |
527
|
|
|
|
|
|
|
} |
528
|
|
|
|
|
|
|
else { |
529
|
0
|
|
0
|
|
|
0
|
$failure_cb->( |
530
|
|
|
|
|
|
|
"Unknown channel id received: " . ($channel_id // '(undef)')); |
531
|
|
|
|
|
|
|
} |
532
|
|
|
|
|
|
|
} |
533
|
|
|
|
|
|
|
|
534
|
0
|
0
|
|
|
|
0
|
return unless $queue; |
535
|
|
|
|
|
|
|
|
536
|
|
|
|
|
|
|
$queue->get( |
537
|
|
|
|
|
|
|
sub { |
538
|
0
|
|
|
0
|
|
0
|
my $frame = shift; |
539
|
|
|
|
|
|
|
|
540
|
0
|
0
|
|
|
|
0
|
return $failure_cb->("Received data is not method frame") |
541
|
|
|
|
|
|
|
if not $frame->isa("Net::AMQP::Frame::Method"); |
542
|
|
|
|
|
|
|
|
543
|
0
|
|
|
|
|
0
|
my $method_frame = $frame->method_frame; |
544
|
0
|
|
|
|
|
0
|
for my $exp (@expected) { |
545
|
0
|
0
|
|
|
|
0
|
return $cb->($frame) |
546
|
|
|
|
|
|
|
if $method_frame->isa("Net::AMQP::Protocol::" . $exp); |
547
|
|
|
|
|
|
|
} |
548
|
|
|
|
|
|
|
|
549
|
0
|
|
|
|
|
0
|
$failure_cb->("Method is not " |
550
|
|
|
|
|
|
|
. join(', ', @expected) |
551
|
|
|
|
|
|
|
. ". It's " |
552
|
|
|
|
|
|
|
. ref($method_frame)); |
553
|
|
|
|
|
|
|
} |
554
|
0
|
|
|
|
|
0
|
); |
555
|
|
|
|
|
|
|
} |
556
|
|
|
|
|
|
|
|
557
|
|
|
|
|
|
|
sub _write_frame { |
558
|
0
|
|
|
0
|
|
0
|
my $self = shift; |
559
|
0
|
|
|
|
|
0
|
my $id = $self->stream_id; |
560
|
0
|
|
|
|
|
0
|
my ($out, $channel, $cb) = @_; |
561
|
|
|
|
|
|
|
|
562
|
0
|
0
|
|
|
|
0
|
if ($out->isa('Net::AMQP::Protocol::Base')) { |
563
|
0
|
|
|
|
|
0
|
$out = $out->frame_wrap; |
564
|
|
|
|
|
|
|
} |
565
|
0
|
|
0
|
|
|
0
|
$out->channel($channel // 0); |
566
|
|
|
|
|
|
|
|
567
|
0
|
|
|
|
|
0
|
return $self->_write($id, $out->to_raw_frame, $cb); |
568
|
|
|
|
|
|
|
} |
569
|
|
|
|
|
|
|
|
570
|
|
|
|
|
|
|
sub _write { |
571
|
0
|
|
|
0
|
|
0
|
my $self = shift @_; |
572
|
0
|
|
|
|
|
0
|
my $id = shift @_; |
573
|
0
|
|
|
|
|
0
|
my $frame = shift @_; |
574
|
0
|
|
|
|
|
0
|
my $cb = shift @_; |
575
|
|
|
|
|
|
|
|
576
|
0
|
|
|
|
|
0
|
warn "-> @{[dumper $frame]}" if DEBUG; |
577
|
|
|
|
|
|
|
|
578
|
0
|
|
|
|
|
0
|
utf8::downgrade($frame); |
579
|
0
|
0
|
|
|
|
0
|
$self->_loop->stream($id)->write($frame => $cb) |
580
|
|
|
|
|
|
|
if defined $self->_loop->stream($id); |
581
|
|
|
|
|
|
|
} |
582
|
|
|
|
|
|
|
|
583
|
|
|
|
|
|
|
sub DESTROY { |
584
|
20
|
|
|
20
|
|
51952
|
my $self = shift; |
585
|
20
|
50
|
|
|
|
67
|
my $ioloop = $self->ioloop or return; |
586
|
20
|
|
|
|
|
134
|
my $heartbeat_tid = $self->{heartbeat_tid}; |
587
|
|
|
|
|
|
|
|
588
|
20
|
50
|
|
|
|
294
|
$ioloop->remove($heartbeat_tid) if $heartbeat_tid; |
589
|
|
|
|
|
|
|
} |
590
|
|
|
|
|
|
|
|
591
|
|
|
|
|
|
|
1; |
592
|
|
|
|
|
|
|
|
593
|
|
|
|
|
|
|
=encoding utf8 |
594
|
|
|
|
|
|
|
|
595
|
|
|
|
|
|
|
=head1 NAME |
596
|
|
|
|
|
|
|
|
597
|
|
|
|
|
|
|
Mojo::RabbitMQ::Client - Mojo::IOLoop based RabbitMQ client |
598
|
|
|
|
|
|
|
|
599
|
|
|
|
|
|
|
=head1 SYNOPSIS |
600
|
|
|
|
|
|
|
|
601
|
|
|
|
|
|
|
use Mojo::RabbitMQ::Client; |
602
|
|
|
|
|
|
|
|
603
|
|
|
|
|
|
|
# Supply URL according to (https://www.rabbitmq.com/uri-spec.html) |
604
|
|
|
|
|
|
|
my $client = Mojo::RabbitMQ::Client->new( |
605
|
|
|
|
|
|
|
url => 'amqp://guest:guest@127.0.0.1:5672/'); |
606
|
|
|
|
|
|
|
|
607
|
|
|
|
|
|
|
# Catch all client related errors |
608
|
|
|
|
|
|
|
$client->catch(sub { warn "Some error caught in client"; }); |
609
|
|
|
|
|
|
|
|
610
|
|
|
|
|
|
|
# When connection is in Open state, open new channel |
611
|
|
|
|
|
|
|
$client->on( |
612
|
|
|
|
|
|
|
open => sub { |
613
|
|
|
|
|
|
|
my ($client) = @_; |
614
|
|
|
|
|
|
|
|
615
|
|
|
|
|
|
|
# Create a new channel with auto-assigned id |
616
|
|
|
|
|
|
|
my $channel = Mojo::RabbitMQ::Client::Channel->new(); |
617
|
|
|
|
|
|
|
|
618
|
|
|
|
|
|
|
$channel->catch(sub { warn "Error on channel received"; }); |
619
|
|
|
|
|
|
|
|
620
|
|
|
|
|
|
|
$channel->on( |
621
|
|
|
|
|
|
|
open => sub { |
622
|
|
|
|
|
|
|
my ($channel) = @_; |
623
|
|
|
|
|
|
|
$channel->qos(prefetch_count => 1)->deliver; |
624
|
|
|
|
|
|
|
|
625
|
|
|
|
|
|
|
# Publish some example message to test_queue |
626
|
|
|
|
|
|
|
my $publish = $channel->publish( |
627
|
|
|
|
|
|
|
exchange => 'test', |
628
|
|
|
|
|
|
|
routing_key => 'test_queue', |
629
|
|
|
|
|
|
|
body => 'Test message', |
630
|
|
|
|
|
|
|
mandatory => 0, |
631
|
|
|
|
|
|
|
immediate => 0, |
632
|
|
|
|
|
|
|
header => {} |
633
|
|
|
|
|
|
|
); |
634
|
|
|
|
|
|
|
# Deliver this message to server |
635
|
|
|
|
|
|
|
$publish->deliver; |
636
|
|
|
|
|
|
|
|
637
|
|
|
|
|
|
|
# Start consuming messages from test_queue |
638
|
|
|
|
|
|
|
my $consumer = $channel->consume(queue => 'test_queue'); |
639
|
|
|
|
|
|
|
$consumer->on(message => sub { say "Got a message" }); |
640
|
|
|
|
|
|
|
$consumer->deliver; |
641
|
|
|
|
|
|
|
} |
642
|
|
|
|
|
|
|
); |
643
|
|
|
|
|
|
|
$channel->on(close => sub { $log->error('Channel closed') }); |
644
|
|
|
|
|
|
|
|
645
|
|
|
|
|
|
|
$client->open_channel($channel); |
646
|
|
|
|
|
|
|
} |
647
|
|
|
|
|
|
|
); |
648
|
|
|
|
|
|
|
|
649
|
|
|
|
|
|
|
# Start connection |
650
|
|
|
|
|
|
|
$client->connect(); |
651
|
|
|
|
|
|
|
|
652
|
|
|
|
|
|
|
# Start Mojo::IOLoop if not running already |
653
|
|
|
|
|
|
|
Mojo::IOLoop->start unless Mojo::IOLoop->is_running; |
654
|
|
|
|
|
|
|
|
655
|
|
|
|
|
|
|
=head2 CONSUMER |
656
|
|
|
|
|
|
|
|
657
|
|
|
|
|
|
|
use Mojo::RabbitMQ::Client; |
658
|
|
|
|
|
|
|
my $consumer = Mojo::RabbitMQ::Client->consumer( |
659
|
|
|
|
|
|
|
url => 'amqp://guest:guest@127.0.0.1:5672/?exchange=mojo&queue=mojo', |
660
|
|
|
|
|
|
|
defaults => { |
661
|
|
|
|
|
|
|
qos => {prefetch_count => 1}, |
662
|
|
|
|
|
|
|
queue => {durable => 1}, |
663
|
|
|
|
|
|
|
consumer => {no_ack => 0}, |
664
|
|
|
|
|
|
|
} |
665
|
|
|
|
|
|
|
); |
666
|
|
|
|
|
|
|
|
667
|
|
|
|
|
|
|
$consumer->catch(sub { die "Some error caught in Consumer" } ); |
668
|
|
|
|
|
|
|
$consumer->on('success' => sub { say "Consumer ready" }); |
669
|
|
|
|
|
|
|
$consumer->on( |
670
|
|
|
|
|
|
|
'message' => sub { |
671
|
|
|
|
|
|
|
my ($consumer, $message) = @_; |
672
|
|
|
|
|
|
|
|
673
|
|
|
|
|
|
|
$consumer->channel->ack($message)->deliver; |
674
|
|
|
|
|
|
|
} |
675
|
|
|
|
|
|
|
); |
676
|
|
|
|
|
|
|
$consumer->start(); |
677
|
|
|
|
|
|
|
|
678
|
|
|
|
|
|
|
Mojo::IOLoop->start unless Mojo::IOLoop->is_running; |
679
|
|
|
|
|
|
|
|
680
|
|
|
|
|
|
|
=head2 PUBLISHER |
681
|
|
|
|
|
|
|
|
682
|
|
|
|
|
|
|
use Mojo::RabbitMQ::Client; |
683
|
|
|
|
|
|
|
my $publisher = Mojo::RabbitMQ::Client->publisher( |
684
|
|
|
|
|
|
|
url => 'amqp://guest:guest@127.0.0.1:5672/?exchange=mojo&routing_key=mojo' |
685
|
|
|
|
|
|
|
); |
686
|
|
|
|
|
|
|
|
687
|
|
|
|
|
|
|
$publisher->publish('plain text'); |
688
|
|
|
|
|
|
|
|
689
|
|
|
|
|
|
|
$publisher->publish( |
690
|
|
|
|
|
|
|
{encode => { to => 'json'}}, |
691
|
|
|
|
|
|
|
routing_key => 'mojo_mq' |
692
|
|
|
|
|
|
|
)->then(sub { |
693
|
|
|
|
|
|
|
say "Message published"; |
694
|
|
|
|
|
|
|
})->catch(sub { |
695
|
|
|
|
|
|
|
die "Publishing failed" |
696
|
|
|
|
|
|
|
})->wait; |
697
|
|
|
|
|
|
|
|
698
|
|
|
|
|
|
|
=head1 DESCRIPTION |
699
|
|
|
|
|
|
|
|
700
|
|
|
|
|
|
|
L<Mojo::RabbitMQ::Client> is a rewrite of L<AnyEvent::RabbitMQ> to work on top of L<Mojo::IOLoop>. |
701
|
|
|
|
|
|
|
|
702
|
|
|
|
|
|
|
=head1 EVENTS |
703
|
|
|
|
|
|
|
|
704
|
|
|
|
|
|
|
L<Mojo::RabbitMQ::Client> inherits all events from L<Mojo::EventEmitter> and can emit the |
705
|
|
|
|
|
|
|
following new ones. |
706
|
|
|
|
|
|
|
|
707
|
|
|
|
|
|
|
=head2 connect |
708
|
|
|
|
|
|
|
|
709
|
|
|
|
|
|
|
$client->on(connect => sub { |
710
|
|
|
|
|
|
|
my ($client, $stream) = @_; |
711
|
|
|
|
|
|
|
... |
712
|
|
|
|
|
|
|
}); |
713
|
|
|
|
|
|
|
|
714
|
|
|
|
|
|
|
Emitted when TCP/IP connection with RabbitMQ server is established. |
715
|
|
|
|
|
|
|
|
716
|
|
|
|
|
|
|
=head2 open |
717
|
|
|
|
|
|
|
|
718
|
|
|
|
|
|
|
$client->on(open => sub { |
719
|
|
|
|
|
|
|
my ($client) = @_; |
720
|
|
|
|
|
|
|
... |
721
|
|
|
|
|
|
|
}); |
722
|
|
|
|
|
|
|
|
723
|
|
|
|
|
|
|
Emitted when AMQP protocol Connection.Open-Ok method is received. |
724
|
|
|
|
|
|
|
|
725
|
|
|
|
|
|
|
=head2 close |
726
|
|
|
|
|
|
|
|
727
|
|
|
|
|
|
|
$client->on(close => sub { |
728
|
|
|
|
|
|
|
my ($client) = @_; |
729
|
|
|
|
|
|
|
... |
730
|
|
|
|
|
|
|
}); |
731
|
|
|
|
|
|
|
|
732
|
|
|
|
|
|
|
Emitted on reception of Connection.Close-Ok method. |
733
|
|
|
|
|
|
|
|
734
|
|
|
|
|
|
|
=head2 disconnect |
735
|
|
|
|
|
|
|
|
736
|
|
|
|
|
|
|
$client->on(disconnect => sub { |
737
|
|
|
|
|
|
|
my ($client) = @_; |
738
|
|
|
|
|
|
|
... |
739
|
|
|
|
|
|
|
}); |
740
|
|
|
|
|
|
|
|
741
|
|
|
|
|
|
|
Emitted when TCP/IP connection gets disconnected. |
742
|
|
|
|
|
|
|
|
743
|
|
|
|
|
|
|
=head1 ATTRIBUTES |
744
|
|
|
|
|
|
|
|
745
|
|
|
|
|
|
|
L<Mojo::RabbitMQ::Client> has the following attributes. |
746
|
|
|
|
|
|
|
|
747
|
|
|
|
|
|
|
=head2 tls |
748
|
|
|
|
|
|
|
|
749
|
|
|
|
|
|
|
my $tls = $client->tls; |
750
|
|
|
|
|
|
|
$client = $client->tls(1) |
751
|
|
|
|
|
|
|
|
752
|
|
|
|
|
|
|
Force secure connection. Default is disabled (C<0>). |
753
|
|
|
|
|
|
|
|
754
|
|
|
|
|
|
|
=head2 user |
755
|
|
|
|
|
|
|
|
756
|
|
|
|
|
|
|
my $user = $client->user; |
757
|
|
|
|
|
|
|
$client = $client->user('guest') |
758
|
|
|
|
|
|
|
|
759
|
|
|
|
|
|
|
Sets username for authorization, by default it's not defined. |
760
|
|
|
|
|
|
|
|
761
|
|
|
|
|
|
|
=head2 pass |
762
|
|
|
|
|
|
|
|
763
|
|
|
|
|
|
|
my $pass = $client->pass; |
764
|
|
|
|
|
|
|
$client = $client->pass('secret') |
765
|
|
|
|
|
|
|
|
766
|
|
|
|
|
|
|
Sets user password for authorization, by default it's not defined. |
767
|
|
|
|
|
|
|
|
768
|
|
|
|
|
|
|
=head2 host |
769
|
|
|
|
|
|
|
|
770
|
|
|
|
|
|
|
my $host = $client->host; |
771
|
|
|
|
|
|
|
$client = $client->host('localhost') |
772
|
|
|
|
|
|
|
|
773
|
|
|
|
|
|
|
Hostname or IP address of RabbitMQ server. Defaults to C<localhost>. |
774
|
|
|
|
|
|
|
|
775
|
|
|
|
|
|
|
=head2 port |
776
|
|
|
|
|
|
|
|
777
|
|
|
|
|
|
|
my $port = $client->port; |
778
|
|
|
|
|
|
|
$client = $client->port(1234) |
779
|
|
|
|
|
|
|
|
780
|
|
|
|
|
|
|
Port on which RabbitMQ server listens for new connections. |
781
|
|
|
|
|
|
|
Defaults to C<5672>, which is standard RabbitMQ server listen port. |
782
|
|
|
|
|
|
|
|
783
|
|
|
|
|
|
|
=head2 vhost |
784
|
|
|
|
|
|
|
|
785
|
|
|
|
|
|
|
my $vhost = $client->vhost; |
786
|
|
|
|
|
|
|
$client = $client->vhost('/') |
787
|
|
|
|
|
|
|
|
788
|
|
|
|
|
|
|
RabbitMQ virtual server to use. Default is C</>. |
789
|
|
|
|
|
|
|
|
790
|
|
|
|
|
|
|
=head2 params |
791
|
|
|
|
|
|
|
|
792
|
|
|
|
|
|
|
my $params = $client->params; |
793
|
|
|
|
|
|
|
$client = $client->params(Mojo::Parameters->new('verify=1')) |
794
|
|
|
|
|
|
|
|
795
|
|
|
|
|
|
|
Sets additional parameters for connection. Default is not defined. |
796
|
|
|
|
|
|
|
|
797
|
|
|
|
|
|
|
For a list of supported parameters see L</"SUPPORTED QUERY PARAMETERS">. |
798
|
|
|
|
|
|
|
|
799
|
|
|
|
|
|
|
=head2 url |
800
|
|
|
|
|
|
|
|
801
|
|
|
|
|
|
|
my $url = $client->url; |
802
|
|
|
|
|
|
|
$client = $client->url('amqp://...'); |
803
|
|
|
|
|
|
|
|
804
|
|
|
|
|
|
|
Sets all connection parameters in one string, according to specification from |
805
|
|
|
|
|
|
|
L<https://www.rabbitmq.com/uri-spec.html>. |
806
|
|
|
|
|
|
|
|
807
|
|
|
|
|
|
|
amqp_URI = "amqp[s]://" amqp_authority [ "/" vhost ] [ "?" query ] |
808
|
|
|
|
|
|
|
|
809
|
|
|
|
|
|
|
amqp_authority = [ amqp_userinfo "@" ] host [ ":" port ] |
810
|
|
|
|
|
|
|
|
811
|
|
|
|
|
|
|
amqp_userinfo = username [ ":" password ] |
812
|
|
|
|
|
|
|
|
813
|
|
|
|
|
|
|
username = *( unreserved / pct-encoded / sub-delims ) |
814
|
|
|
|
|
|
|
|
815
|
|
|
|
|
|
|
password = *( unreserved / pct-encoded / sub-delims ) |
816
|
|
|
|
|
|
|
|
817
|
|
|
|
|
|
|
vhost = segment |
818
|
|
|
|
|
|
|
|
819
|
|
|
|
|
|
|
=head2 heartbeat_timeout |
820
|
|
|
|
|
|
|
|
821
|
|
|
|
|
|
|
my $timeout = $client->heartbeat_timeout; |
822
|
|
|
|
|
|
|
$client = $client->heartbeat_timeout(180); |
823
|
|
|
|
|
|
|
|
824
|
|
|
|
|
|
|
Heartbeats are used to monitor peer reachability in AMQP. |
825
|
|
|
|
|
|
|
Default value is C<60> seconds, if set to C<0> no heartbeats will be sent. |
826
|
|
|
|
|
|
|
|
827
|
|
|
|
|
|
|
=head2 connect_timeout |
828
|
|
|
|
|
|
|
|
829
|
|
|
|
|
|
|
my $timeout = $client->connect_timeout; |
830
|
|
|
|
|
|
|
$client = $client->connect_timeout(5); |
831
|
|
|
|
|
|
|
|
832
|
|
|
|
|
|
|
Connection timeout used by L<Mojo::IOLoop::Client>. |
833
|
|
|
|
|
|
|
Defaults to environment variable C<MOJO_CONNECT_TIMEOUT> or C<10> seconds |
834
|
|
|
|
|
|
|
if nothing else is set. |
835
|
|
|
|
|
|
|
|
836
|
|
|
|
|
|
|
=head2 max_channels |
837
|
|
|
|
|
|
|
|
838
|
|
|
|
|
|
|
my $max_channels = $client->max_channels; |
839
|
|
|
|
|
|
|
$client = $client->max_channels(10); |
840
|
|
|
|
|
|
|
|
841
|
|
|
|
|
|
|
Maximum number of channels allowed to be active. Defaults to C<0> which |
842
|
|
|
|
|
|
|
means no implicit limit. |
843
|
|
|
|
|
|
|
|
844
|
|
|
|
|
|
|
When you try to call C<add_channel> over limit an C<error> will be |
845
|
|
|
|
|
|
|
emitted on channel saying that: I<Maximum number of channels reached>. |
846
|
|
|
|
|
|
|
|
847
|
|
|
|
|
|
|
=head1 STATIC METHODS |
848
|
|
|
|
|
|
|
|
849
|
|
|
|
|
|
|
=head2 consumer |
850
|
|
|
|
|
|
|
|
851
|
|
|
|
|
|
|
my $client = Mojo::RabbitMQ::Client->consumer(...) |
852
|
|
|
|
|
|
|
|
853
|
|
|
|
|
|
|
Shortcut for creating L<Mojo::RabbitMQ::Client::Consumer>. |
854
|
|
|
|
|
|
|
|
855
|
|
|
|
|
|
|
=head2 publisher |
856
|
|
|
|
|
|
|
|
857
|
|
|
|
|
|
|
my $client = Mojo::RabbitMQ::Client->publisher(...) |
858
|
|
|
|
|
|
|
|
859
|
|
|
|
|
|
|
Shortcut for creating L<Mojo::RabbitMQ::Client::Publisher>. |
860
|
|
|
|
|
|
|
|
861
|
|
|
|
|
|
|
=head1 METHODS |
862
|
|
|
|
|
|
|
|
863
|
|
|
|
|
|
|
L<Mojo::RabbitMQ::Client> inherits all methods from L<Mojo::EventEmitter> and implements |
864
|
|
|
|
|
|
|
the following new ones. |
865
|
|
|
|
|
|
|
|
866
|
|
|
|
|
|
|
=head2 connect |
867
|
|
|
|
|
|
|
|
868
|
|
|
|
|
|
|
$client->connect(); |
869
|
|
|
|
|
|
|
|
870
|
|
|
|
|
|
|
Tries to connect to RabbitMQ server and negotiate AMQP protocol. |
871
|
|
|
|
|
|
|
|
872
|
|
|
|
|
|
|
=head2 close |
873
|
|
|
|
|
|
|
|
874
|
|
|
|
|
|
|
$client->close(); |
875
|
|
|
|
|
|
|
|
876
|
|
|
|
|
|
|
=head2 param |
877
|
|
|
|
|
|
|
|
878
|
|
|
|
|
|
|
my $param = $client->param('name'); |
879
|
|
|
|
|
|
|
$client = $client->param(name => 'value'); |
880
|
|
|
|
|
|
|
|
881
|
|
|
|
|
|
|
=head2 add_channel |
882
|
|
|
|
|
|
|
|
883
|
|
|
|
|
|
|
my $channel = Mojo::RabbitMQ::Client::Channel->new(); |
884
|
|
|
|
|
|
|
... |
885
|
|
|
|
|
|
|
$channel = $client->add_channel($channel); |
886
|
|
|
|
|
|
|
$channel->open; |
887
|
|
|
|
|
|
|
|
888
|
|
|
|
|
|
|
=head2 open_channel |
889
|
|
|
|
|
|
|
|
890
|
|
|
|
|
|
|
my $channel = Mojo::RabbitMQ::Client::Channel->new(); |
891
|
|
|
|
|
|
|
... |
892
|
|
|
|
|
|
|
$client->open_channel($channel); |
893
|
|
|
|
|
|
|
|
894
|
|
|
|
|
|
|
=head2 delete_channel |
895
|
|
|
|
|
|
|
|
896
|
|
|
|
|
|
|
my $removed = $client->delete_channel($channel->id); |
897
|
|
|
|
|
|
|
|
898
|
|
|
|
|
|
|
=head1 SUPPORTED QUERY PARAMETERS |
899
|
|
|
|
|
|
|
|
900
|
|
|
|
|
|
|
There's no formal specification, nevertheless a list of common parameters |
901
|
|
|
|
|
|
|
recognized by officially supported RabbitMQ clients is maintained here: |
902
|
|
|
|
|
|
|
L<https://www.rabbitmq.com/uri-query-parameters.html>. |
903
|
|
|
|
|
|
|
|
904
|
|
|
|
|
|
|
Some shortcuts are also supported, you'll find them in parentheses. |
905
|
|
|
|
|
|
|
|
906
|
|
|
|
|
|
|
Aliases are less significant, so when both are specified only primary |
907
|
|
|
|
|
|
|
value will be used. |
908
|
|
|
|
|
|
|
|
909
|
|
|
|
|
|
|
=head2 cacertfile (I<ca>) |
910
|
|
|
|
|
|
|
|
911
|
|
|
|
|
|
|
Path to Certificate Authority file for TLS. |
912
|
|
|
|
|
|
|
|
913
|
|
|
|
|
|
|
=head2 certfile (I<cert>) |
914
|
|
|
|
|
|
|
|
915
|
|
|
|
|
|
|
Path to the client certificate file for TLS. |
916
|
|
|
|
|
|
|
|
917
|
|
|
|
|
|
|
=head2 keyfile (I<key>) |
918
|
|
|
|
|
|
|
|
919
|
|
|
|
|
|
|
Path to the client certificate private key file for TLS. |
920
|
|
|
|
|
|
|
|
921
|
|
|
|
|
|
|
=head2 fail_if_no_peer_cert (I<verify>) |
922
|
|
|
|
|
|
|
|
923
|
|
|
|
|
|
|
TLS verification mode, defaults to 0x01 on the client-side if a certificate |
924
|
|
|
|
|
|
|
authority file has been provided, or 0x00 otherwise. |
925
|
|
|
|
|
|
|
|
926
|
|
|
|
|
|
|
=head2 auth_mechanism |
927
|
|
|
|
|
|
|
|
928
|
|
|
|
|
|
|
Sets the AMQP authentication mechanism. Defaults to AMQPLAIN. AMQPLAIN and |
929
|
|
|
|
|
|
|
EXTERNAL are supported; EXTERNAL will only work if L<Mojo::RabbitMQ::Client> does not need |
930
|
|
|
|
|
|
|
to do anything beyond passing along a username and password if specified. |
931
|
|
|
|
|
|
|
|
932
|
|
|
|
|
|
|
=head2 heartbeat |
933
|
|
|
|
|
|
|
|
934
|
|
|
|
|
|
|
Sets requested heartbeat timeout, just like the C<heartbeat_timeout> attribute. |
935
|
|
|
|
|
|
|
|
936
|
|
|
|
|
|
|
=head2 connection_timeout (I<timeout>) |
937
|
|
|
|
|
|
|
|
938
|
|
|
|
|
|
|
Sets connection timeout - see L</connect_timeout> attribute. |
939
|
|
|
|
|
|
|
|
940
|
|
|
|
|
|
|
=head2 channel_max |
941
|
|
|
|
|
|
|
|
942
|
|
|
|
|
|
|
Sets maximum number of channels - see L</max_channels> attribute. |
943
|
|
|
|
|
|
|
|
944
|
|
|
|
|
|
|
=head1 SEE ALSO |
945
|
|
|
|
|
|
|
|
946
|
|
|
|
|
|
|
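L<Mojo::RabbitMQ::Client::Channel>, L<Mojo::RabbitMQ::Client::Consumer>, L<Mojo::RabbitMQ::Client::Publisher> |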
947
|
|
|
|
|
|
|
|
948
|
|
|
|
|
|
|
=head1 COPYRIGHT AND LICENSE |
949
|
|
|
|
|
|
|
|
950
|
|
|
|
|
|
|
Copyright (C) 2015-2017, Sebastian Podjasek and others |
951
|
|
|
|
|
|
|
|
952
|
|
|
|
|
|
|
Based on L<AnyEvent::RabbitMQ> - Copyright (C) 2010 Masahito Ikuta, maintained by C<< bobtfish@bobtfish.net >> |
953
|
|
|
|
|
|
|
|
954
|
|
|
|
|
|
|
This program is free software, you can redistribute it and/or modify it under the terms of the Artistic License version 2.0. |
955
|
|
|
|
|
|
|
|
956
|
|
|
|
|
|
|
=cut |