line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
1
|
|
|
|
|
|
|
package AnyEvent::Stomper; |
2
|
|
|
|
|
|
|
|
3
|
4
|
|
|
4
|
|
57877
|
use 5.008000; |
|
4
|
|
|
|
|
9
|
|
4
|
4
|
|
|
4
|
|
15
|
use strict; |
|
4
|
|
|
|
|
4
|
|
|
4
|
|
|
|
|
70
|
|
5
|
4
|
|
|
4
|
|
11
|
use warnings; |
|
4
|
|
|
|
|
11
|
|
|
4
|
|
|
|
|
95
|
|
6
|
4
|
|
|
4
|
|
12
|
use base qw( Exporter ); |
|
4
|
|
|
|
|
3
|
|
|
4
|
|
|
|
|
364
|
|
7
|
|
|
|
|
|
|
|
8
|
|
|
|
|
|
|
our $VERSION = '0.34'; |
9
|
|
|
|
|
|
|
|
10
|
4
|
|
|
4
|
|
1165
|
use AnyEvent::Stomper::Frame; |
|
4
|
|
|
|
|
5
|
|
|
4
|
|
|
|
|
125
|
|
11
|
4
|
|
|
4
|
|
1203
|
use AnyEvent::Stomper::Error; |
|
4
|
|
|
|
|
6
|
|
|
4
|
|
|
|
|
83
|
|
12
|
|
|
|
|
|
|
|
13
|
4
|
|
|
4
|
|
3081
|
use AnyEvent; |
|
4
|
|
|
|
|
15862
|
|
|
4
|
|
|
|
|
107
|
|
14
|
4
|
|
|
4
|
|
2271
|
use AnyEvent::Handle; |
|
4
|
|
|
|
|
55600
|
|
|
4
|
|
|
|
|
138
|
|
15
|
4
|
|
|
4
|
|
24
|
use Scalar::Util qw( looks_like_number weaken ); |
|
4
|
|
|
|
|
8
|
|
|
4
|
|
|
|
|
283
|
|
16
|
4
|
|
|
4
|
|
16
|
use List::Util qw( max ); |
|
4
|
|
|
|
|
7
|
|
|
4
|
|
|
|
|
346
|
|
17
|
4
|
|
|
4
|
|
1811
|
use List::MoreUtils qw( bsearch_index ); |
|
4
|
|
|
|
|
25197
|
|
|
4
|
|
|
|
|
32
|
|
18
|
4
|
|
|
4
|
|
1825
|
use Carp qw( croak ); |
|
4
|
|
|
|
|
6
|
|
|
4
|
|
|
|
|
292
|
|
19
|
|
|
|
|
|
|
|
20
|
|
|
|
|
|
|
my %ERROR_CODES; |
21
|
|
|
|
|
|
|
|
22
|
|
|
|
|
|
|
BEGIN { |
23
|
4
|
|
|
4
|
|
26
|
%ERROR_CODES = %AnyEvent::Stomper::Error::ERROR_CODES; |
24
|
4
|
|
|
|
|
14
|
our @EXPORT_OK = keys %ERROR_CODES; |
25
|
4
|
|
|
|
|
208
|
our %EXPORT_TAGS = ( err_codes => \@EXPORT_OK ); |
26
|
|
|
|
|
|
|
} |
27
|
|
|
|
|
|
|
|
28
|
|
|
|
|
|
|
use constant { |
29
|
|
|
|
|
|
|
# Default values |
30
|
4
|
|
|
|
|
1961
|
D_HOST => 'localhost', |
31
|
|
|
|
|
|
|
D_PORT => 61613, |
32
|
|
|
|
|
|
|
D_HEARTBEAT => [ 0, 0 ], |
33
|
|
|
|
|
|
|
|
34
|
|
|
|
|
|
|
%ERROR_CODES, |
35
|
|
|
|
|
|
|
|
36
|
|
|
|
|
|
|
# Operation status |
37
|
|
|
|
|
|
|
S_NEED_DO => 1, |
38
|
|
|
|
|
|
|
S_IN_PROGRESS => 2, |
39
|
|
|
|
|
|
|
S_DONE => 3, |
40
|
|
|
|
|
|
|
|
41
|
|
|
|
|
|
|
EOL => "\n", |
42
|
|
|
|
|
|
|
RE_EOL => qr/\r?\n/, |
43
|
4
|
|
|
4
|
|
18
|
}; |
|
4
|
|
|
|
|
4
|
|
44
|
|
|
|
|
|
|
|
45
|
|
|
|
|
|
|
my %SUBUNSUB_CMDS = ( |
46
|
|
|
|
|
|
|
SUBSCRIBE => 1, |
47
|
|
|
|
|
|
|
UNSUBSCRIBE => 1, |
48
|
|
|
|
|
|
|
); |
49
|
|
|
|
|
|
|
|
50
|
|
|
|
|
|
|
my %ACK_CMDS = ( |
51
|
|
|
|
|
|
|
ACK => 1, |
52
|
|
|
|
|
|
|
NACK => 1, |
53
|
|
|
|
|
|
|
); |
54
|
|
|
|
|
|
|
|
55
|
|
|
|
|
|
|
my %NEED_RECEIPT = ( |
56
|
|
|
|
|
|
|
CONNECT => 1, |
57
|
|
|
|
|
|
|
DISCONNECT => 1, |
58
|
|
|
|
|
|
|
%SUBUNSUB_CMDS, |
59
|
|
|
|
|
|
|
); |
60
|
|
|
|
|
|
|
|
61
|
|
|
|
|
|
|
my %ESCAPE_MAP = ( |
62
|
|
|
|
|
|
|
"\r" => "\\r", |
63
|
|
|
|
|
|
|
"\n" => "\\n", |
64
|
|
|
|
|
|
|
':' => "\\c", |
65
|
|
|
|
|
|
|
"\\" => "\\\\", |
66
|
|
|
|
|
|
|
); |
67
|
|
|
|
|
|
|
my %UNESCAPE_MAP = reverse %ESCAPE_MAP; |
68
|
|
|
|
|
|
|
|
69
|
|
|
|
|
|
|
my $RECEIPT_SEQ = 1; |
70
|
|
|
|
|
|
|
my $MESSAGE_SEQ = 1; |
71
|
|
|
|
|
|
|
|
72
|
|
|
|
|
|
|
|
73
|
|
|
|
|
|
|
sub new { |
74
|
15
|
|
|
15
|
1
|
2598
|
my $class = shift; |
75
|
15
|
|
|
|
|
29
|
my %params = @_; |
76
|
|
|
|
|
|
|
|
77
|
15
|
|
|
|
|
25
|
my $self = bless {}, $class; |
78
|
|
|
|
|
|
|
|
79
|
15
|
|
100
|
|
|
77
|
$self->{host} = $params{host} || D_HOST; |
80
|
15
|
|
100
|
|
|
81
|
$self->{port} = $params{port} || D_PORT; |
81
|
15
|
|
|
|
|
16
|
$self->{login} = $params{login}; |
82
|
15
|
|
|
|
|
17
|
$self->{passcode} = $params{passcode}; |
83
|
15
|
|
|
|
|
14
|
$self->{vhost} = $params{vhost}; |
84
|
15
|
|
|
|
|
15
|
$self->{lazy} = $params{lazy}; |
85
|
15
|
|
50
|
|
|
48
|
$self->{handle_params} = $params{handle_params} || {}; |
86
|
15
|
|
50
|
|
|
57
|
$self->{default_headers} = $params{default_headers} || {}; |
87
|
15
|
|
|
|
|
20
|
$self->{on_connect} = $params{on_connect}; |
88
|
15
|
|
|
|
|
18
|
$self->{on_disconnect} = $params{on_disconnect}; |
89
|
|
|
|
|
|
|
|
90
|
15
|
100
|
|
|
|
23
|
if ( defined $params{heartbeat} ) { |
91
|
2
|
100
|
|
|
|
6
|
unless ( ref( $params{heartbeat} ) eq 'ARRAY' ) { |
92
|
1
|
|
|
|
|
188
|
croak q{"heartbeat" must be specified as array reference}; |
93
|
|
|
|
|
|
|
} |
94
|
|
|
|
|
|
|
|
95
|
1
|
|
|
|
|
2
|
foreach my $val ( @{ $params{heartbeat} } ) { |
|
1
|
|
|
|
|
3
|
|
96
|
1
|
50
|
|
|
|
5
|
if ( $val =~ /\D/ ) { |
97
|
1
|
|
|
|
|
98
|
croak q{"heartbeat" values must be an integer numbers}; |
98
|
|
|
|
|
|
|
} |
99
|
|
|
|
|
|
|
} |
100
|
|
|
|
|
|
|
|
101
|
0
|
|
|
|
|
0
|
$self->{heartbeat} = $params{heartbeat}; |
102
|
|
|
|
|
|
|
} |
103
|
|
|
|
|
|
|
else { |
104
|
13
|
|
|
|
|
16
|
$self->{heartbeat} = D_HEARTBEAT; |
105
|
|
|
|
|
|
|
} |
106
|
|
|
|
|
|
|
|
107
|
13
|
50
|
|
|
|
26
|
if ( defined $params{command_headers} ) { |
108
|
0
|
0
|
|
|
|
0
|
unless ( ref( $params{command_headers} ) eq 'HASH' ) { |
109
|
0
|
|
|
|
|
0
|
croak q{"command_headers" must be specified as hash reference}; |
110
|
|
|
|
|
|
|
} |
111
|
|
|
|
|
|
|
|
112
|
0
|
|
|
|
|
0
|
my %command_headers; |
113
|
0
|
|
|
|
|
0
|
while ( my ( $cmd_name, $headers ) = each %{ $params{command_headers} } ) { |
|
0
|
|
|
|
|
0
|
|
114
|
0
|
|
|
|
|
0
|
$command_headers{ uc($cmd_name) } = $headers; |
115
|
|
|
|
|
|
|
} |
116
|
0
|
|
|
|
|
0
|
$self->{command_headers} = \%command_headers; |
117
|
|
|
|
|
|
|
} |
118
|
|
|
|
|
|
|
|
119
|
13
|
|
|
|
|
36
|
$self->connection_timeout( $params{connection_timeout} ); |
120
|
11
|
|
|
|
|
36
|
$self->reconnect_interval( $params{reconnect_interval} ); |
121
|
9
|
|
|
|
|
24
|
$self->on_error( $params{on_error} ); |
122
|
|
|
|
|
|
|
|
123
|
9
|
|
|
|
|
20
|
$self->_reset_internals; |
124
|
9
|
|
|
|
|
12
|
$self->{_input_queue} = []; |
125
|
9
|
|
|
|
|
12
|
$self->{_temp_input_queue} = []; |
126
|
9
|
|
|
|
|
10
|
$self->{_write_queue} = []; |
127
|
9
|
|
|
|
|
10
|
$self->{_temp_write_queue} = []; |
128
|
9
|
|
|
|
|
34
|
$self->{_pending_receipts} = {}; |
129
|
9
|
|
|
|
|
9
|
$self->{_subs} = {}; |
130
|
|
|
|
|
|
|
|
131
|
9
|
100
|
|
|
|
20
|
unless ( $self->{lazy} ) { |
132
|
5
|
|
|
|
|
10
|
$self->_connect; |
133
|
|
|
|
|
|
|
} |
134
|
|
|
|
|
|
|
|
135
|
9
|
|
|
|
|
29
|
return $self; |
136
|
|
|
|
|
|
|
} |
137
|
|
|
|
|
|
|
|
138
|
|
|
|
|
|
|
sub execute { |
139
|
0
|
|
|
0
|
1
|
0
|
my $self = shift; |
140
|
0
|
|
|
|
|
0
|
my $cmd_name = shift; |
141
|
|
|
|
|
|
|
|
142
|
0
|
|
|
|
|
0
|
my $cmd = $self->_prepare( $cmd_name, [@_] ); |
143
|
0
|
|
|
|
|
0
|
$self->_execute($cmd); |
144
|
|
|
|
|
|
|
|
145
|
0
|
|
|
|
|
0
|
return; |
146
|
|
|
|
|
|
|
} |
147
|
|
|
|
|
|
|
|
148
|
|
|
|
|
|
|
# Generate methods |
149
|
|
|
|
|
|
|
{ |
150
|
4
|
|
|
4
|
|
17
|
no strict qw( refs ); |
|
4
|
|
|
|
|
5
|
|
|
4
|
|
|
|
|
597
|
|
151
|
|
|
|
|
|
|
|
152
|
|
|
|
|
|
|
foreach my $name ( qw( send subscribe unsubscribe ack nack begin commit |
153
|
|
|
|
|
|
|
abort disconnect ) ) |
154
|
|
|
|
|
|
|
{ |
155
|
|
|
|
|
|
|
*{$name} = sub { |
156
|
2
|
|
|
2
|
|
86
|
my $self = shift; |
157
|
|
|
|
|
|
|
|
158
|
2
|
|
|
|
|
8
|
my $cmd = $self->_prepare( $name, [@_] ); |
159
|
2
|
|
|
|
|
5
|
$self->_execute($cmd); |
160
|
|
|
|
|
|
|
|
161
|
0
|
|
|
|
|
0
|
return; |
162
|
|
|
|
|
|
|
} |
163
|
|
|
|
|
|
|
} |
164
|
|
|
|
|
|
|
} |
165
|
|
|
|
|
|
|
|
166
|
|
|
|
|
|
|
sub on_error { |
167
|
14
|
|
|
14
|
1
|
343
|
my $self = shift; |
168
|
|
|
|
|
|
|
|
169
|
14
|
100
|
|
|
|
33
|
if (@_) { |
170
|
11
|
|
|
|
|
9
|
my $on_error = shift; |
171
|
|
|
|
|
|
|
|
172
|
11
|
100
|
|
|
|
23
|
if ( defined $on_error ) { |
173
|
5
|
|
|
|
|
6
|
$self->{on_error} = $on_error; |
174
|
|
|
|
|
|
|
} |
175
|
|
|
|
|
|
|
else { |
176
|
|
|
|
|
|
|
$self->{on_error} = sub { |
177
|
1
|
|
|
1
|
|
1
|
my $err = shift; |
178
|
1
|
|
|
|
|
4
|
warn $err->message . "\n"; |
179
|
6
|
|
|
|
|
26
|
}; |
180
|
|
|
|
|
|
|
} |
181
|
|
|
|
|
|
|
} |
182
|
|
|
|
|
|
|
|
183
|
14
|
|
|
|
|
21
|
return $self->{on_error}; |
184
|
|
|
|
|
|
|
} |
185
|
|
|
|
|
|
|
|
186
|
|
|
|
|
|
|
# Generate accessors |
187
|
|
|
|
|
|
|
{ |
188
|
4
|
|
|
4
|
|
17
|
no strict qw( refs ); |
|
4
|
|
|
|
|
3
|
|
|
4
|
|
|
|
|
12040
|
|
189
|
|
|
|
|
|
|
|
190
|
|
|
|
|
|
|
foreach my $name ( qw( host port ) ) { |
191
|
|
|
|
|
|
|
*{$name} = sub { |
192
|
2
|
|
|
2
|
|
1733
|
my $self = shift; |
193
|
2
|
|
|
|
|
7
|
return $self->{$name}; |
194
|
|
|
|
|
|
|
} |
195
|
|
|
|
|
|
|
} |
196
|
|
|
|
|
|
|
|
197
|
|
|
|
|
|
|
foreach my $name ( qw( connection_timeout reconnect_interval ) ) { |
198
|
|
|
|
|
|
|
*{$name} = sub { |
199
|
38
|
|
|
38
|
|
963
|
my $self = shift; |
200
|
|
|
|
|
|
|
|
201
|
38
|
100
|
|
|
|
66
|
if (@_) { |
202
|
32
|
|
|
|
|
34
|
my $seconds = shift; |
203
|
|
|
|
|
|
|
|
204
|
32
|
100
|
100
|
|
|
109
|
if ( defined $seconds |
|
|
|
66
|
|
|
|
|
205
|
|
|
|
|
|
|
&& ( !looks_like_number($seconds) || $seconds < 0 ) ) |
206
|
|
|
|
|
|
|
{ |
207
|
8
|
|
|
|
|
775
|
croak qq{"$name" must be a positive number}; |
208
|
|
|
|
|
|
|
} |
209
|
24
|
|
|
|
|
31
|
$self->{$name} = $seconds; |
210
|
|
|
|
|
|
|
} |
211
|
|
|
|
|
|
|
|
212
|
30
|
|
|
|
|
80
|
return $self->{$name}; |
213
|
|
|
|
|
|
|
}; |
214
|
|
|
|
|
|
|
} |
215
|
|
|
|
|
|
|
|
216
|
|
|
|
|
|
|
foreach my $name ( qw( on_connect on_disconnect ) ) { |
217
|
|
|
|
|
|
|
*{$name} = sub { |
218
|
10
|
|
|
10
|
|
514
|
my $self = shift; |
219
|
|
|
|
|
|
|
|
220
|
10
|
100
|
|
|
|
19
|
if (@_) { |
221
|
4
|
|
|
|
|
5
|
$self->{$name} = shift; |
222
|
|
|
|
|
|
|
} |
223
|
|
|
|
|
|
|
|
224
|
10
|
|
|
|
|
23
|
return $self->{$name}; |
225
|
|
|
|
|
|
|
}; |
226
|
|
|
|
|
|
|
} |
227
|
|
|
|
|
|
|
} |
228
|
|
|
|
|
|
|
|
229
|
|
|
|
|
|
|
sub force_disconnect { |
230
|
0
|
|
|
0
|
1
|
0
|
my $self = shift; |
231
|
|
|
|
|
|
|
|
232
|
0
|
|
|
|
|
0
|
$self->_disconnect(); |
233
|
|
|
|
|
|
|
|
234
|
0
|
|
|
|
|
0
|
return; |
235
|
|
|
|
|
|
|
} |
236
|
|
|
|
|
|
|
|
237
|
|
|
|
|
|
|
sub _connect { |
238
|
5
|
|
|
5
|
|
6
|
my $self = shift; |
239
|
|
|
|
|
|
|
|
240
|
|
|
|
|
|
|
$self->{_handle} = AnyEvent::Handle->new( |
241
|
5
|
|
|
|
|
23
|
%{ $self->{handle_params} }, |
242
|
5
|
|
|
|
|
6
|
connect => [ $self->{host}, $self->{port} ], |
243
|
|
|
|
|
|
|
on_prepare => $self->_create_on_prepare, |
244
|
|
|
|
|
|
|
on_connect => $self->_create_on_connect, |
245
|
|
|
|
|
|
|
on_connect_error => $self->_create_on_connect_error, |
246
|
|
|
|
|
|
|
on_wtimeout => $self->_create_on_wtimeout, |
247
|
|
|
|
|
|
|
on_rtimeout => $self->_create_on_rtimeout, |
248
|
|
|
|
|
|
|
on_eof => $self->_create_on_eof, |
249
|
|
|
|
|
|
|
on_error => $self->_create_on_handle_error, |
250
|
|
|
|
|
|
|
on_drain => $self->_create_on_drain, |
251
|
|
|
|
|
|
|
on_read => $self->_create_on_read, |
252
|
|
|
|
|
|
|
); |
253
|
|
|
|
|
|
|
|
254
|
5
|
|
|
|
|
601
|
return; |
255
|
|
|
|
|
|
|
} |
256
|
|
|
|
|
|
|
|
257
|
|
|
|
|
|
|
sub _create_on_prepare { |
258
|
5
|
|
|
5
|
|
6
|
my $self = shift; |
259
|
|
|
|
|
|
|
|
260
|
5
|
|
|
|
|
15
|
weaken($self); |
261
|
|
|
|
|
|
|
|
262
|
|
|
|
|
|
|
return sub { |
263
|
5
|
100
|
|
5
|
|
29715
|
if ( defined $self->{connection_timeout} ) { |
264
|
1
|
|
|
|
|
3
|
return $self->{connection_timeout}; |
265
|
|
|
|
|
|
|
} |
266
|
|
|
|
|
|
|
|
267
|
4
|
|
|
|
|
11
|
return; |
268
|
5
|
|
|
|
|
18
|
}; |
269
|
|
|
|
|
|
|
} |
270
|
|
|
|
|
|
|
|
271
|
|
|
|
|
|
|
sub _create_on_connect { |
272
|
5
|
|
|
5
|
|
6
|
my $self = shift; |
273
|
|
|
|
|
|
|
|
274
|
5
|
|
|
|
|
8
|
weaken($self); |
275
|
|
|
|
|
|
|
|
276
|
|
|
|
|
|
|
return sub { |
277
|
0
|
|
|
0
|
|
0
|
$self->{_connected} = 1; |
278
|
0
|
|
|
|
|
0
|
$self->_login; |
279
|
|
|
|
|
|
|
|
280
|
0
|
0
|
|
|
|
0
|
if ( defined $self->{on_connect} ) { |
281
|
0
|
|
|
|
|
0
|
$self->{on_connect}->(); |
282
|
|
|
|
|
|
|
} |
283
|
5
|
|
|
|
|
13
|
}; |
284
|
|
|
|
|
|
|
} |
285
|
|
|
|
|
|
|
|
286
|
|
|
|
|
|
|
sub _create_on_connect_error { |
287
|
5
|
|
|
5
|
|
5
|
my $self = shift; |
288
|
|
|
|
|
|
|
|
289
|
5
|
|
|
|
|
7
|
weaken($self); |
290
|
|
|
|
|
|
|
|
291
|
|
|
|
|
|
|
return sub { |
292
|
0
|
|
|
0
|
|
0
|
my $err_msg = pop; |
293
|
|
|
|
|
|
|
|
294
|
0
|
|
|
|
|
0
|
my $err = _new_error( |
295
|
|
|
|
|
|
|
"Can't connect to $self->{host}:$self->{port}: $err_msg", |
296
|
|
|
|
|
|
|
E_CANT_CONN |
297
|
|
|
|
|
|
|
); |
298
|
0
|
|
|
|
|
0
|
$self->_disconnect($err); |
299
|
5
|
|
|
|
|
16
|
}; |
300
|
|
|
|
|
|
|
} |
301
|
|
|
|
|
|
|
|
302
|
|
|
|
|
|
|
sub _create_on_wtimeout { |
303
|
5
|
|
|
5
|
|
6
|
my $self = shift; |
304
|
|
|
|
|
|
|
|
305
|
5
|
|
|
|
|
7
|
weaken($self); |
306
|
|
|
|
|
|
|
|
307
|
|
|
|
|
|
|
return sub { |
308
|
0
|
|
|
0
|
|
0
|
$self->{_handle}->push_write(EOL); |
309
|
5
|
|
|
|
|
11
|
}; |
310
|
|
|
|
|
|
|
} |
311
|
|
|
|
|
|
|
|
312
|
|
|
|
|
|
|
sub _create_on_rtimeout { |
313
|
5
|
|
|
5
|
|
6
|
my $self = shift; |
314
|
|
|
|
|
|
|
|
315
|
5
|
|
|
|
|
9
|
weaken($self); |
316
|
|
|
|
|
|
|
|
317
|
|
|
|
|
|
|
return sub { |
318
|
0
|
|
|
0
|
|
0
|
my $err = _new_error( 'Read timed out.', E_READ_TIMEDOUT ); |
319
|
0
|
|
|
|
|
0
|
$self->_disconnect($err); |
320
|
5
|
|
|
|
|
13
|
}; |
321
|
|
|
|
|
|
|
} |
322
|
|
|
|
|
|
|
|
323
|
|
|
|
|
|
|
sub _create_on_eof { |
324
|
5
|
|
|
5
|
|
6
|
my $self = shift; |
325
|
|
|
|
|
|
|
|
326
|
5
|
|
|
|
|
10
|
weaken($self); |
327
|
|
|
|
|
|
|
|
328
|
|
|
|
|
|
|
return sub { |
329
|
0
|
|
|
0
|
|
0
|
my $err = _new_error( 'Connection closed by remote host.', |
330
|
|
|
|
|
|
|
E_CONN_CLOSED_BY_REMOTE_HOST ); |
331
|
0
|
|
|
|
|
0
|
$self->_disconnect($err); |
332
|
5
|
|
|
|
|
14
|
}; |
333
|
|
|
|
|
|
|
} |
334
|
|
|
|
|
|
|
|
335
|
|
|
|
|
|
|
sub _create_on_handle_error { |
336
|
5
|
|
|
5
|
|
6
|
my $self = shift; |
337
|
|
|
|
|
|
|
|
338
|
5
|
|
|
|
|
6
|
weaken($self); |
339
|
|
|
|
|
|
|
|
340
|
|
|
|
|
|
|
return sub { |
341
|
0
|
|
|
0
|
|
0
|
my $err_msg = pop; |
342
|
|
|
|
|
|
|
|
343
|
0
|
|
|
|
|
0
|
my $err = _new_error( $err_msg, E_IO ); |
344
|
0
|
|
|
|
|
0
|
$self->_disconnect($err); |
345
|
5
|
|
|
|
|
15
|
}; |
346
|
|
|
|
|
|
|
} |
347
|
|
|
|
|
|
|
|
348
|
|
|
|
|
|
|
sub _create_on_drain { |
349
|
5
|
|
|
5
|
|
6
|
my $self = shift; |
350
|
|
|
|
|
|
|
|
351
|
5
|
|
|
|
|
8
|
weaken($self); |
352
|
|
|
|
|
|
|
|
353
|
|
|
|
|
|
|
return sub { |
354
|
0
|
0
|
|
0
|
|
0
|
return unless @{ $self->{_write_queue} }; |
|
0
|
|
|
|
|
0
|
|
355
|
|
|
|
|
|
|
|
356
|
0
|
|
|
|
|
0
|
$self->{_temp_write_queue} = $self->{_write_queue}; |
357
|
0
|
|
|
|
|
0
|
$self->{_write_queue} = []; |
358
|
|
|
|
|
|
|
|
359
|
0
|
|
|
|
|
0
|
while ( my $cmd = shift @{ $self->{_temp_write_queue} } ) { |
|
0
|
|
|
|
|
0
|
|
360
|
0
|
|
|
|
|
0
|
$cmd->{on_receipt}->(); |
361
|
|
|
|
|
|
|
} |
362
|
5
|
|
|
|
|
34
|
}; |
363
|
|
|
|
|
|
|
} |
364
|
|
|
|
|
|
|
|
365
|
|
|
|
|
|
|
sub _create_on_read { |
366
|
5
|
|
|
5
|
|
5
|
my $self = shift; |
367
|
|
|
|
|
|
|
|
368
|
5
|
|
|
|
|
7
|
weaken($self); |
369
|
|
|
|
|
|
|
|
370
|
5
|
|
|
|
|
4
|
my $cmd_name; |
371
|
|
|
|
|
|
|
my $headers; |
372
|
|
|
|
|
|
|
|
373
|
|
|
|
|
|
|
return sub { |
374
|
0
|
|
|
0
|
|
0
|
my $handle = shift; |
375
|
|
|
|
|
|
|
|
376
|
0
|
|
|
|
|
0
|
my $frame; |
377
|
|
|
|
|
|
|
|
378
|
0
|
|
|
|
|
0
|
while (1) { |
379
|
0
|
0
|
|
|
|
0
|
return if $handle->destroyed; |
380
|
|
|
|
|
|
|
|
381
|
0
|
0
|
|
|
|
0
|
if ( defined $cmd_name ) { |
382
|
0
|
|
|
|
|
0
|
my $content_length; |
383
|
0
|
0
|
|
|
|
0
|
if ( defined $headers->{'content-length'} ) { |
384
|
0
|
|
|
|
|
0
|
$content_length = $headers->{'content-length'}; |
385
|
0
|
0
|
|
|
|
0
|
return if length( $handle->{rbuf} ) < $content_length + 1; |
386
|
|
|
|
|
|
|
} |
387
|
|
|
|
|
|
|
else { |
388
|
0
|
|
|
|
|
0
|
$content_length = index( $handle->{rbuf}, "\0" ); |
389
|
0
|
0
|
|
|
|
0
|
return if $content_length < 0 |
390
|
|
|
|
|
|
|
} |
391
|
|
|
|
|
|
|
|
392
|
0
|
|
|
|
|
0
|
my $body = substr( $handle->{rbuf}, 0, $content_length, '' ); |
393
|
0
|
|
|
|
|
0
|
$handle->{rbuf} =~ s/^\0(?:${\(RE_EOL)})*//; |
|
0
|
|
|
|
|
0
|
|
394
|
|
|
|
|
|
|
|
395
|
0
|
|
|
|
|
0
|
$frame = _new_frame( $cmd_name, $headers, $body ); |
396
|
|
|
|
|
|
|
|
397
|
0
|
|
|
|
|
0
|
undef $cmd_name; |
398
|
0
|
|
|
|
|
0
|
undef $headers; |
399
|
|
|
|
|
|
|
} |
400
|
|
|
|
|
|
|
else { |
401
|
0
|
|
|
|
|
0
|
$handle->{rbuf} =~ s/^(?:${\(RE_EOL)})+//; |
|
0
|
|
|
|
|
0
|
|
402
|
|
|
|
|
|
|
|
403
|
0
|
0
|
|
|
|
0
|
return unless $handle->{rbuf} =~ s/^(.+?)(?:${\(RE_EOL)}){2}//s; |
|
0
|
|
|
|
|
0
|
|
404
|
|
|
|
|
|
|
|
405
|
0
|
|
|
|
|
0
|
( $cmd_name, my @header_strings ) = split( m/${\(RE_EOL)}/, $1 ); |
|
0
|
|
|
|
|
0
|
|
406
|
0
|
|
|
|
|
0
|
foreach my $header_str (@header_strings) { |
407
|
0
|
|
|
|
|
0
|
my ( $name, $value ) = split( /:/, $header_str, 2 ); |
408
|
0
|
|
|
|
|
0
|
$headers->{ _unescape($name) } = _unescape($value); |
409
|
|
|
|
|
|
|
} |
410
|
|
|
|
|
|
|
|
411
|
0
|
|
|
|
|
0
|
next; |
412
|
|
|
|
|
|
|
} |
413
|
|
|
|
|
|
|
|
414
|
0
|
|
|
|
|
0
|
$self->_process_frame($frame); |
415
|
|
|
|
|
|
|
} |
416
|
5
|
|
|
|
|
39
|
}; |
417
|
|
|
|
|
|
|
} |
418
|
|
|
|
|
|
|
|
419
|
|
|
|
|
|
|
sub _prepare { |
420
|
2
|
|
|
2
|
|
2
|
my $self = shift; |
421
|
2
|
|
|
|
|
4
|
my $cmd_name = uc(shift); |
422
|
2
|
|
|
|
|
2
|
my $args = shift; |
423
|
|
|
|
|
|
|
|
424
|
2
|
|
|
|
|
2
|
my %params; |
425
|
|
|
|
|
|
|
|
426
|
2
|
50
|
33
|
|
|
8
|
if ( ref( $args->[-1] ) eq 'CODE' |
427
|
0
|
|
|
|
|
0
|
&& scalar @{$args} % 2 > 0 ) |
428
|
|
|
|
|
|
|
{ |
429
|
0
|
0
|
|
|
|
0
|
if ( $cmd_name eq 'SUBSCRIBE' ) { |
430
|
0
|
|
|
|
|
0
|
$params{on_message} = pop @{$args}; |
|
0
|
|
|
|
|
0
|
|
431
|
|
|
|
|
|
|
} |
432
|
|
|
|
|
|
|
else { |
433
|
0
|
|
|
|
|
0
|
$params{on_receipt} = pop @{$args}; |
|
0
|
|
|
|
|
0
|
|
434
|
|
|
|
|
|
|
} |
435
|
|
|
|
|
|
|
} |
436
|
|
|
|
|
|
|
|
437
|
2
|
|
|
|
|
3
|
my %headers = @{$args}; |
|
2
|
|
|
|
|
5
|
|
438
|
|
|
|
|
|
|
|
439
|
2
|
|
|
|
|
4
|
foreach my $name ( qw( body on_receipt on_message ) ) { |
440
|
6
|
50
|
|
|
|
12
|
if ( defined $headers{$name} ) { |
441
|
0
|
|
|
|
|
0
|
$params{$name} = delete $headers{$name}; |
442
|
|
|
|
|
|
|
} |
443
|
|
|
|
|
|
|
} |
444
|
2
|
100
|
|
|
|
7
|
if ( exists $ACK_CMDS{$cmd_name} ) { |
445
|
1
|
|
|
|
|
3
|
$params{message} = delete $headers{message}; |
446
|
|
|
|
|
|
|
} |
447
|
|
|
|
|
|
|
|
448
|
|
|
|
|
|
|
%headers = ( |
449
|
2
|
|
|
|
|
12
|
%{ $self->{default_headers} }, |
450
|
|
|
|
|
|
|
|
451
|
|
|
|
|
|
|
defined $self->{command_headers}{$cmd_name} |
452
|
2
|
50
|
|
|
|
2
|
? %{ $self->{command_headers}{$cmd_name} } |
|
0
|
|
|
|
|
0
|
|
453
|
|
|
|
|
|
|
: (), |
454
|
|
|
|
|
|
|
|
455
|
|
|
|
|
|
|
%headers, |
456
|
|
|
|
|
|
|
); |
457
|
|
|
|
|
|
|
|
458
|
2
|
|
|
|
|
8
|
my $cmd = { |
459
|
|
|
|
|
|
|
name => $cmd_name, |
460
|
|
|
|
|
|
|
headers => \%headers, |
461
|
|
|
|
|
|
|
%params, |
462
|
|
|
|
|
|
|
}; |
463
|
|
|
|
|
|
|
|
464
|
2
|
50
|
|
|
|
5
|
unless ( defined $cmd->{on_receipt} ) { |
465
|
2
|
|
|
|
|
5
|
weaken($self); |
466
|
|
|
|
|
|
|
|
467
|
|
|
|
|
|
|
$cmd->{on_receipt} = sub { |
468
|
0
|
|
|
0
|
|
0
|
my $receipt = shift; |
469
|
0
|
|
|
|
|
0
|
my $err = shift; |
470
|
|
|
|
|
|
|
|
471
|
0
|
0
|
|
|
|
0
|
if ( defined $err ) { |
472
|
0
|
|
|
|
|
0
|
$self->{on_error}->($err); |
473
|
0
|
|
|
|
|
0
|
return; |
474
|
|
|
|
|
|
|
} |
475
|
2
|
|
|
|
|
7
|
}; |
476
|
|
|
|
|
|
|
} |
477
|
|
|
|
|
|
|
|
478
|
2
|
|
|
|
|
5
|
return $cmd; |
479
|
|
|
|
|
|
|
} |
480
|
|
|
|
|
|
|
|
481
|
|
|
|
|
|
|
sub _execute { |
482
|
2
|
|
|
2
|
|
3
|
my $self = shift; |
483
|
2
|
|
|
|
|
1
|
my $cmd = shift; |
484
|
|
|
|
|
|
|
|
485
|
2
|
100
|
66
|
|
|
19
|
if ( $cmd->{name} eq 'SUBSCRIBE' |
|
|
50
|
33
|
|
|
|
|
486
|
|
|
|
|
|
|
&& !defined $cmd->{on_message} ) |
487
|
|
|
|
|
|
|
{ |
488
|
1
|
|
|
|
|
78
|
croak '"on_message" callback must be specified'; |
489
|
|
|
|
|
|
|
} |
490
|
|
|
|
|
|
|
elsif ( exists $ACK_CMDS{ $cmd->{name} } |
491
|
|
|
|
|
|
|
&& !defined $cmd->{message} ) |
492
|
|
|
|
|
|
|
{ |
493
|
1
|
|
|
|
|
116
|
croak '"message" parameter must be specified'; |
494
|
|
|
|
|
|
|
} |
495
|
|
|
|
|
|
|
|
496
|
0
|
0
|
|
|
|
0
|
unless ( $self->{_ready} ) { |
497
|
0
|
0
|
|
|
|
0
|
if ( defined $self->{_handle} ) { |
|
|
0
|
|
|
|
|
|
498
|
0
|
0
|
|
|
|
0
|
if ( $self->{_connected} ) { |
499
|
0
|
0
|
|
|
|
0
|
if ( $self->{_login_state} == S_NEED_DO ) { |
500
|
0
|
|
|
|
|
0
|
$self->_login; |
501
|
|
|
|
|
|
|
} |
502
|
|
|
|
|
|
|
} |
503
|
|
|
|
|
|
|
} |
504
|
|
|
|
|
|
|
elsif ( $self->{lazy} ) { |
505
|
0
|
|
|
|
|
0
|
undef $self->{lazy}; |
506
|
0
|
|
|
|
|
0
|
$self->_connect; |
507
|
|
|
|
|
|
|
} |
508
|
|
|
|
|
|
|
else { |
509
|
0
|
0
|
0
|
|
|
0
|
if ( defined $self->{reconnect_interval} |
510
|
|
|
|
|
|
|
&& $self->{reconnect_interval} > 0 ) |
511
|
|
|
|
|
|
|
{ |
512
|
0
|
0
|
|
|
|
0
|
unless ( defined $self->{_reconnect_timer} ) { |
513
|
0
|
|
|
|
|
0
|
weaken($self); |
514
|
|
|
|
|
|
|
|
515
|
|
|
|
|
|
|
$self->{_reconnect_timer} = AE::timer( |
516
|
|
|
|
|
|
|
$self->{reconnect_interval}, 0, |
517
|
|
|
|
|
|
|
sub { |
518
|
0
|
|
|
0
|
|
0
|
undef $self->{_reconnect_timer}; |
519
|
0
|
|
|
|
|
0
|
$self->_connect; |
520
|
|
|
|
|
|
|
} |
521
|
0
|
|
|
|
|
0
|
); |
522
|
|
|
|
|
|
|
} |
523
|
|
|
|
|
|
|
} |
524
|
|
|
|
|
|
|
else { |
525
|
0
|
|
|
|
|
0
|
$self->_connect; |
526
|
|
|
|
|
|
|
} |
527
|
|
|
|
|
|
|
} |
528
|
|
|
|
|
|
|
|
529
|
0
|
|
|
|
|
0
|
push( @{ $self->{_input_queue} }, $cmd ); |
|
0
|
|
|
|
|
0
|
|
530
|
|
|
|
|
|
|
|
531
|
0
|
|
|
|
|
0
|
return; |
532
|
|
|
|
|
|
|
} |
533
|
|
|
|
|
|
|
|
534
|
0
|
|
|
|
|
0
|
$self->_push_write($cmd); |
535
|
|
|
|
|
|
|
|
536
|
0
|
|
|
|
|
0
|
return; |
537
|
|
|
|
|
|
|
} |
538
|
|
|
|
|
|
|
|
539
|
|
|
|
|
|
|
sub _push_write { |
540
|
0
|
|
|
0
|
|
0
|
my $self = shift; |
541
|
0
|
|
|
|
|
0
|
my $cmd = shift; |
542
|
|
|
|
|
|
|
|
543
|
0
|
|
|
|
|
0
|
my $cmd_headers = $cmd->{headers}; |
544
|
|
|
|
|
|
|
|
545
|
0
|
0
|
|
|
|
0
|
if ( exists $ACK_CMDS{ $cmd->{name} } ) { |
546
|
0
|
0
|
|
|
|
0
|
unless ( $self->_check_ack( $cmd->{message} ) ) { |
547
|
0
|
|
|
|
|
0
|
my $err = _new_error( "Unexpected $cmd->{name} sent.", E_OPRN_ERROR ); |
548
|
0
|
|
|
0
|
|
0
|
AE::postpone { $cmd->{on_receipt}->( undef, $err ) }; |
|
0
|
|
|
|
|
0
|
|
549
|
|
|
|
|
|
|
|
550
|
0
|
|
|
|
|
0
|
return; |
551
|
|
|
|
|
|
|
} |
552
|
|
|
|
|
|
|
|
553
|
0
|
|
|
|
|
0
|
my $msg_headers = $cmd->{message}->headers; |
554
|
|
|
|
|
|
|
|
555
|
0
|
0
|
|
|
|
0
|
if ( $self->{_version} <= 1.1 ) { |
556
|
0
|
|
|
|
|
0
|
$cmd_headers->{'message-id'} = $msg_headers->{'message-id'}; |
557
|
0
|
0
|
|
|
|
0
|
if ( $self->{_version} > 1.0 ) { |
558
|
0
|
|
|
|
|
0
|
$cmd_headers->{subscription} = $msg_headers->{subscription}; |
559
|
|
|
|
|
|
|
} |
560
|
|
|
|
|
|
|
} |
561
|
|
|
|
|
|
|
else { |
562
|
0
|
|
|
|
|
0
|
$cmd_headers->{id} = $msg_headers->{ack}; |
563
|
|
|
|
|
|
|
} |
564
|
|
|
|
|
|
|
} |
565
|
|
|
|
|
|
|
|
566
|
0
|
0
|
0
|
|
|
0
|
if ( exists $NEED_RECEIPT{ $cmd->{name} } |
567
|
|
|
|
|
|
|
|| defined $cmd_headers->{receipt} ) |
568
|
|
|
|
|
|
|
{ |
569
|
0
|
0
|
|
|
|
0
|
if ( $cmd->{name} eq 'CONNECT' ) { |
570
|
0
|
|
|
|
|
0
|
$self->{_pending_receipts}{CONNECTED} = $cmd; |
571
|
|
|
|
|
|
|
} |
572
|
|
|
|
|
|
|
else { |
573
|
0
|
0
|
0
|
|
|
0
|
if ( !defined $cmd_headers->{receipt} |
574
|
|
|
|
|
|
|
|| $cmd_headers->{receipt} eq 'auto' ) |
575
|
|
|
|
|
|
|
{ |
576
|
0
|
|
|
|
|
0
|
$cmd_headers->{receipt} = 'R_' . $self->{_session_id} . '.' |
577
|
|
|
|
|
|
|
. $RECEIPT_SEQ++; |
578
|
|
|
|
|
|
|
} |
579
|
0
|
|
|
|
|
0
|
$self->{_pending_receipts}{ $cmd_headers->{receipt} } = $cmd; |
580
|
|
|
|
|
|
|
} |
581
|
|
|
|
|
|
|
} |
582
|
|
|
|
|
|
|
else { |
583
|
0
|
|
|
|
|
0
|
push( @{ $self->{_write_queue} }, $cmd ); |
|
0
|
|
|
|
|
0
|
|
584
|
|
|
|
|
|
|
} |
585
|
|
|
|
|
|
|
|
586
|
0
|
|
|
|
|
0
|
my $body = $cmd->{body}; |
587
|
0
|
0
|
|
|
|
0
|
unless ( defined $body ) { |
588
|
0
|
|
|
|
|
0
|
$body = ''; |
589
|
|
|
|
|
|
|
} |
590
|
0
|
0
|
|
|
|
0
|
unless ( defined $cmd_headers->{'content-length'} ) { |
591
|
0
|
|
|
|
|
0
|
$cmd_headers->{'content-length'} = length($body); |
592
|
|
|
|
|
|
|
} |
593
|
|
|
|
|
|
|
|
594
|
0
|
|
|
|
|
0
|
my $frame_str = $cmd->{name} . EOL; |
595
|
0
|
|
|
|
|
0
|
while ( my ( $name, $value ) = each %{$cmd_headers} ) { |
|
0
|
|
|
|
|
0
|
|
596
|
0
|
0
|
|
|
|
0
|
unless ( defined $value ) { |
597
|
0
|
|
|
|
|
0
|
$value = ''; |
598
|
|
|
|
|
|
|
} |
599
|
0
|
|
|
|
|
0
|
$frame_str .= _escape($name) . ':' . _escape($value) . EOL; |
600
|
|
|
|
|
|
|
} |
601
|
0
|
|
|
|
|
0
|
$frame_str .= EOL . "$body\0"; |
602
|
|
|
|
|
|
|
|
603
|
0
|
|
|
|
|
0
|
$self->{_handle}->push_write($frame_str); |
604
|
|
|
|
|
|
|
|
605
|
0
|
|
|
|
|
0
|
return; |
606
|
|
|
|
|
|
|
} |
607
|
|
|
|
|
|
|
|
608
|
|
|
|
|
|
|
sub _login { |
609
|
0
|
|
|
0
|
|
0
|
my $self = shift; |
610
|
|
|
|
|
|
|
|
611
|
0
|
|
|
|
|
0
|
my ( $cx, $cy ) = @{ $self->{heartbeat} }; |
|
0
|
|
|
|
|
0
|
|
612
|
|
|
|
|
|
|
|
613
|
0
|
0
|
|
|
|
0
|
if ( $cy > 0 ) { |
614
|
0
|
|
|
|
|
0
|
$self->_rtimeout($cy); |
615
|
|
|
|
|
|
|
} |
616
|
|
|
|
|
|
|
|
617
|
0
|
|
|
|
|
0
|
my %cmd_headers = ( |
618
|
|
|
|
|
|
|
'accept-version' => '1.0,1.1,1.2', |
619
|
|
|
|
|
|
|
'heart-beat' => join( ',', $cx, $cy ), |
620
|
|
|
|
|
|
|
); |
621
|
0
|
0
|
|
|
|
0
|
if ( defined $self->{login} ) { |
622
|
0
|
|
|
|
|
0
|
$cmd_headers{login} = $self->{login}; |
623
|
|
|
|
|
|
|
} |
624
|
0
|
0
|
|
|
|
0
|
if ( defined $self->{passcode} ) { |
625
|
0
|
|
|
|
|
0
|
$cmd_headers{passcode} = $self->{passcode}; |
626
|
|
|
|
|
|
|
} |
627
|
0
|
0
|
|
|
|
0
|
if ( defined $self->{vhost} ) { |
628
|
0
|
|
|
|
|
0
|
$cmd_headers{host} = $self->{vhost}; |
629
|
|
|
|
|
|
|
} |
630
|
|
|
|
|
|
|
|
631
|
0
|
|
|
|
|
0
|
weaken($self); |
632
|
0
|
|
|
|
|
0
|
$self->{_login_state} = S_IN_PROGRESS; |
633
|
|
|
|
|
|
|
|
634
|
|
|
|
|
|
|
$self->_push_write( |
635
|
|
|
|
|
|
|
{ name => 'CONNECT', |
636
|
|
|
|
|
|
|
headers => \%cmd_headers, |
637
|
|
|
|
|
|
|
|
638
|
|
|
|
|
|
|
on_receipt => sub { |
639
|
0
|
|
|
0
|
|
0
|
my $receipt = shift; |
640
|
0
|
|
|
|
|
0
|
my $err = shift; |
641
|
|
|
|
|
|
|
|
642
|
0
|
0
|
|
|
|
0
|
if ( defined $err ) { |
643
|
0
|
|
|
|
|
0
|
$self->{_login_state} = S_NEED_DO; |
644
|
0
|
|
|
|
|
0
|
$self->_abort($err); |
645
|
|
|
|
|
|
|
|
646
|
0
|
|
|
|
|
0
|
return; |
647
|
|
|
|
|
|
|
} |
648
|
|
|
|
|
|
|
|
649
|
0
|
|
|
|
|
0
|
$self->{_login_state} = S_DONE; |
650
|
|
|
|
|
|
|
|
651
|
0
|
|
|
|
|
0
|
my $receipt_headers = $receipt->headers; |
652
|
|
|
|
|
|
|
|
653
|
0
|
0
|
|
|
|
0
|
if ( defined $receipt_headers->{'heart-beat'} ) { |
654
|
0
|
|
|
|
|
0
|
my ( $sx, $sy ) = split( /,/, $receipt_headers->{'heart-beat'} ); |
655
|
|
|
|
|
|
|
|
656
|
0
|
0
|
|
|
|
0
|
if ( $sx > 0 ) { |
657
|
0
|
|
|
|
|
0
|
$self->_rtimeout( max( $cy, $sx ) ); |
658
|
|
|
|
|
|
|
} |
659
|
0
|
0
|
|
|
|
0
|
if ( $sy > 0 ) { |
660
|
0
|
|
|
|
|
0
|
$self->_wtimeout( max( $cx, $sy ) ); |
661
|
|
|
|
|
|
|
} |
662
|
|
|
|
|
|
|
} |
663
|
|
|
|
|
|
|
|
664
|
0
|
|
|
|
|
0
|
$self->{_ready} = 1; |
665
|
|
|
|
|
|
|
$self->{_version} |
666
|
0
|
|
0
|
|
|
0
|
= version->parse( $receipt_headers->{version} || 1.0 ); |
667
|
0
|
|
0
|
|
|
0
|
$self->{_session_id} = $receipt_headers->{session} || ''; |
668
|
|
|
|
|
|
|
|
669
|
0
|
|
|
|
|
0
|
$self->_process_input_queue; |
670
|
|
|
|
|
|
|
}, |
671
|
|
|
|
|
|
|
} |
672
|
0
|
|
|
|
|
0
|
); |
673
|
|
|
|
|
|
|
|
674
|
0
|
|
|
|
|
0
|
return; |
675
|
|
|
|
|
|
|
} |
676
|
|
|
|
|
|
|
|
677
|
|
|
|
|
|
|
sub _rtimeout { |
678
|
0
|
|
|
0
|
|
0
|
my $self = shift; |
679
|
0
|
|
|
|
|
0
|
my $rtimeout = shift; |
680
|
|
|
|
|
|
|
|
681
|
0
|
|
|
|
|
0
|
$self->{_handle}->rtimeout_reset; |
682
|
0
|
|
|
|
|
0
|
$self->{_handle}->rtimeout( ( $rtimeout / 1000 ) * 3 ); |
683
|
|
|
|
|
|
|
|
684
|
0
|
|
|
|
|
0
|
return; |
685
|
|
|
|
|
|
|
} |
686
|
|
|
|
|
|
|
|
687
|
|
|
|
|
|
|
sub _wtimeout { |
688
|
0
|
|
|
0
|
|
0
|
my $self = shift; |
689
|
0
|
|
|
|
|
0
|
my $wtimeout = shift; |
690
|
|
|
|
|
|
|
|
691
|
0
|
|
|
|
|
0
|
$self->{_handle}->wtimeout_reset; |
692
|
0
|
|
|
|
|
0
|
$self->{_handle}->wtimeout( $wtimeout / 1000 ); |
693
|
|
|
|
|
|
|
|
694
|
0
|
|
|
|
|
0
|
return; |
695
|
|
|
|
|
|
|
} |
696
|
|
|
|
|
|
|
|
697
|
|
|
|
|
|
|
sub _process_input_queue { |
698
|
0
|
|
|
0
|
|
0
|
my $self = shift; |
699
|
|
|
|
|
|
|
|
700
|
0
|
|
|
|
|
0
|
$self->{_temp_input_queue} = $self->{_input_queue}; |
701
|
0
|
|
|
|
|
0
|
$self->{_input_queue} = []; |
702
|
|
|
|
|
|
|
|
703
|
0
|
|
|
|
|
0
|
while ( my $cmd = shift @{ $self->{_temp_input_queue} } ) { |
|
0
|
|
|
|
|
0
|
|
704
|
0
|
|
|
|
|
0
|
$self->_push_write($cmd); |
705
|
|
|
|
|
|
|
} |
706
|
|
|
|
|
|
|
|
707
|
0
|
|
|
|
|
0
|
return; |
708
|
|
|
|
|
|
|
} |
709
|
|
|
|
|
|
|
|
710
|
|
|
|
|
|
|
# Check whether a message is still awaiting acknowledgement and, if so,
# remove it from the subscription's pending acknowledgements.
#
# Arguments: the client object and the message frame. The subscription is
# looked up by the frame's "subscription" header, falling back to
# "destination"; the message is identified by its "message-tag" header.
#
# Returns 1 when the tag was pending and has been removed. Returns nothing
# when the subscription is unknown, when it does not track acknowledgements,
# when the tag is not pending, or when the frame carries no subscription id
# or no message tag.
sub _check_ack {
    my $self = shift;
    my $msg  = shift;

    my $msg_headers = $msg->headers;
    my $sub_id  = $msg_headers->{subscription} || $msg_headers->{destination};
    my $msg_tag = $msg_headers->{'message-tag'};

    # A frame without a subscription id or a message tag cannot match any
    # pending acknowledgement. Bail out early so that undef is never used as
    # a hash key or compared numerically (as 0) against the pending tags.
    return unless defined $sub_id && defined $msg_tag;

    my $sub = $self->{_subs}{$sub_id};
    return unless defined $sub;

    my $pending = $sub->{pending_acks};
    return unless defined $pending;

    if ( ref($pending) eq 'ARRAY' ) {
        # Tags are pushed onto the array by _process_message() from an
        # increasing counter, so the array can be binary-searched. The block
        # must be negative when $_ is smaller than the wanted tag.
        my $i = bsearch_index { $_ <=> $msg_tag } @{$pending};

        if ( $i >= 0 ) {
            # The matched tag and every tag queued before it are dropped.
            splice( @{$pending}, 0, $i + 1 );
            return 1;
        }
    }
    else {    # HASH
        return 1 if delete $pending->{$msg_tag};
    }

    return;
}
740
|
|
|
|
|
|
|
|
741
|
|
|
|
|
|
|
# Route a frame received from the server to the matching handler.
#
# MESSAGE frames go to _process_message() and RECEIPT frames to
# _process_receipt(). An ERROR frame goes to _process_error(); if a receipt
# is still pending under the id "CONNECTED", the frame is first tagged with
# that receipt id. Any other frame is treated as CONNECTED: it is tagged with
# the receipt id "CONNECTED" and handled as a receipt. Returns nothing.
sub _process_frame {
    my ( $self, $frame ) = @_;

    my $command = $frame->command;

    if ( $command eq 'MESSAGE' ) {
        $self->_process_message($frame);
        return;
    }

    if ( $command eq 'RECEIPT' ) {
        $self->_process_receipt($frame);
        return;
    }

    if ( $command eq 'ERROR' ) {
        $frame->headers->{'receipt-id'} = 'CONNECTED'
            if defined $self->{_pending_receipts}{CONNECTED};
        $self->_process_error($frame);
        return;
    }

    # CONNECTED
    $frame->headers->{'receipt-id'} = 'CONNECTED';
    $self->_process_receipt($frame);

    return;
}
764
|
|
|
|
|
|
|
|
765
|
|
|
|
|
|
|
# Deliver a MESSAGE frame to the subscription it belongs to.
#
# Arguments: the client object and the message frame. The subscription is
# looked up by the frame's "subscription" header, falling back to
# "destination".
#
# If no such subscription is registered (or the frame carries neither
# header), the client is disconnected with an E_UNEXPECTED_DATA error.
# Otherwise the frame gets a fresh "message-tag" header taken from
# $MESSAGE_SEQ, the tag is recorded in the subscription's pending_acks when
# the subscription has one, and the subscription's on_message callback is
# called with the frame. Returns nothing.
sub _process_message {
    my $self = shift;
    my $msg  = shift;

    my $msg_headers = $msg->headers;
    my $sub_id = $msg_headers->{subscription} || $msg_headers->{destination};

    # A frame with neither header has no subscription to look up. Keep undef
    # out of the hash lookup and out of the error text so that a malformed
    # frame reaches the error path without uninitialized-value warnings.
    my $sub = defined $sub_id ? $self->{_subs}{$sub_id} : undef;

    unless ( defined $sub ) {
        my $shown_id = defined $sub_id ? $sub_id : q{};
        my $err = _new_error(
            qq{Don't know how process MESSAGE frame. Unknown subscription "$shown_id"},
            E_UNEXPECTED_DATA
        );
        $self->_disconnect($err);

        return;
    }

    my $msg_tag = $MESSAGE_SEQ++;
    $msg_headers->{'message-tag'} = $msg_tag;

    if ( defined $sub->{pending_acks} ) {
        if ( ref( $sub->{pending_acks} ) eq 'ARRAY' ) {
            push( @{ $sub->{pending_acks} }, $msg_tag );
        }
        else {    # HASH
            $sub->{pending_acks}{$msg_tag} = 1;
        }
    }

    $sub->{on_message}->($msg);

    return;
}
799
|
|
|
|
|
|
|
|
800
|
|
|
|
|
|
|
# Complete the pending command that a RECEIPT (or CONNECTED) frame answers.
#
# Arguments: the client object and the receipt frame. The pending command is
# taken out of _pending_receipts by the frame's "receipt-id" header.
#
# If no command is pending under that id (or the frame has no "receipt-id"
# header), the client is disconnected with an E_UNEXPECTED_DATA error.
# Otherwise:
#   * SUBSCRIBE registers the command in _subs under its "id" header
#     (falling back to "destination") and, depending on its "ack" header,
#     gives it an array ("client") or a hash ("client-individual") in
#     pending_acks;
#   * the other command listed in %SUBUNSUB_CMDS (UNSUBSCRIBE) removes the
#     entry from _subs;
#   * DISCONNECT tears the connection down via _disconnect().
# Finally the command's on_receipt callback is called with the frame.
# Returns nothing.
sub _process_receipt {
    my $self    = shift;
    my $receipt = shift;

    my $receipt_id = $receipt->headers->{'receipt-id'};

    # A frame without a receipt id cannot answer any pending command. Keep
    # undef out of the hash lookup and out of the error text so that a
    # malformed frame reaches the error path without uninitialized-value
    # warnings.
    my $cmd
        = defined $receipt_id
        ? delete $self->{_pending_receipts}{$receipt_id}
        : undef;

    unless ( defined $cmd ) {
        my $shown_id = defined $receipt_id ? $receipt_id : q{};
        my $err = _new_error(
            qq{Unknown RECEIPT frame received: receipt-id=$shown_id},
            E_UNEXPECTED_DATA
        );
        $self->_disconnect($err);

        return;
    }

    if ( exists $SUBUNSUB_CMDS{ $cmd->{name} } ) {
        my $cmd_headers = $cmd->{headers};
        my $sub_id = $cmd_headers->{id} || $cmd_headers->{destination};

        if ( $cmd->{name} eq 'SUBSCRIBE' ) {
            $self->{_subs}{$sub_id} = $cmd;

            if ( defined $cmd_headers->{ack} ) {
                if ( $cmd_headers->{ack} eq 'client' ) {
                    $cmd->{pending_acks} = [];
                }
                elsif ( $cmd_headers->{ack} eq 'client-individual' ) {
                    $cmd->{pending_acks} = {};
                }
            }
        }
        else {    # UNSUBSCRIBE
            delete $self->{_subs}{$sub_id};
        }
    }
    elsif ( $cmd->{name} eq 'DISCONNECT' ) {
        $self->_disconnect;
    }

    $cmd->{on_receipt}->($receipt);

    return;
}
845
|
|
|
|
|
|
|
|
846
|
|
|
|
|
|
|
# Handle an ERROR frame received from the server.
#
# Arguments: the client object and the error frame. An E_OPRN_ERROR error
# object is built from the frame's "message" header and the frame itself.
# If the frame has a "receipt-id" header and a command is pending under that
# id, the command is removed from _pending_receipts and its on_receipt
# callback is called with ( undef, $err ). In every other case the client is
# disconnected with the error. Returns nothing.
sub _process_error {
    my ( $self, $err_frame ) = @_;

    my $headers = $err_frame->headers;
    my $err = _new_error( $headers->{message}, E_OPRN_ERROR, $err_frame );

    my $receipt_id = $headers->{'receipt-id'};
    my $cmd
        = defined $receipt_id
        ? delete $self->{_pending_receipts}{$receipt_id}
        : undef;

    unless ( defined $cmd ) {
        $self->_disconnect($err);
        return;
    }

    $cmd->{on_receipt}->( undef, $err );

    return;
}
867
|
|
|
|
|
|
|
|
868
|
|
|
|
|
|
|
# Tear the connection down and notify interested parties.
#
# Arguments: the client object and an optional error object.
#
# The handle, if there is one, is destroyed; internal state is reset via
# _reset_internals(); queued work is aborted via _abort(), which receives the
# error. If the client was marked as connected on entry and an on_disconnect
# callback is set, the callback is called without arguments. Returns nothing.
sub _disconnect {
    my ( $self, $err ) = @_;

    # Remember the state before _reset_internals() clears it.
    my $was_connected = $self->{_connected};

    $self->{_handle}->destroy if defined $self->{_handle};
    $self->_reset_internals;
    $self->_abort($err);

    return unless $was_connected;
    return unless defined $self->{on_disconnect};

    $self->{on_disconnect}->();

    return;
}
886
|
|
|
|
|
|
|
|
887
|
|
|
|
|
|
|
# Put the connection-related fields of the client back to their initial
# values: no handle, not connected, login state S_NEED_DO, not ready, and no
# version, session id or reconnect timer. Returns nothing.
sub _reset_internals {
    my ($self) = @_;

    # Field/value pairs, applied in the order listed.
    my @initial_state = (
        _handle          => undef,
        _connected       => 0,
        _login_state     => S_NEED_DO,
        _ready           => 0,
        _version         => undef,
        _session_id      => undef,
        _reconnect_timer => undef,
    );

    while ( my ( $field, $value ) = splice @initial_state, 0, 2 ) {
        $self->{$field} = $value;
    }

    return;
}
900
|
|
|
|
|
|
|
|
901
|
|
|
|
|
|
|
# Drop all queued work and subscriptions and report the failure to their
# callbacks.
#
# Arguments: the client object and an optional error object.
#
# The queued commands (see _queued_commands()) and the current subscriptions
# are captured first; then every queue, the pending receipts and the
# subscriptions are replaced with empty containers. If no error was given
# but commands were queued, an E_CONN_CLOSED_BY_CLIENT error is created.
#
# When there is an error: on_error is called with it; unless its code equals
# E_CONN_CLOSED_BY_CLIENT, the on_receipt callback of every captured
# subscription is called with a 'Subscription "..." lost' error; and the
# on_receipt callback of every captured command is called with an
# 'Operation "..." aborted' error. Returns nothing.
sub _abort {
    my ( $self, $err ) = @_;

    # Capture what has to be notified before the state is wiped.
    my @queued_commands = $self->_queued_commands;
    my %subs            = %{ $self->{_subs} };

    $self->{$_} = []
        for qw( _input_queue _temp_input_queue _write_queue _temp_write_queue );
    $self->{_pending_receipts} = {};
    $self->{_subs}             = {};

    if ( @queued_commands && !defined $err ) {
        $err = _new_error( 'Connection closed by client prematurely.',
            E_CONN_CLOSED_BY_CLIENT );
    }

    return unless defined $err;

    my $err_msg   = $err->message;
    my $err_code  = $err->code;
    my $err_frame = $err->frame;

    $self->{on_error}->($err);

    unless ( !%subs || $err_code == E_CONN_CLOSED_BY_CLIENT ) {
        for my $sub_id ( keys %subs ) {
            my $sub_err = _new_error( qq{Subscription "$sub_id" lost: $err_msg},
                $err_code, $err_frame );

            $subs{$sub_id}{on_receipt}->( undef, $sub_err );
        }
    }

    for my $cmd (@queued_commands) {
        my $cmd_err = _new_error( qq{Operation "$cmd->{name}" aborted: $err_msg},
            $err_code, $err_frame );
        $cmd->{on_receipt}->( undef, $cmd_err );
    }

    return;
}
946
|
|
|
|
|
|
|
|
947
|
|
|
|
|
|
|
# Return, as a list, every command the client still holds: the values of
# _pending_receipts followed by the contents of _temp_write_queue,
# _write_queue, _temp_input_queue and _input_queue, in that order.
# Meant to be called in list context.
sub _queued_commands {
    my ($self) = @_;

    my @queue_fields = qw(
        _temp_write_queue
        _write_queue
        _temp_input_queue
        _input_queue
    );

    return (
        values %{ $self->{_pending_receipts} },
        map { @{ $self->{$_} } } @queue_fields,
    );
}
958
|
|
|
|
|
|
|
|
959
|
|
|
|
|
|
|
# Return a copy of the given string in which every carriage return, line
# feed, colon and backslash is replaced by its entry in %ESCAPE_MAP
# (presumably STOMP header escaping -- confirm against the map definition).
sub _escape {
    ( my $escaped = shift ) =~ s/([\r\n:\\])/$ESCAPE_MAP{$1}/ge;

    return $escaped;
}
966
|
|
|
|
|
|
|
|
967
|
|
|
|
|
|
|
# Return a copy of the given string in which every two-character sequence
# made of a backslash followed by "r", "n", "c" or a backslash is replaced by
# its entry in %UNESCAPE_MAP (presumably the inverse of _escape() -- confirm
# against the map definition).
sub _unescape {
    ( my $unescaped = shift ) =~ s/(\\[rnc\\])/$UNESCAPE_MAP{$1}/ge;

    return $unescaped;
}
974
|
|
|
|
|
|
|
|
975
|
|
|
|
|
|
|
# Construct an AnyEvent::Stomper::Frame object; all arguments are passed
# through to the class's constructor unchanged.
sub _new_frame {
    my $frame_class = 'AnyEvent::Stomper::Frame';

    return $frame_class->new(@_);
}
978
|
|
|
|
|
|
|
|
979
|
|
|
|
|
|
|
# Construct an AnyEvent::Stomper::Error object; all arguments are passed
# through to the class's constructor unchanged.
sub _new_error {
    my $error_class = 'AnyEvent::Stomper::Error';

    return $error_class->new(@_);
}
982
|
|
|
|
|
|
|
|
983
|
|
|
|
|
|
|
# Destructor: release the connection handle and warn about every command
# that was still queued when the client object went away.
#
# The handle, if there is one, is destroyed. If _pending_receipts is defined,
# one warning is emitted per command returned by _queued_commands().
sub DESTROY {
    my $self = shift;

    # A destructor can run at any point, including while an exception is
    # propagating or while the caller is about to inspect $! or $?. Destroying
    # the handle and emitting warnings must not leak changes to the global
    # status variables, so all of them are localized (as recommended in
    # perlobj).
    local ( $., $@, $!, $^E, $? );

    if ( defined $self->{_handle} ) {
        $self->{_handle}->destroy;
    }

    if ( defined $self->{_pending_receipts} ) {
        my @queued_commands = $self->_queued_commands;

        foreach my $cmd (@queued_commands) {
            warn qq{Operation "$cmd->{name}" aborted:}
                . " Client object destroyed prematurely.\n";
        }
    }

    return;
}
1001
|
|
|
|
|
|
|
|
1002
|
|
|
|
|
|
|
1; |
1003
|
|
|
|
|
|
|
__END__ |