line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
1
|
|
|
|
|
|
|
package Beekeeper::MQTT; |
2
|
|
|
|
|
|
|
|
3
|
11
|
|
|
11
|
|
1195
|
use strict; |
|
11
|
|
|
|
|
20
|
|
|
11
|
|
|
|
|
344
|
|
4
|
11
|
|
|
11
|
|
54
|
use warnings; |
|
11
|
|
|
|
|
30
|
|
|
11
|
|
|
|
|
478
|
|
5
|
|
|
|
|
|
|
|
6
|
|
|
|
|
|
|
our $VERSION = '0.10'; |
7
|
|
|
|
|
|
|
|
8
|
11
|
|
|
11
|
|
1103
|
use AnyEvent; |
|
11
|
|
|
|
|
5742
|
|
|
11
|
|
|
|
|
202
|
|
9
|
11
|
|
|
11
|
|
8942
|
use AnyEvent::Handle; |
|
11
|
|
|
|
|
101009
|
|
|
11
|
|
|
|
|
454
|
|
10
|
11
|
|
|
11
|
|
6631
|
use Time::HiRes; |
|
11
|
|
|
|
|
16520
|
|
|
11
|
|
|
|
|
68
|
|
11
|
11
|
|
|
11
|
|
1567
|
use List::Util 'shuffle'; |
|
11
|
|
|
|
|
26
|
|
|
11
|
|
|
|
|
1170
|
|
12
|
11
|
|
|
11
|
|
89
|
use Scalar::Util 'weaken'; |
|
11
|
|
|
|
|
27
|
|
|
11
|
|
|
|
|
570
|
|
13
|
11
|
|
|
11
|
|
77
|
use Exporter 'import'; |
|
11
|
|
|
|
|
161
|
|
|
11
|
|
|
|
|
358
|
|
14
|
11
|
|
|
11
|
|
188
|
use Carp; |
|
11
|
|
|
|
|
52
|
|
|
11
|
|
|
|
|
2661
|
|
15
|
|
|
|
|
|
|
|
16
|
|
|
|
|
|
|
our @EXPORT_OK; |
17
|
|
|
|
|
|
|
our %EXPORT_TAGS; |
18
|
|
|
|
|
|
|
|
19
|
|
|
|
|
|
|
our $DEBUG = 0; |
20
|
|
|
|
|
|
|
|
21
|
|
|
|
|
|
|
EXPORT: { |
22
|
|
|
|
|
|
|
my (@const, @encode); |
23
|
|
|
|
|
|
|
foreach (keys %{Beekeeper::MQTT::}) { |
24
|
|
|
|
|
|
|
push @const, $_ if m/^MQTT_/; |
25
|
|
|
|
|
|
|
push @encode, $_ if m/^_(en|de)code/; |
26
|
|
|
|
|
|
|
} |
27
|
|
|
|
|
|
|
@EXPORT_OK = (@const, @encode); |
28
|
|
|
|
|
|
|
$EXPORT_TAGS{'const'} = \@const; |
29
|
|
|
|
|
|
|
$EXPORT_TAGS{'decode'} = \@encode; |
30
|
|
|
|
|
|
|
} |
31
|
|
|
|
|
|
|
|
32
|
|
|
|
|
|
|
# 2.1.2 Control Packet type |
33
|
|
|
|
|
|
|
|
34
|
11
|
|
|
11
|
|
115
|
use constant MQTT_CONNECT => 0x01; |
|
11
|
|
|
|
|
27
|
|
|
11
|
|
|
|
|
1150
|
|
35
|
11
|
|
|
11
|
|
85
|
use constant MQTT_CONNACK => 0x02; |
|
11
|
|
|
|
|
20
|
|
|
11
|
|
|
|
|
610
|
|
36
|
11
|
|
|
11
|
|
64
|
use constant MQTT_PUBLISH => 0x03; |
|
11
|
|
|
|
|
32
|
|
|
11
|
|
|
|
|
734
|
|
37
|
11
|
|
|
11
|
|
77
|
use constant MQTT_PUBACK => 0x04; |
|
11
|
|
|
|
|
33
|
|
|
11
|
|
|
|
|
593
|
|
38
|
11
|
|
|
11
|
|
70
|
use constant MQTT_PUBREC => 0x05; |
|
11
|
|
|
|
|
17
|
|
|
11
|
|
|
|
|
577
|
|
39
|
11
|
|
|
11
|
|
66
|
use constant MQTT_PUBREL => 0x06; |
|
11
|
|
|
|
|
21
|
|
|
11
|
|
|
|
|
545
|
|
40
|
11
|
|
|
11
|
|
67
|
use constant MQTT_PUBCOMP => 0x07; |
|
11
|
|
|
|
|
31
|
|
|
11
|
|
|
|
|
648
|
|
41
|
11
|
|
|
11
|
|
102
|
use constant MQTT_SUBSCRIBE => 0x08; |
|
11
|
|
|
|
|
33
|
|
|
11
|
|
|
|
|
711
|
|
42
|
11
|
|
|
11
|
|
78
|
use constant MQTT_SUBACK => 0x09; |
|
11
|
|
|
|
|
25
|
|
|
11
|
|
|
|
|
722
|
|
43
|
11
|
|
|
11
|
|
89
|
use constant MQTT_UNSUBSCRIBE => 0x0A; |
|
11
|
|
|
|
|
25
|
|
|
11
|
|
|
|
|
568
|
|
44
|
11
|
|
|
11
|
|
67
|
use constant MQTT_UNSUBACK => 0x0B; |
|
11
|
|
|
|
|
30
|
|
|
11
|
|
|
|
|
608
|
|
45
|
11
|
|
|
11
|
|
77
|
use constant MQTT_PINGREQ => 0x0C; |
|
11
|
|
|
|
|
27
|
|
|
11
|
|
|
|
|
570
|
|
46
|
11
|
|
|
11
|
|
85
|
use constant MQTT_PINGRESP => 0x0D; |
|
11
|
|
|
|
|
26
|
|
|
11
|
|
|
|
|
526
|
|
47
|
11
|
|
|
11
|
|
72
|
use constant MQTT_DISCONNECT => 0x0E; |
|
11
|
|
|
|
|
31
|
|
|
11
|
|
|
|
|
513
|
|
48
|
11
|
|
|
11
|
|
63
|
use constant MQTT_AUTH => 0x0F; |
|
11
|
|
|
|
|
21
|
|
|
11
|
|
|
|
|
635
|
|
49
|
|
|
|
|
|
|
|
50
|
|
|
|
|
|
|
# 2.2.2.2 Properties |
51
|
|
|
|
|
|
|
|
52
|
11
|
|
|
11
|
|
85
|
use constant MQTT_PAYLOAD_FORMAT_INDICATOR => 0x01; # byte PUBLISH, Will Properties |
|
11
|
|
|
|
|
70
|
|
|
11
|
|
|
|
|
631
|
|
53
|
11
|
|
|
11
|
|
75
|
use constant MQTT_MESSAGE_EXPIRY_INTERVAL => 0x02; # long int PUBLISH, Will Properties |
|
11
|
|
|
|
|
22
|
|
|
11
|
|
|
|
|
589
|
|
54
|
11
|
|
|
11
|
|
78
|
use constant MQTT_CONTENT_TYPE => 0x03; # utf8 string PUBLISH, Will Properties |
|
11
|
|
|
|
|
28
|
|
|
11
|
|
|
|
|
609
|
|
55
|
11
|
|
|
11
|
|
69
|
use constant MQTT_RESPONSE_TOPIC => 0x08; # utf8 string PUBLISH, Will Properties |
|
11
|
|
|
|
|
28
|
|
|
11
|
|
|
|
|
602
|
|
56
|
11
|
|
|
11
|
|
78
|
use constant MQTT_CORRELATION_DATA => 0x09; # binary data PUBLISH, Will Properties |
|
11
|
|
|
|
|
37
|
|
|
11
|
|
|
|
|
543
|
|
57
|
11
|
|
|
11
|
|
67
|
use constant MQTT_SUBSCRIPTION_IDENTIFIER => 0x0B; # variable int PUBLISH, SUBSCRIBE |
|
11
|
|
|
|
|
33
|
|
|
11
|
|
|
|
|
630
|
|
58
|
11
|
|
|
11
|
|
78
|
use constant MQTT_SESSION_EXPIRY_INTERVAL => 0x11; # long int CONNECT, CONNACK, DISCONNECT |
|
11
|
|
|
|
|
28
|
|
|
11
|
|
|
|
|
699
|
|
59
|
11
|
|
|
11
|
|
77
|
use constant MQTT_ASSIGNED_CLIENT_IDENTIFIER => 0x12; # utf8 string CONNACK |
|
11
|
|
|
|
|
32
|
|
|
11
|
|
|
|
|
620
|
|
60
|
11
|
|
|
11
|
|
90
|
use constant MQTT_SERVER_KEEP_ALIVE => 0x13; # short int CONNACK |
|
11
|
|
|
|
|
35
|
|
|
11
|
|
|
|
|
645
|
|
61
|
11
|
|
|
11
|
|
67
|
use constant MQTT_AUTHENTICATION_METHOD => 0x15; # utf8 string CONNECT, CONNACK, AUTH |
|
11
|
|
|
|
|
20
|
|
|
11
|
|
|
|
|
592
|
|
62
|
11
|
|
|
11
|
|
66
|
use constant MQTT_AUTHENTICATION_DATA => 0x16; # binary data CONNECT, CONNACK, AUTH |
|
11
|
|
|
|
|
20
|
|
|
11
|
|
|
|
|
544
|
|
63
|
11
|
|
|
11
|
|
71
|
use constant MQTT_REQUEST_PROBLEM_INFORMATION => 0x17; # byte CONNECT |
|
11
|
|
|
|
|
22
|
|
|
11
|
|
|
|
|
538
|
|
64
|
11
|
|
|
11
|
|
61
|
use constant MQTT_WILL_DELAY_INTERVAL => 0x18; # long int Will Properties |
|
11
|
|
|
|
|
21
|
|
|
11
|
|
|
|
|
553
|
|
65
|
11
|
|
|
11
|
|
118
|
use constant MQTT_REQUEST_RESPONSE_INFORMATION => 0x19; # byte CONNECT |
|
11
|
|
|
|
|
26
|
|
|
11
|
|
|
|
|
578
|
|
66
|
11
|
|
|
11
|
|
69
|
use constant MQTT_RESPONSE_INFORMATION => 0x1A; # utf8 string CONNACK |
|
11
|
|
|
|
|
18
|
|
|
11
|
|
|
|
|
500
|
|
67
|
11
|
|
|
11
|
|
64
|
use constant MQTT_SERVER_REFERENCE => 0x1C; # utf8 string CONNACK, DISCONNECT |
|
11
|
|
|
|
|
21
|
|
|
11
|
|
|
|
|
596
|
|
68
|
11
|
|
|
11
|
|
79
|
use constant MQTT_REASON_STRING => 0x1F; # utf8 string CONNACK, PUBACK, PUBREC, PUBREL, PUBCOMP, SUBACK, UNSUBACK, DISCONNECT, AUTH |
|
11
|
|
|
|
|
30
|
|
|
11
|
|
|
|
|
568
|
|
69
|
11
|
|
|
11
|
|
65
|
use constant MQTT_RECEIVE_MAXIMUM => 0x21; # short int CONNECT, CONNACK |
|
11
|
|
|
|
|
21
|
|
|
11
|
|
|
|
|
587
|
|
70
|
11
|
|
|
11
|
|
67
|
use constant MQTT_TOPIC_ALIAS_MAXIMUM => 0x22; # short int CONNECT, CONNACK |
|
11
|
|
|
|
|
20
|
|
|
11
|
|
|
|
|
511
|
|
71
|
11
|
|
|
11
|
|
65
|
use constant MQTT_TOPIC_ALIAS => 0x23; # short int PUBLISH |
|
11
|
|
|
|
|
21
|
|
|
11
|
|
|
|
|
543
|
|
72
|
11
|
|
|
11
|
|
84
|
use constant MQTT_MAXIMUM_QOS => 0x24; # byte CONNACK |
|
11
|
|
|
|
|
20
|
|
|
11
|
|
|
|
|
625
|
|
73
|
11
|
|
|
11
|
|
73
|
use constant MQTT_RETAIN_AVAILABLE => 0x25; # byte CONNACK |
|
11
|
|
|
|
|
42
|
|
|
11
|
|
|
|
|
547
|
|
74
|
11
|
|
|
11
|
|
87
|
use constant MQTT_USER_PROPERTY => 0x26; # utf8 pair CONNECT, CONNACK, PUBLISH, Will Properties, PUBACK, PUBREC, PUBREL, PUBCOMP, SUBSCRIBE, SUBACK, UNSUBSCRIBE, UNSUBACK, DISCONNECT, AUTH |
|
11
|
|
|
|
|
31
|
|
|
11
|
|
|
|
|
556
|
|
75
|
11
|
|
|
11
|
|
70
|
use constant MQTT_MAXIMUM_PACKET_SIZE => 0x27; # long int CONNECT, CONNACK |
|
11
|
|
|
|
|
19
|
|
|
11
|
|
|
|
|
590
|
|
76
|
11
|
|
|
11
|
|
90
|
use constant MQTT_WILDCARD_SUBSCRIPTION_AVAILABLE => 0x28; # byte CONNACK |
|
11
|
|
|
|
|
29
|
|
|
11
|
|
|
|
|
613
|
|
77
|
11
|
|
|
11
|
|
75
|
use constant MQTT_SUBSCRIPTION_IDENTIFIER_AVAILABLE => 0x29; # byte CONNACK |
|
11
|
|
|
|
|
20
|
|
|
11
|
|
|
|
|
569
|
|
78
|
11
|
|
|
11
|
|
284
|
use constant MQTT_SHARED_SUBSCRIPTION_AVAILABLE => 0x2A; # byte CONNACK |
|
11
|
|
|
|
|
29
|
|
|
11
|
|
|
|
|
99324
|
|
79
|
|
|
|
|
|
|
|
80
|
|
|
|
|
|
|
# 2.4 Reason Code |
81
|
|
|
|
|
|
|
|
82
|
|
|
|
|
|
|
my %Reason_code = ( |
83
|
|
|
|
|
|
|
0x00 => 'Success', # CONNACK, PUBACK, PUBREC, PUBREL, PUBCOMP, UNSUBACK, AUTH |
84
|
|
|
|
|
|
|
# 0x00 => 'Normal disconnection', # DISCONNECT |
85
|
|
|
|
|
|
|
# 0x00 => 'Granted QoS 0', # SUBACK |
86
|
|
|
|
|
|
|
# 0x01 => 'Granted QoS 1', # SUBACK |
87
|
|
|
|
|
|
|
# 0x02 => 'Granted QoS 2', # SUBACK |
88
|
|
|
|
|
|
|
0x04 => 'Disconnect with Will Message', # DISCONNECT |
89
|
|
|
|
|
|
|
0x10 => 'No matching subscribers', # PUBACK, PUBREC |
90
|
|
|
|
|
|
|
0x11 => 'No subscription existed', # UNSUBACK |
91
|
|
|
|
|
|
|
0x18 => 'Continue authentication', # AUTH |
92
|
|
|
|
|
|
|
0x19 => 'Re-authenticate', # AUTH |
93
|
|
|
|
|
|
|
0x80 => 'Unspecified error', # CONNACK, PUBACK, PUBREC, SUBACK, UNSUBACK, DISCONNECT |
94
|
|
|
|
|
|
|
0x81 => 'Malformed Packet', # CONNACK, DISCONNECT |
95
|
|
|
|
|
|
|
0x82 => 'Protocol Error', # CONNACK, DISCONNECT |
96
|
|
|
|
|
|
|
0x83 => 'Implementation specific error', # CONNACK, PUBACK, PUBREC, SUBACK, UNSUBACK, DISCONNECT |
97
|
|
|
|
|
|
|
0x84 => 'Unsupported Protocol Version', # CONNACK |
98
|
|
|
|
|
|
|
0x85 => 'Client Identifier not valid', # CONNACK |
99
|
|
|
|
|
|
|
0x86 => 'Bad User Name or Password', # CONNACK |
100
|
|
|
|
|
|
|
0x87 => 'Not authorized', # CONNACK, PUBACK, PUBREC, SUBACK, UNSUBACK, DISCONNECT |
101
|
|
|
|
|
|
|
0x88 => 'Server unavailable', # CONNACK |
102
|
|
|
|
|
|
|
0x89 => 'Server busy', # CONNACK, DISCONNECT |
103
|
|
|
|
|
|
|
0x8A => 'Banned', # CONNACK |
104
|
|
|
|
|
|
|
0x8B => 'Server shutting down', # DISCONNECT |
105
|
|
|
|
|
|
|
0x8C => 'Bad authentication method', # CONNACK, DISCONNECT |
106
|
|
|
|
|
|
|
0x8D => 'Keep Alive timeout', # DISCONNECT |
107
|
|
|
|
|
|
|
0x8E => 'Session taken over', # DISCONNECT |
108
|
|
|
|
|
|
|
0x8F => 'Topic Filter invalid', # SUBACK, UNSUBACK, DISCONNECT |
109
|
|
|
|
|
|
|
0x90 => 'Topic Name invalid', # CONNACK, PUBACK, PUBREC, DISCONNECT |
110
|
|
|
|
|
|
|
0x91 => 'Packet Identifier in use', # PUBACK, PUBREC, SUBACK, UNSUBACK |
111
|
|
|
|
|
|
|
0x92 => 'Packet Identifier not found', # PUBREL, PUBCOMP |
112
|
|
|
|
|
|
|
0x93 => 'Receive Maximum exceeded', # DISCONNECT |
113
|
|
|
|
|
|
|
0x94 => 'Topic Alias invalid', # DISCONNECT |
114
|
|
|
|
|
|
|
0x95 => 'Packet too large', # CONNACK, DISCONNECT |
115
|
|
|
|
|
|
|
0x96 => 'Message rate too high', # DISCONNECT |
116
|
|
|
|
|
|
|
0x97 => 'Quota exceeded', # CONNACK, PUBACK, PUBREC, SUBACK, DISCONNECT |
117
|
|
|
|
|
|
|
0x98 => 'Administrative action', # DISCONNECT |
118
|
|
|
|
|
|
|
0x99 => 'Payload format invalid', # CONNACK, PUBACK, PUBREC, DISCONNECT |
119
|
|
|
|
|
|
|
0x9A => 'Retain not supported', # CONNACK, DISCONNECT |
120
|
|
|
|
|
|
|
0x9B => 'QoS not supported', # CONNACK, DISCONNECT |
121
|
|
|
|
|
|
|
0x9C => 'Use another server', # CONNACK, DISCONNECT |
122
|
|
|
|
|
|
|
0x9D => 'Server moved', # CONNACK, DISCONNECT |
123
|
|
|
|
|
|
|
0x9E => 'Shared Subscriptions not supported', # SUBACK, DISCONNECT |
124
|
|
|
|
|
|
|
0x9F => 'Connection rate exceeded', # CONNACK, DISCONNECT |
125
|
|
|
|
|
|
|
0xA0 => 'Maximum connect time', # DISCONNECT |
126
|
|
|
|
|
|
|
0xA1 => 'Subscription Identifiers not supported', # SUBACK, DISCONNECT |
127
|
|
|
|
|
|
|
0xA2 => 'Wildcard Subscriptions not supported', # SUBACK, DISCONNECT |
128
|
|
|
|
|
|
|
); |
129
|
|
|
|
|
|
|
|
130
|
|
|
|
|
|
|
# 3.9.3 Subscribe Reason Codes |
131
|
|
|
|
|
|
|
|
132
|
|
|
|
|
|
|
my %Subscribe_reason_code = ( |
133
|
|
|
|
|
|
|
%Reason_code, |
134
|
|
|
|
|
|
|
0x00 => 'Granted QoS 0', |
135
|
|
|
|
|
|
|
0x01 => 'Granted QoS 1', |
136
|
|
|
|
|
|
|
0x02 => 'Granted QoS 2', |
137
|
|
|
|
|
|
|
); |
138
|
|
|
|
|
|
|
|
139
|
|
|
|
|
|
|
# 3.14.2.1 Disconnect Reason Code |
140
|
|
|
|
|
|
|
|
141
|
|
|
|
|
|
|
my %Disconnect_reason_code = ( |
142
|
|
|
|
|
|
|
%Reason_code, |
143
|
|
|
|
|
|
|
0x00 => 'Normal disconnection', |
144
|
|
|
|
|
|
|
); |
145
|
|
|
|
|
|
|
|
146
|
|
|
|
|
|
|
sub _decode_byte { |
147
|
76
|
|
|
76
|
|
160
|
my ($packet, $offs) = @_; |
148
|
|
|
|
|
|
|
|
149
|
76
|
|
|
|
|
299
|
my $byte = unpack("C", substr($$packet, $$offs, 1)); |
150
|
76
|
|
|
|
|
166
|
$$offs += 1; |
151
|
|
|
|
|
|
|
|
152
|
76
|
|
|
|
|
5346
|
return $byte; |
153
|
|
|
|
|
|
|
} |
154
|
|
|
|
|
|
|
|
155
|
|
|
|
|
|
|
sub _decode_int_16 { |
156
|
19
|
|
|
19
|
|
50
|
my ($packet, $offs) = @_; |
157
|
|
|
|
|
|
|
|
158
|
19
|
|
|
|
|
89
|
my $int = unpack("n", substr($$packet, $$offs, 2)); |
159
|
19
|
|
|
|
|
51
|
$$offs += 2; |
160
|
|
|
|
|
|
|
|
161
|
19
|
|
|
|
|
77
|
return $int; |
162
|
|
|
|
|
|
|
} |
163
|
|
|
|
|
|
|
|
164
|
|
|
|
|
|
|
sub _decode_int_32 { |
165
|
0
|
|
|
0
|
|
0
|
my ($packet, $offs) = @_; |
166
|
|
|
|
|
|
|
|
167
|
0
|
|
|
|
|
0
|
my $int = unpack("N", substr($$packet, $$offs, 4)); |
168
|
0
|
|
|
|
|
0
|
$$offs += 4; |
169
|
|
|
|
|
|
|
|
170
|
0
|
|
|
|
|
0
|
return $int; |
171
|
|
|
|
|
|
|
} |
172
|
|
|
|
|
|
|
|
173
|
|
|
|
|
|
|
sub _decode_utf8_str { |
174
|
0
|
|
|
0
|
|
0
|
my ($packet, $offs) = @_; |
175
|
|
|
|
|
|
|
|
176
|
0
|
|
|
|
|
0
|
my $str = unpack("n/a", substr($$packet, $$offs)); |
177
|
0
|
|
|
|
|
0
|
$$offs += 2 + length($str); |
178
|
0
|
|
|
|
|
0
|
utf8::decode($str); |
179
|
|
|
|
|
|
|
|
180
|
0
|
|
|
|
|
0
|
return $str; |
181
|
|
|
|
|
|
|
} |
182
|
|
|
|
|
|
|
|
183
|
|
|
|
|
|
|
sub _decode_binary_data { |
184
|
0
|
|
|
0
|
|
0
|
my ($packet, $offs) = @_; |
185
|
|
|
|
|
|
|
|
186
|
0
|
|
|
|
|
0
|
my $data = unpack("n/a", substr($$packet, $$offs)); |
187
|
0
|
|
|
|
|
0
|
$$offs += 2 + length($data); |
188
|
|
|
|
|
|
|
|
189
|
0
|
|
|
|
|
0
|
return $data; |
190
|
|
|
|
|
|
|
} |
191
|
|
|
|
|
|
|
|
192
|
|
|
|
|
|
|
sub _decode_var_int { |
193
|
206
|
|
|
206
|
|
512
|
my ($packet, $offs) = @_; |
194
|
|
|
|
|
|
|
|
195
|
206
|
|
|
|
|
405
|
my $int = 0; |
196
|
206
|
|
|
|
|
332
|
my $mult = 1; |
197
|
206
|
|
|
|
|
319
|
my $byte; |
198
|
|
|
|
|
|
|
|
199
|
206
|
|
|
|
|
421
|
do { |
200
|
206
|
|
|
|
|
928
|
$byte = unpack("C", substr($$packet, $$offs, 1)); |
201
|
206
|
|
|
|
|
807
|
$int += ($byte & 0x7F) * $mult; |
202
|
206
|
|
|
|
|
390
|
$mult *= 128; |
203
|
206
|
|
|
|
|
854
|
$$offs++; |
204
|
|
|
|
|
|
|
} while ($byte & 0x80); |
205
|
|
|
|
|
|
|
|
206
|
206
|
|
|
|
|
1042
|
return $int; |
207
|
|
|
|
|
|
|
} |
208
|
|
|
|
|
|
|
|
209
|
|
|
|
|
|
|
sub _encode_var_int { |
210
|
301
|
100
|
|
301
|
|
1916
|
return pack("C", $_[0]) if ($_[0] < 128); |
211
|
30
|
|
|
|
|
237
|
my @a = unpack("C*", pack("w", $_[0])); |
212
|
30
|
|
|
|
|
91
|
$a[0] &= 0x7F; |
213
|
30
|
|
|
|
|
64
|
$a[-1] |= 0x80; |
214
|
30
|
|
|
|
|
15384
|
return pack("C*", reverse @a); |
215
|
|
|
|
|
|
|
} |
216
|
|
|
|
|
|
|
|
217
|
|
|
|
|
|
|
|
218
|
|
|
|
|
|
|
sub new { |
219
|
19
|
|
|
19
|
1
|
10217
|
my ($class, %args) = @_; |
220
|
|
|
|
|
|
|
|
221
|
19
|
|
|
|
|
687
|
my $self = { |
222
|
|
|
|
|
|
|
bus_id => undef, |
223
|
|
|
|
|
|
|
bus_role => undef, |
224
|
|
|
|
|
|
|
handle => undef, # the socket |
225
|
|
|
|
|
|
|
hosts => undef, # list of all hosts in cluster |
226
|
|
|
|
|
|
|
is_connected => undef, # true once connected |
227
|
|
|
|
|
|
|
try_hosts => undef, # list of hosts to try to connect |
228
|
|
|
|
|
|
|
connect_err => undef, # count of connection errors |
229
|
|
|
|
|
|
|
timeout_tmr => undef, # timer used for connection timeout |
230
|
|
|
|
|
|
|
reconnect_tmr => undef, # timer used for connection retry |
231
|
|
|
|
|
|
|
connack_cb => undef, # connack callback |
232
|
|
|
|
|
|
|
error_cb => undef, # error callback |
233
|
|
|
|
|
|
|
client_id => undef, # client id |
234
|
|
|
|
|
|
|
server_prop => {}, # server properties |
235
|
|
|
|
|
|
|
server_alias => {}, # server topic aliases |
236
|
|
|
|
|
|
|
client_alias => {}, # client topic aliases |
237
|
|
|
|
|
|
|
subscriptions => {}, # topic subscriptions |
238
|
|
|
|
|
|
|
subscr_cb => {}, # subscription callbacks |
239
|
|
|
|
|
|
|
packet_cb => {}, # packet callbacks |
240
|
|
|
|
|
|
|
buffers => {}, # raw mqtt buffers |
241
|
|
|
|
|
|
|
packet_seq => 1, # sequence used for packet ids |
242
|
|
|
|
|
|
|
subscr_seq => 1, # sequence used for subscription ids |
243
|
|
|
|
|
|
|
alias_seq => 1, # sequence used for topic alias ids |
244
|
|
|
|
|
|
|
use_alias => 0, # topic alias enabled |
245
|
|
|
|
|
|
|
config => \%args, |
246
|
|
|
|
|
|
|
}; |
247
|
|
|
|
|
|
|
|
248
|
19
|
|
|
|
|
104
|
$self->{bus_id} = delete $args{'bus_id'}; |
249
|
19
|
|
33
|
|
|
197
|
$self->{bus_role} = delete $args{'bus_role'} || $self->{bus_id}; |
250
|
19
|
|
|
|
|
86
|
$self->{error_cb} = delete $args{'on_error'}; |
251
|
|
|
|
|
|
|
|
252
|
19
|
|
|
|
|
96
|
bless $self, $class; |
253
|
19
|
|
|
|
|
161
|
return $self; |
254
|
|
|
|
|
|
|
} |
255
|
|
|
|
|
|
|
|
256
|
0
|
|
|
0
|
0
|
0
|
sub bus_id { $_[0]->{bus_id} } |
257
|
0
|
|
|
0
|
0
|
0
|
sub bus_role { $_[0]->{bus_role} } |
258
|
|
|
|
|
|
|
|
259
|
|
|
|
|
|
|
sub _fatal { |
260
|
0
|
|
|
0
|
|
0
|
my ($self, $errstr) = @_; |
261
|
0
|
0
|
|
|
|
0
|
die "(" . __PACKAGE__ . ") $errstr\n" unless $self->{error_cb}; |
262
|
0
|
|
|
|
|
0
|
$self->{error_cb}->($errstr); |
263
|
|
|
|
|
|
|
} |
264
|
|
|
|
|
|
|
|
265
|
|
|
|
|
|
|
our $BUSY_SINCE = undef; |
266
|
|
|
|
|
|
|
our $BUSY_TIME = 0; |
267
|
|
|
|
|
|
|
|
268
|
|
|
|
|
|
|
sub connect { |
269
|
19
|
|
|
19
|
1
|
211
|
my ($self, %args) = @_; |
270
|
|
|
|
|
|
|
|
271
|
19
|
|
|
|
|
185
|
$self->{connack_cb} = $args{'on_connack'}; |
272
|
19
|
|
|
|
|
659
|
$self->{connect_cv} = AnyEvent->condvar; |
273
|
|
|
|
|
|
|
|
274
|
19
|
|
|
|
|
105294
|
$self->_connect; |
275
|
|
|
|
|
|
|
|
276
|
19
|
50
|
|
|
|
164
|
$self->{connect_cv}->recv if $args{'blocking'}; |
277
|
19
|
|
|
|
|
720
|
$self->{connect_cv} = undef; |
278
|
|
|
|
|
|
|
|
279
|
19
|
50
|
|
|
|
101
|
return $args{'blocking'} ? $self->{is_connected} : 1; |
280
|
|
|
|
|
|
|
} |
281
|
|
|
|
|
|
|
|
282
|
|
|
|
|
|
|
sub _connect { |
283
|
19
|
|
|
19
|
|
57
|
my ($self) = @_; |
284
|
19
|
|
|
|
|
104
|
weaken($self); |
285
|
|
|
|
|
|
|
|
286
|
19
|
|
|
|
|
58
|
my $config = $self->{config}; |
287
|
|
|
|
|
|
|
|
288
|
19
|
|
|
|
|
48
|
my $timeout = $config->{'timeout'}; |
289
|
19
|
50
|
|
|
|
79
|
$timeout = 30 unless defined $timeout; |
290
|
|
|
|
|
|
|
|
291
|
|
|
|
|
|
|
# Ensure that timeout is set properly when the event loop was blocked |
292
|
19
|
|
|
|
|
174
|
AnyEvent->now_update; |
293
|
|
|
|
|
|
|
|
294
|
|
|
|
|
|
|
# Connection timeout handler |
295
|
19
|
50
|
33
|
|
|
422
|
if ($timeout && !$self->{timeout_tmr}) { |
296
|
|
|
|
|
|
|
$self->{timeout_tmr} = AnyEvent->timer( after => $timeout, cb => sub { |
297
|
0
|
|
|
0
|
|
0
|
$self->_reset_connection; |
298
|
0
|
|
|
|
|
0
|
$self->{connect_cv}->send; |
299
|
0
|
|
|
|
|
0
|
$self->_fatal("Could not connect to MQTT broker after $timeout seconds"); |
300
|
19
|
|
|
|
|
302
|
}); |
301
|
|
|
|
|
|
|
} |
302
|
|
|
|
|
|
|
|
303
|
19
|
50
|
|
|
|
650
|
unless ($self->{hosts}) { |
304
|
|
|
|
|
|
|
# Initialize the list of cluster hosts |
305
|
19
|
|
50
|
|
|
94
|
my $hosts = $config->{'host'} || 'localhost'; |
306
|
19
|
50
|
|
|
|
109
|
my @hosts = (ref $hosts eq 'ARRAY') ? @$hosts : ( $hosts ); |
307
|
19
|
|
|
|
|
278
|
$self->{hosts} = [ shuffle @hosts ]; |
308
|
|
|
|
|
|
|
} |
309
|
|
|
|
|
|
|
|
310
|
|
|
|
|
|
|
# Determine next host of cluster to connect to |
311
|
19
|
|
50
|
|
|
152
|
my $try_hosts = $self->{try_hosts} ||= []; |
312
|
19
|
50
|
|
|
|
91
|
@$try_hosts = @{$self->{hosts}} unless @$try_hosts; |
|
19
|
|
|
|
|
105
|
|
313
|
|
|
|
|
|
|
|
314
|
|
|
|
|
|
|
# TCP connection args |
315
|
19
|
|
|
|
|
53
|
my $host = shift @$try_hosts; |
316
|
19
|
|
50
|
|
|
131
|
my $tls = $config->{'tls'} || 0; |
317
|
19
|
|
33
|
|
|
82
|
my $port = $config->{'port'} || ( $tls ? 8883 : 1883 ); |
318
|
|
|
|
|
|
|
|
319
|
19
|
|
|
|
|
133
|
($host) = ($host =~ m/^([a-zA-Z0-9\-\.]+)$/s); # untaint |
320
|
19
|
|
|
|
|
122
|
($port) = ($port =~ m/^([0-9]+)$/s); |
321
|
|
|
|
|
|
|
|
322
|
|
|
|
|
|
|
$self->{handle} = AnyEvent::Handle->new( |
323
|
|
|
|
|
|
|
connect => [ $host, $port ], |
324
|
|
|
|
|
|
|
tls => $tls ? 'connect' : undef, |
325
|
|
|
|
|
|
|
keepalive => 1, |
326
|
|
|
|
|
|
|
no_delay => 1, |
327
|
|
|
|
|
|
|
on_connect => sub { |
328
|
19
|
|
|
19
|
|
7539
|
my ($fh, $host, $port) = @_; |
329
|
|
|
|
|
|
|
# Send CONNECT packet |
330
|
19
|
|
|
|
|
60
|
$self->{server_prop}->{host} = $host; |
331
|
19
|
|
|
|
|
45
|
$self->{server_prop}->{port} = $port; |
332
|
19
|
|
|
|
|
122
|
$self->_send_connect; |
333
|
|
|
|
|
|
|
}, |
334
|
|
|
|
|
|
|
on_connect_error => sub { |
335
|
0
|
|
|
0
|
|
0
|
my ($fh, $errmsg) = @_; |
336
|
|
|
|
|
|
|
# Some error occurred while connection, such as an unresolved hostname |
337
|
|
|
|
|
|
|
# or connection refused. Try next host of cluster immediately, or retry |
338
|
|
|
|
|
|
|
# in few seconds if all hosts of the cluster are unresponsive |
339
|
0
|
|
|
|
|
0
|
$self->{connect_err}++; |
340
|
0
|
0
|
|
|
|
0
|
warn "Could not connect to MQTT broker at $host:$port: $errmsg\n" if ($self->{connect_err} <= @{$self->{hosts}}); |
|
0
|
|
|
|
|
0
|
|
341
|
0
|
0
|
|
|
|
0
|
my $delay = @{$self->{try_hosts}} ? 0 : $self->{connect_err} / @{$self->{hosts}}; |
|
0
|
|
|
|
|
0
|
|
|
0
|
|
|
|
|
0
|
|
342
|
|
|
|
|
|
|
$self->{reconnect_tmr} = AnyEvent->timer( |
343
|
|
|
|
|
|
|
after => ($delay < 10 ? $delay : 10), |
344
|
0
|
|
|
|
|
0
|
cb => sub { $self->_connect }, |
345
|
0
|
0
|
|
|
|
0
|
); |
346
|
|
|
|
|
|
|
}, |
347
|
|
|
|
|
|
|
on_error => sub { |
348
|
0
|
|
|
0
|
|
0
|
my ($fh, $fatal, $errmsg) = @_; |
349
|
|
|
|
|
|
|
# Some error occurred, such as a read error |
350
|
0
|
|
|
|
|
0
|
$self->_reset_connection; |
351
|
0
|
|
|
|
|
0
|
$self->_fatal("Error on connection to MQTT broker at $host:$port: $errmsg"); |
352
|
|
|
|
|
|
|
}, |
353
|
|
|
|
|
|
|
on_eof => sub { |
354
|
0
|
|
|
0
|
|
0
|
my ($fh) = @_; |
355
|
|
|
|
|
|
|
# The server has closed the connection without sending DISCONNECT |
356
|
0
|
|
|
|
|
0
|
$self->_reset_connection; |
357
|
0
|
|
|
|
|
0
|
$self->_fatal("MQTT broker at $host:$port has gone away"); |
358
|
|
|
|
|
|
|
}, |
359
|
|
|
|
|
|
|
on_read => sub { |
360
|
272
|
|
|
272
|
|
4299249
|
my ($fh) = @_; |
361
|
|
|
|
|
|
|
|
362
|
272
|
|
|
|
|
2122
|
my $packet_type; |
363
|
|
|
|
|
|
|
my $packet_flags; |
364
|
|
|
|
|
|
|
|
365
|
272
|
|
|
|
|
0
|
my $rbuff_len; |
366
|
272
|
|
|
|
|
0
|
my $packet_len; |
367
|
|
|
|
|
|
|
|
368
|
272
|
|
|
|
|
0
|
my $mult; |
369
|
272
|
|
|
|
|
0
|
my $offs; |
370
|
272
|
|
|
|
|
0
|
my $byte; |
371
|
|
|
|
|
|
|
|
372
|
272
|
|
|
|
|
0
|
my $timing_packets; |
373
|
|
|
|
|
|
|
|
374
|
272
|
50
|
|
|
|
941
|
unless (defined $BUSY_SINCE) { |
375
|
|
|
|
|
|
|
# Measure time elapsed while processing incoming packets |
376
|
272
|
|
|
|
|
809
|
$BUSY_SINCE = Time::HiRes::time; |
377
|
272
|
|
|
|
|
518
|
$timing_packets = 1; |
378
|
|
|
|
|
|
|
} |
379
|
|
|
|
|
|
|
|
380
|
|
|
|
|
|
|
PARSE_PACKET: { |
381
|
|
|
|
|
|
|
|
382
|
272
|
|
|
|
|
461
|
$rbuff_len = length $fh->{rbuf}; |
|
464
|
|
|
|
|
1393
|
|
383
|
|
|
|
|
|
|
|
384
|
464
|
100
|
|
|
|
1637
|
last PARSE_PACKET unless $rbuff_len >= 2; |
385
|
|
|
|
|
|
|
|
386
|
287
|
50
|
|
|
|
787
|
unless ($packet_type) { |
387
|
|
|
|
|
|
|
|
388
|
287
|
|
|
|
|
506
|
$packet_len = 0; |
389
|
287
|
|
|
|
|
471
|
$mult = 1; |
390
|
287
|
|
|
|
|
466
|
$offs = 1; |
391
|
|
|
|
|
|
|
|
392
|
|
|
|
|
|
|
PARSE_LEN: { |
393
|
287
|
|
|
|
|
427
|
$byte = unpack "C", substr( $fh->{rbuf}, $offs++, 1 ); |
|
583
|
|
|
|
|
2168
|
|
394
|
583
|
|
|
|
|
2044
|
$packet_len += ($byte & 0x7f) * $mult; |
395
|
583
|
100
|
|
|
|
1830
|
last unless ($byte & 0x80); |
396
|
296
|
50
|
|
|
|
713
|
last PARSE_PACKET if ($offs >= $rbuff_len); # Not enough data |
397
|
296
|
|
|
|
|
422
|
$mult *= 128; |
398
|
296
|
50
|
|
|
|
706
|
redo if ($offs < 5); |
399
|
|
|
|
|
|
|
} |
400
|
|
|
|
|
|
|
|
401
|
|
|
|
|
|
|
#TODO: Check max packet size |
402
|
|
|
|
|
|
|
|
403
|
287
|
|
|
|
|
1171
|
$byte = unpack('C', substr( $fh->{rbuf}, 0, 1 )); |
404
|
287
|
|
|
|
|
803
|
$packet_type = $byte >> 4; |
405
|
287
|
|
|
|
|
804
|
$packet_flags = $byte & 0x0F; |
406
|
|
|
|
|
|
|
} |
407
|
|
|
|
|
|
|
|
408
|
287
|
100
|
|
|
|
1068
|
if ($rbuff_len < ($offs + $packet_len)) { |
409
|
|
|
|
|
|
|
# Not enough data |
410
|
95
|
|
|
|
|
210
|
last PARSE_PACKET; |
411
|
|
|
|
|
|
|
} |
412
|
|
|
|
|
|
|
|
413
|
|
|
|
|
|
|
# Consume packet from buffer |
414
|
192
|
|
|
|
|
14075
|
my $packet = substr($fh->{rbuf}, 0, ($offs + $packet_len), ''); |
415
|
|
|
|
|
|
|
|
416
|
|
|
|
|
|
|
# Trim fixed header from packet |
417
|
192
|
|
|
|
|
613
|
substr($packet, 0, $offs, ''); |
418
|
|
|
|
|
|
|
|
419
|
192
|
100
|
|
|
|
1152
|
if ($packet_type == MQTT_PUBLISH) { |
|
|
100
|
|
|
|
|
|
|
|
50
|
|
|
|
|
|
|
|
50
|
|
|
|
|
|
|
|
50
|
|
|
|
|
|
|
|
50
|
|
|
|
|
|
|
|
50
|
|
|
|
|
|
|
|
100
|
|
|
|
|
|
|
|
50
|
|
|
|
|
|
|
|
50
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
420
|
|
|
|
|
|
|
|
421
|
84
|
|
|
|
|
647
|
$self->_receive_publish(\$packet, $packet_flags); |
422
|
|
|
|
|
|
|
} |
423
|
|
|
|
|
|
|
elsif ($packet_type == MQTT_PUBACK) { |
424
|
|
|
|
|
|
|
|
425
|
70
|
|
|
|
|
537
|
$self->_receive_puback(\$packet); |
426
|
|
|
|
|
|
|
} |
427
|
|
|
|
|
|
|
elsif ($packet_type == MQTT_PUBREC) { |
428
|
|
|
|
|
|
|
|
429
|
0
|
|
|
|
|
0
|
$self->_receive_pubrec(\$packet); |
430
|
|
|
|
|
|
|
} |
431
|
|
|
|
|
|
|
elsif ($packet_type == MQTT_PUBREL) { |
432
|
|
|
|
|
|
|
|
433
|
0
|
|
|
|
|
0
|
$self->_receive_pubrel(\$packet); |
434
|
|
|
|
|
|
|
} |
435
|
|
|
|
|
|
|
elsif ($packet_type == MQTT_PUBCOMP) { |
436
|
|
|
|
|
|
|
|
437
|
0
|
|
|
|
|
0
|
$self->_receive_pubcomp(\$packet); |
438
|
|
|
|
|
|
|
} |
439
|
|
|
|
|
|
|
elsif ($packet_type == MQTT_PINGREQ) { |
440
|
|
|
|
|
|
|
|
441
|
0
|
|
|
|
|
0
|
$self->pingresp; |
442
|
|
|
|
|
|
|
} |
443
|
|
|
|
|
|
|
elsif ($packet_type == MQTT_PINGRESP) { |
444
|
|
|
|
|
|
|
|
445
|
|
|
|
|
|
|
# Client takes no action on receiving PINGRESP |
446
|
|
|
|
|
|
|
} |
447
|
|
|
|
|
|
|
elsif ($packet_type == MQTT_SUBACK) { |
448
|
|
|
|
|
|
|
|
449
|
19
|
|
|
|
|
136
|
$self->_receive_suback(\$packet); |
450
|
|
|
|
|
|
|
} |
451
|
|
|
|
|
|
|
elsif ($packet_type == MQTT_UNSUBACK) { |
452
|
|
|
|
|
|
|
|
453
|
0
|
|
|
|
|
0
|
$self->_receive_unsuback(\$packet); |
454
|
|
|
|
|
|
|
} |
455
|
|
|
|
|
|
|
elsif ($packet_type == MQTT_CONNACK) { |
456
|
|
|
|
|
|
|
|
457
|
19
|
|
|
|
|
126
|
$self->_receive_connack(\$packet); |
458
|
|
|
|
|
|
|
} |
459
|
|
|
|
|
|
|
elsif ($packet_type == MQTT_DISCONNECT) { |
460
|
|
|
|
|
|
|
|
461
|
0
|
|
|
|
|
0
|
$self->_receive_disconnect(\$packet); |
462
|
|
|
|
|
|
|
} |
463
|
|
|
|
|
|
|
elsif ($packet_type == MQTT_AUTH) { |
464
|
|
|
|
|
|
|
|
465
|
0
|
|
|
|
|
0
|
$self->_receive_auth(\$packet); |
466
|
|
|
|
|
|
|
} |
467
|
|
|
|
|
|
|
else { |
468
|
|
|
|
|
|
|
# Protocol error |
469
|
0
|
|
|
|
|
0
|
$self->_fatal("Received packet with unknown type $packet_type"); |
470
|
|
|
|
|
|
|
} |
471
|
|
|
|
|
|
|
|
472
|
|
|
|
|
|
|
# Prepare for next frame |
473
|
192
|
|
|
|
|
17797
|
undef $packet_type; |
474
|
|
|
|
|
|
|
|
475
|
|
|
|
|
|
|
# Handle could have been destroyed at this point |
476
|
192
|
50
|
|
|
|
4303
|
redo PARSE_PACKET if defined $fh->{rbuf}; |
477
|
|
|
|
|
|
|
} |
478
|
|
|
|
|
|
|
|
479
|
272
|
50
|
|
|
|
792
|
if (defined $timing_packets) { |
480
|
272
|
|
|
|
|
5811
|
$BUSY_TIME += Time::HiRes::time - $BUSY_SINCE; |
481
|
272
|
|
|
|
|
963
|
undef $BUSY_SINCE; |
482
|
|
|
|
|
|
|
} |
483
|
|
|
|
|
|
|
}, |
484
|
19
|
50
|
|
|
|
675
|
); |
485
|
|
|
|
|
|
|
|
486
|
19
|
|
|
|
|
13514
|
1; |
487
|
|
|
|
|
|
|
} |
488
|
|
|
|
|
|
|
|
489
|
|
|
|
|
|
|
sub _send_connect { |
490
|
19
|
|
|
19
|
|
66
|
my ($self) = @_; |
491
|
|
|
|
|
|
|
|
492
|
19
|
|
|
|
|
41
|
my %prop = %{$self->{config}}; |
|
19
|
|
|
|
|
137
|
|
493
|
|
|
|
|
|
|
|
494
|
19
|
|
|
|
|
72
|
my $username = delete $prop{'username'}; |
495
|
19
|
|
|
|
|
50
|
my $password = delete $prop{'password'}; |
496
|
19
|
|
|
|
|
53
|
my $client_id = delete $prop{'client_id'}; |
497
|
19
|
|
|
|
|
45
|
my $clean_start = delete $prop{'clean_start'}; |
498
|
19
|
|
|
|
|
41
|
my $keep_alive = delete $prop{'keep_alive'}; |
499
|
19
|
|
|
|
|
70
|
my $will = delete $prop{'will'}; |
500
|
|
|
|
|
|
|
|
501
|
19
|
50
|
|
|
|
619
|
unless ($client_id) { |
502
|
19
|
|
|
|
|
76
|
$client_id = ''; |
503
|
19
|
|
|
|
|
484
|
$client_id .= ('0'..'9','a'..'z','A'..'Z')[rand 62] for (1..22); |
504
|
|
|
|
|
|
|
} |
505
|
|
|
|
|
|
|
|
506
|
19
|
|
|
|
|
63
|
$self->{client_id} = $client_id; |
507
|
|
|
|
|
|
|
|
508
|
|
|
|
|
|
|
|
509
|
|
|
|
|
|
|
# 3.1.2.11 Properties |
510
|
|
|
|
|
|
|
|
511
|
19
|
|
|
|
|
378
|
my $raw_prop = ''; |
512
|
|
|
|
|
|
|
|
513
|
19
|
100
|
|
|
|
106
|
if (exists $prop{'session_expiry_interval'}) { |
514
|
|
|
|
|
|
|
# 3.1.2.11.2 Session Expiry Interval (long int) |
515
|
6
|
|
|
|
|
49
|
$raw_prop .= pack("C N", MQTT_SESSION_EXPIRY_INTERVAL, delete $prop{'session_expiry_interval'}); |
516
|
|
|
|
|
|
|
} |
517
|
|
|
|
|
|
|
|
518
|
19
|
100
|
|
|
|
67
|
if (exists $prop{'receive_maximum'}) { |
519
|
|
|
|
|
|
|
# 3.1.2.11.3 Receive Maximum (short int) |
520
|
8
|
|
|
|
|
34
|
$raw_prop .= pack("C n", MQTT_RECEIVE_MAXIMUM, delete $prop{'receive_maximum'}); |
521
|
|
|
|
|
|
|
} |
522
|
|
|
|
|
|
|
|
523
|
19
|
50
|
|
|
|
60
|
if (exists $prop{'maximum_packet_size'}) { |
524
|
|
|
|
|
|
|
# 3.1.2.11.4 Maximum Packet Size (long int) |
525
|
0
|
|
|
|
|
0
|
$raw_prop .= pack("C N", MQTT_MAXIMUM_PACKET_SIZE, delete $prop{'maximum_packet_size'}); |
526
|
|
|
|
|
|
|
} |
527
|
|
|
|
|
|
|
|
528
|
19
|
100
|
|
|
|
67
|
if (exists $prop{'topic_alias_maximum'}) { |
529
|
|
|
|
|
|
|
# 3.1.2.11.5 Topic Alias Maximum (short int) |
530
|
6
|
|
|
|
|
23
|
$raw_prop .= pack("C n", MQTT_TOPIC_ALIAS_MAXIMUM, delete $prop{'topic_alias_maximum'}); |
531
|
|
|
|
|
|
|
} |
532
|
|
|
|
|
|
|
|
533
|
19
|
50
|
|
|
|
62
|
if (exists $prop{'request_response_information'}) { |
534
|
|
|
|
|
|
|
# 3.1.2.11.6 Request Response Information (byte) |
535
|
0
|
|
|
|
|
0
|
$raw_prop .= pack("C C", MQTT_REQUEST_RESPONSE_INFORMATION, delete $prop{'request_response_information'}); |
536
|
|
|
|
|
|
|
} |
537
|
|
|
|
|
|
|
|
538
|
19
|
50
|
|
|
|
57
|
if (exists $prop{'request_problem_information'}) { |
539
|
|
|
|
|
|
|
# 3.1.2.11.7 Request Problem Information (byte) |
540
|
0
|
|
|
|
|
0
|
$raw_prop .= pack("C C", MQTT_REQUEST_PROBLEM_INFORMATION, delete $prop{'request_problem_information'}); |
541
|
|
|
|
|
|
|
} |
542
|
|
|
|
|
|
|
|
543
|
19
|
50
|
|
|
|
63
|
if (exists $prop{'authentication_method'}) { |
544
|
|
|
|
|
|
|
# 3.1.2.11.9 Authentication Method (utf8 string) |
545
|
0
|
|
|
|
|
0
|
utf8::encode( $prop{'authentication_method'} ); |
546
|
0
|
|
|
|
|
0
|
$raw_prop .= pack("C n/a*", MQTT_AUTHENTICATION_METHOD, delete $prop{'authentication_method'}); |
547
|
|
|
|
|
|
|
} |
548
|
|
|
|
|
|
|
|
549
|
19
|
50
|
|
|
|
76
|
if (exists $prop{'authentication_data'}) { |
550
|
|
|
|
|
|
|
# 3.1.2.11.10 Authentication Data (binary data) |
551
|
0
|
|
|
|
|
0
|
$raw_prop .= pack("C n", MQTT_AUTHENTICATION_DATA, delete $prop{'authentication_data'}); |
552
|
|
|
|
|
|
|
} |
553
|
|
|
|
|
|
|
|
554
|
19
|
|
|
|
|
117
|
foreach my $key (keys %prop) { |
555
|
|
|
|
|
|
|
# 3.1.2.11.8 User Property (utf8 string pair) |
556
|
38
|
|
|
|
|
127
|
my $val = $prop{$key}; |
557
|
38
|
50
|
|
|
|
106
|
next unless defined $val; |
558
|
38
|
|
|
|
|
159
|
utf8::encode( $key ); |
559
|
38
|
|
|
|
|
100
|
utf8::encode( $val ); |
560
|
38
|
|
|
|
|
197
|
$raw_prop .= pack("C n/a* n/a*", MQTT_USER_PROPERTY, $key, $val); |
561
|
|
|
|
|
|
|
} |
562
|
|
|
|
|
|
|
|
563
|
|
|
|
|
|
|
|
564
|
|
|
|
|
|
|
# 3.1.2 Variable Header |
565
|
|
|
|
|
|
|
|
566
|
|
|
|
|
|
|
# 3.1.2.1 Protocol Name (utf8 string) |
567
|
19
|
|
|
|
|
66
|
my $raw_mqtt = pack("n/a*", "MQTT"); |
568
|
|
|
|
|
|
|
|
569
|
|
|
|
|
|
|
# 3.1.2.2 Protocol Version (byte) |
570
|
19
|
|
|
|
|
36
|
$raw_mqtt .= pack("C", 5); |
571
|
|
|
|
|
|
|
|
572
|
|
|
|
|
|
|
# 3.1.2.3 Connect Flags (byte) |
573
|
19
|
|
|
|
|
36
|
my $flags = 0; |
574
|
19
|
100
|
|
|
|
76
|
$flags |= 0x02 if $clean_start; # 3.1.2.4 Clean Start |
575
|
19
|
50
|
|
|
|
64
|
$flags |= 0x80 if defined $username; # 3.1.2.8 User Name Flag |
576
|
19
|
50
|
|
|
|
52
|
$flags |= 0x40 if defined $password; # 3.1.2.9 Password Flag |
577
|
|
|
|
|
|
|
|
578
|
19
|
50
|
|
|
|
145
|
if ($will) { |
579
|
0
|
|
|
|
|
0
|
$flags |= 0x04; # 3.1.2.5 Will Flag |
580
|
0
|
|
|
|
|
0
|
$flags |= $will->{'qos'} << 3; # 3.1.2.6 Will QoS |
581
|
0
|
0
|
|
|
|
0
|
$flags |= 0x20 if $will->{'retain'}; # 3.1.2.7 Will Retain |
582
|
|
|
|
|
|
|
} |
583
|
|
|
|
|
|
|
|
584
|
19
|
|
|
|
|
45
|
$raw_mqtt .= pack("C", $flags); |
585
|
|
|
|
|
|
|
|
586
|
|
|
|
|
|
|
# 3.1.2.10 Keep Alive (short int) |
587
|
19
|
|
50
|
|
|
157
|
$raw_mqtt .= pack("n", $keep_alive || 0); |
588
|
|
|
|
|
|
|
|
589
|
|
|
|
|
|
|
# 3.1.2.11 Properties |
590
|
19
|
|
|
|
|
80
|
$raw_mqtt .= _encode_var_int(length $raw_prop); |
591
|
19
|
|
|
|
|
63
|
$raw_mqtt .= $raw_prop; |
592
|
|
|
|
|
|
|
|
593
|
|
|
|
|
|
|
|
594
|
|
|
|
|
|
|
# 3.1.3 Payload |
595
|
|
|
|
|
|
|
|
596
|
|
|
|
|
|
|
# 3.1.3.1 Client Identifier (utf8 string) |
597
|
19
|
|
|
|
|
69
|
$raw_mqtt .= pack("n/a*", $client_id); |
598
|
|
|
|
|
|
|
|
599
|
19
|
50
|
|
|
|
53
|
if ($will) { |
600
|
|
|
|
|
|
|
|
601
|
|
|
|
|
|
|
#TODO: 3.1.3.2 Will Properties |
602
|
0
|
|
|
|
|
0
|
my $will_prop = ''; |
603
|
|
|
|
|
|
|
|
604
|
0
|
|
|
|
|
0
|
$raw_mqtt .= _encode_var_int(length $will_prop); |
605
|
0
|
|
|
|
|
0
|
$raw_mqtt .= $will_prop; |
606
|
|
|
|
|
|
|
|
607
|
|
|
|
|
|
|
# 3.1.3.3 Will Topic (utf8 string) |
608
|
0
|
|
|
|
|
0
|
utf8::encode( $will->{'topic'}); |
609
|
0
|
|
|
|
|
0
|
$raw_mqtt .= pack("n/a*", $will->{'topic'}); |
610
|
|
|
|
|
|
|
|
611
|
|
|
|
|
|
|
# 3.1.3.4 Will Payload (binary data) |
612
|
0
|
|
|
|
|
0
|
$raw_mqtt .= pack("n/a*", $will->{'payload'}); |
613
|
|
|
|
|
|
|
} |
614
|
|
|
|
|
|
|
|
615
|
19
|
50
|
|
|
|
60
|
if (defined $username) { |
616
|
|
|
|
|
|
|
# 3.1.3.5 Username (utf8 string) |
617
|
19
|
|
|
|
|
69
|
utf8::encode( $username ); |
618
|
19
|
|
|
|
|
69
|
$raw_mqtt .= pack("n/a*", $username); |
619
|
|
|
|
|
|
|
} |
620
|
|
|
|
|
|
|
|
621
|
19
|
50
|
|
|
|
56
|
if (defined $password) { |
622
|
|
|
|
|
|
|
# 3.1.3.6 Password (binary data) |
623
|
19
|
|
|
|
|
78
|
$raw_mqtt .= pack("n/a*", $password); |
624
|
|
|
|
|
|
|
} |
625
|
|
|
|
|
|
|
|
626
|
|
|
|
|
|
|
$self->{handle}->push_write( |
627
|
19
|
|
|
|
|
88
|
pack("C", MQTT_CONNECT << 4) . # 3.1.1 Packet type |
628
|
|
|
|
|
|
|
_encode_var_int(length $raw_mqtt) . # 3.1.1 Packet length |
629
|
|
|
|
|
|
|
$raw_mqtt |
630
|
|
|
|
|
|
|
); |
631
|
|
|
|
|
|
|
} |
632
|
|
|
|
|
|
|
|
633
|
|
|
|
|
|
|
sub _receive_connack { |
634
|
19
|
|
|
19
|
|
78
|
my ($self, $packet) = @_; |
635
|
|
|
|
|
|
|
|
636
|
19
|
|
|
|
|
48
|
my $prop = $self->{server_prop}; |
637
|
19
|
|
|
|
|
42
|
my $offs = 0; |
638
|
|
|
|
|
|
|
|
639
|
|
|
|
|
|
|
# 3.2.2.1 Acknowledge flags (byte) |
640
|
19
|
|
|
|
|
116
|
my $ack_flags = _decode_byte($packet, \$offs); |
641
|
19
|
|
|
|
|
141
|
$prop->{'session_present'} = $ack_flags & 0x01; |
642
|
|
|
|
|
|
|
|
643
|
|
|
|
|
|
|
# 3.2.2.2 Reason code (byte) |
644
|
19
|
|
|
|
|
64
|
my $reason_code = _decode_byte($packet, \$offs); |
645
|
19
|
|
|
|
|
82
|
$prop->{'reason_code'} = $reason_code; |
646
|
19
|
|
|
|
|
129
|
$prop->{'reason'} = $Reason_code{$reason_code}; |
647
|
|
|
|
|
|
|
|
648
|
|
|
|
|
|
|
# 3.2.2.3.1 Properties Length (variable length int) |
649
|
19
|
|
|
|
|
85
|
my $prop_len = _decode_var_int($packet, \$offs); |
650
|
19
|
|
|
|
|
70
|
my $prop_end = $offs + $prop_len; |
651
|
|
|
|
|
|
|
|
652
|
19
|
|
|
|
|
82
|
while ($offs < $prop_end) { |
653
|
|
|
|
|
|
|
|
654
|
19
|
|
|
|
|
48
|
my $prop_id = _decode_byte($packet, \$offs); |
655
|
|
|
|
|
|
|
|
656
|
19
|
50
|
|
|
|
160
|
if ($prop_id == MQTT_SESSION_EXPIRY_INTERVAL) { |
|
|
50
|
|
|
|
|
|
|
|
50
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
657
|
|
|
|
|
|
|
# 3.2.2.3.2 Session Expiry Interval (long int) |
658
|
0
|
|
|
|
|
0
|
$prop->{'session_expiry_interval'} = _decode_int_32($packet, \$offs); |
659
|
|
|
|
|
|
|
} |
660
|
|
|
|
|
|
|
elsif ($prop_id == MQTT_RECEIVE_MAXIMUM) { |
661
|
|
|
|
|
|
|
# 3.2.2.3.3 Receive Maximum (short int) |
662
|
0
|
|
|
|
|
0
|
$prop->{'receive_maximum'} = _decode_int_16($packet, \$offs); |
663
|
|
|
|
|
|
|
} |
664
|
|
|
|
|
|
|
elsif ($prop_id == MQTT_MAXIMUM_QOS) { |
665
|
|
|
|
|
|
|
# 3.2.2.3.4 Maximum QoS (byte) |
666
|
19
|
|
|
|
|
59
|
$prop->{'maximum_qos'} = _decode_byte($packet, \$offs); |
667
|
|
|
|
|
|
|
} |
668
|
|
|
|
|
|
|
elsif ($prop_id == MQTT_RETAIN_AVAILABLE) { |
669
|
|
|
|
|
|
|
# 3.2.2.3.5 Retain Available (byte) |
670
|
0
|
|
|
|
|
0
|
$prop->{'retain_available'} = _decode_byte($packet, \$offs); |
671
|
|
|
|
|
|
|
} |
672
|
|
|
|
|
|
|
elsif ($prop_id == MQTT_MAXIMUM_PACKET_SIZE) { |
673
|
|
|
|
|
|
|
# 3.2.2.3.6 Maximum Packet Size (long int) |
674
|
0
|
|
|
|
|
0
|
$prop->{'maximum_packet_size'} = _decode_int_32($packet, \$offs); |
675
|
|
|
|
|
|
|
} |
676
|
|
|
|
|
|
|
elsif ($prop_id == MQTT_ASSIGNED_CLIENT_IDENTIFIER) { |
677
|
|
|
|
|
|
|
# 3.2.2.3.7 Assigned Client Identifier (utf8 string) |
678
|
0
|
|
|
|
|
0
|
$prop->{'assigned_client_identifier'} = _decode_utf8_str($packet, \$offs); |
679
|
|
|
|
|
|
|
} |
680
|
|
|
|
|
|
|
elsif ($prop_id == MQTT_TOPIC_ALIAS_MAXIMUM) { |
681
|
|
|
|
|
|
|
# 3.2.2.3.8 Topic Alias Maximum (short int) |
682
|
0
|
|
|
|
|
0
|
$prop->{'topic_alias_maximum'} = _decode_int_16($packet, \$offs); |
683
|
|
|
|
|
|
|
} |
684
|
|
|
|
|
|
|
elsif ($prop_id == MQTT_REASON_STRING) { |
685
|
|
|
|
|
|
|
# 3.2.2.3.9 Reason String (utf8 string) |
686
|
0
|
|
|
|
|
0
|
$prop->{'reason_string'} = _decode_utf8_str($packet, \$offs); |
687
|
|
|
|
|
|
|
} |
688
|
|
|
|
|
|
|
elsif ($prop_id == MQTT_USER_PROPERTY) { |
689
|
|
|
|
|
|
|
# 3.2.2.3.10 User Property (utf8 string pair) |
690
|
0
|
|
|
|
|
0
|
my $key = _decode_utf8_str($packet, \$offs); |
691
|
0
|
|
|
|
|
0
|
my $val = _decode_utf8_str($packet, \$offs); |
692
|
0
|
|
|
|
|
0
|
$prop->{$key} = $val; |
693
|
|
|
|
|
|
|
} |
694
|
|
|
|
|
|
|
elsif ($prop_id == MQTT_WILDCARD_SUBSCRIPTION_AVAILABLE) { |
695
|
|
|
|
|
|
|
# 3.2.2.3.11 Wildcard Subscription Available (byte) |
696
|
0
|
|
|
|
|
0
|
$prop->{'wildcard_subscription_available'} = _decode_byte($packet, \$offs); |
697
|
|
|
|
|
|
|
} |
698
|
|
|
|
|
|
|
elsif ($prop_id == MQTT_SUBSCRIPTION_IDENTIFIER_AVAILABLE) { |
699
|
|
|
|
|
|
|
# 3.2.2.3.12 Subscription Identifiers Available (byte) |
700
|
0
|
|
|
|
|
0
|
$prop->{'subscription_identifier_available'} = _decode_byte($packet, \$offs); |
701
|
|
|
|
|
|
|
} |
702
|
|
|
|
|
|
|
elsif ($prop_id == MQTT_SHARED_SUBSCRIPTION_AVAILABLE) { |
703
|
|
|
|
|
|
|
# 3.2.2.3.13 Shared Subscription Available (byte) |
704
|
0
|
|
|
|
|
0
|
$prop->{'shared_subscription_available'} = _decode_byte($packet, \$offs); |
705
|
|
|
|
|
|
|
} |
706
|
|
|
|
|
|
|
elsif ($prop_id == MQTT_SERVER_KEEP_ALIVE) { |
707
|
|
|
|
|
|
|
# 3.2.2.3.14 Server Keep Alive (short int) |
708
|
0
|
|
|
|
|
0
|
$prop->{'server_keep_alive'} = _decode_int_16($packet, \$offs); |
709
|
|
|
|
|
|
|
} |
710
|
|
|
|
|
|
|
elsif ($prop_id == MQTT_RESPONSE_INFORMATION) { |
711
|
|
|
|
|
|
|
# 3.2.2.3.15 Response Information (utf8 string) |
712
|
0
|
|
|
|
|
0
|
$prop->{'response_information'} = _decode_utf8_str($packet, \$offs); |
713
|
|
|
|
|
|
|
} |
714
|
|
|
|
|
|
|
elsif ($prop_id == MQTT_SERVER_REFERENCE) { |
715
|
|
|
|
|
|
|
# 3.2.2.3.16 Server Reference (utf8 string) |
716
|
0
|
|
|
|
|
0
|
$prop->{'server_reference'} = _decode_utf8_str($packet, \$offs); |
717
|
|
|
|
|
|
|
} |
718
|
|
|
|
|
|
|
elsif ($prop_id == MQTT_AUTHENTICATION_METHOD) { |
719
|
|
|
|
|
|
|
# 3.2.2.3.17 Authentication Method (utf8 string) |
720
|
0
|
|
|
|
|
0
|
$prop->{'authentication_method'} = _decode_utf8_str($packet, \$offs); |
721
|
|
|
|
|
|
|
} |
722
|
|
|
|
|
|
|
elsif ($prop_id == MQTT_AUTHENTICATION_DATA) { |
723
|
|
|
|
|
|
|
# 3.2.2.3.18 Authentication Data (binary data) |
724
|
0
|
|
|
|
|
0
|
$prop->{'authentication_data'} = _decode_binary_data($packet, \$offs); |
725
|
|
|
|
|
|
|
} |
726
|
|
|
|
|
|
|
else { |
727
|
|
|
|
|
|
|
# Protocol error |
728
|
0
|
|
|
|
|
0
|
$self->_fatal("Received CONNACK with unknown property $prop_id"); |
729
|
|
|
|
|
|
|
} |
730
|
|
|
|
|
|
|
} |
731
|
|
|
|
|
|
|
|
732
|
19
|
|
|
|
|
4147
|
my $success = ($reason_code == 0x00); |
733
|
|
|
|
|
|
|
|
734
|
19
|
50
|
|
|
|
76
|
unless ( $success ) { |
735
|
|
|
|
|
|
|
# Server will close the connection |
736
|
|
|
|
|
|
|
# warn "Served refused CONNACK: $reason"; |
737
|
|
|
|
|
|
|
#TODO: handle |
738
|
|
|
|
|
|
|
} |
739
|
|
|
|
|
|
|
|
740
|
19
|
|
|
|
|
8794
|
$self->{is_connected} = 1; |
741
|
19
|
|
|
|
|
140
|
$self->{timeout_tmr} = undef; |
742
|
19
|
|
|
|
|
50
|
$self->{reconnect_tmr} = undef; |
743
|
19
|
|
|
|
|
42
|
$self->{connect_err} = undef; |
744
|
|
|
|
|
|
|
|
745
|
|
|
|
|
|
|
#TODO: ... blocking connection |
746
|
19
|
50
|
|
|
|
4978
|
$self->{connect_cv}->send if $self->{connect_cv}; |
747
|
|
|
|
|
|
|
|
748
|
|
|
|
|
|
|
# Execute CONNACK callback |
749
|
19
|
|
|
|
|
233
|
my $connack_cb = $self->{connack_cb}; |
750
|
19
|
50
|
|
|
|
4571
|
$connack_cb->($success, $prop) if $connack_cb; |
751
|
|
|
|
|
|
|
} |
752
|
|
|
|
|
|
|
|
753
|
|
|
|
|
|
|
|
754
|
|
|
|
|
|
|
sub disconnect { |
755
|
13
|
|
|
13
|
1
|
19888628
|
my ($self, %args) = @_; |
756
|
|
|
|
|
|
|
|
757
|
13
|
50
|
|
|
|
70
|
unless (defined $self->{handle}) { |
758
|
0
|
|
|
|
|
0
|
carp "Already disconnected from MQTT broker"; |
759
|
0
|
|
|
|
|
0
|
return; |
760
|
|
|
|
|
|
|
} |
761
|
|
|
|
|
|
|
|
762
|
13
|
|
|
|
|
67
|
my $reason_code = delete $args{'reason_code'}; |
763
|
|
|
|
|
|
|
|
764
|
|
|
|
|
|
|
# 3.14.2.2 Properties |
765
|
|
|
|
|
|
|
|
766
|
13
|
|
|
|
|
61
|
my $raw_prop = ''; |
767
|
|
|
|
|
|
|
|
768
|
13
|
50
|
|
|
|
71
|
if (exists $args{'session_expiry_interval'}) { |
769
|
|
|
|
|
|
|
# 3.14.2.2.2 Session Expiry Interval (long int) |
770
|
0
|
|
|
|
|
0
|
utf8::encode( $args{'session_expiry_interval'} ); |
771
|
0
|
|
|
|
|
0
|
$raw_prop .= pack("C n/a*", MQTT_SESSION_EXPIRY_INTERVAL, delete $args{'session_expiry_interval'}); |
772
|
|
|
|
|
|
|
} |
773
|
|
|
|
|
|
|
|
774
|
13
|
50
|
|
|
|
49
|
if (exists $args{'reason_string'}) { |
775
|
|
|
|
|
|
|
# 3.14.2.2.3 Reason String (utf8 string) |
776
|
0
|
|
|
|
|
0
|
utf8::encode( $args{'reason_string'} ); |
777
|
0
|
|
|
|
|
0
|
$raw_prop .= pack("C n/a*", MQTT_REASON_STRING, delete $args{'reason_string'}); |
778
|
|
|
|
|
|
|
} |
779
|
|
|
|
|
|
|
|
780
|
13
|
50
|
|
|
|
49
|
if (exists $args{'server_reference'}) { |
781
|
|
|
|
|
|
|
# 3.14.2.2.5 Server Reference (utf8 string) |
782
|
0
|
|
|
|
|
0
|
utf8::encode( $args{'server_reference'} ); |
783
|
0
|
|
|
|
|
0
|
$raw_prop .= pack("C n/a*", MQTT_SERVER_REFERENCE, delete $args{'server_reference'}); |
784
|
|
|
|
|
|
|
} |
785
|
|
|
|
|
|
|
|
786
|
13
|
|
|
|
|
83
|
foreach my $key (keys %args) { |
787
|
|
|
|
|
|
|
# 3.14.2.2.4 User Property (utf8 string pair) |
788
|
0
|
|
|
|
|
0
|
my $val = $args{$key}; |
789
|
0
|
0
|
|
|
|
0
|
next unless defined $val; |
790
|
0
|
|
|
|
|
0
|
utf8::encode( $key ); |
791
|
0
|
|
|
|
|
0
|
utf8::encode( $val ); |
792
|
0
|
|
|
|
|
0
|
$raw_prop .= pack("C n/a* n/a*", MQTT_USER_PROPERTY, $key, $val); |
793
|
|
|
|
|
|
|
} |
794
|
|
|
|
|
|
|
|
795
|
|
|
|
|
|
|
# 3.14.2 Variable Header |
796
|
|
|
|
|
|
|
|
797
|
|
|
|
|
|
|
# 3.14.2.1 Disconnect Reason Code (byte) |
798
|
13
|
|
50
|
|
|
171
|
my $raw_mqtt = pack("C", $reason_code || 0); |
799
|
|
|
|
|
|
|
|
800
|
|
|
|
|
|
|
# 3.14.2.2 Properties |
801
|
13
|
|
|
|
|
75
|
$raw_mqtt .= _encode_var_int(length $raw_prop); |
802
|
13
|
|
|
|
|
45
|
$raw_mqtt .= $raw_prop; |
803
|
|
|
|
|
|
|
|
804
|
|
|
|
|
|
|
$self->{handle}->push_write( |
805
|
13
|
|
|
|
|
98
|
pack("C", MQTT_DISCONNECT << 4) . # 3.14.1 Packet type |
806
|
|
|
|
|
|
|
_encode_var_int(length $raw_mqtt) . # 3.14.1 Packet length |
807
|
|
|
|
|
|
|
$raw_mqtt |
808
|
|
|
|
|
|
|
); |
809
|
|
|
|
|
|
|
|
810
|
13
|
|
|
|
|
8875
|
$self->_reset_connection; |
811
|
|
|
|
|
|
|
} |
812
|
|
|
|
|
|
|
|
813
|
|
|
|
|
|
|
sub _reset_connection { |
814
|
13
|
|
|
13
|
|
59
|
my ($self) = @_; |
815
|
|
|
|
|
|
|
|
816
|
13
|
|
|
|
|
105
|
$self->{handle} = undef; |
817
|
|
|
|
|
|
|
|
818
|
13
|
|
|
|
|
5712
|
$self->{is_connected} = undef; |
819
|
13
|
|
|
|
|
37
|
$self->{reconnect_tmr} = undef; |
820
|
13
|
|
|
|
|
34
|
$self->{timeout_tmr} = undef; |
821
|
13
|
|
|
|
|
29
|
$self->{connect_err} = undef; |
822
|
|
|
|
|
|
|
|
823
|
13
|
|
|
|
|
94
|
$self->{server_prop} = {}; |
824
|
13
|
|
|
|
|
54
|
$self->{server_alias} = {}; |
825
|
13
|
|
|
|
|
34
|
$self->{client_alias} = {}; |
826
|
13
|
|
|
|
|
53
|
$self->{subscriptions} = {}; |
827
|
13
|
|
|
|
|
130
|
$self->{subscr_cb} = {}; |
828
|
13
|
|
|
|
|
43
|
$self->{packet_cb} = {}; |
829
|
13
|
|
|
|
|
41
|
$self->{buffers} = {}; |
830
|
13
|
|
|
|
|
38
|
$self->{packet_seq} = 1; |
831
|
13
|
|
|
|
|
38
|
$self->{subscr_seq} = 1; |
832
|
13
|
|
|
|
|
29
|
$self->{alias_seq} = 1; |
833
|
13
|
|
|
|
|
172
|
$self->{use_alias} = 0; |
834
|
|
|
|
|
|
|
} |
835
|
|
|
|
|
|
|
|
836
|
|
|
|
|
|
|
sub _receive_disconnect { |
837
|
0
|
|
|
0
|
|
0
|
my ($self, $packet) = @_; |
838
|
|
|
|
|
|
|
|
839
|
|
|
|
|
|
|
# Handle abbreviated packet |
840
|
0
|
0
|
|
|
|
0
|
$$packet = "\x00\x00" if (length $$packet == 0); |
841
|
0
|
0
|
|
|
|
0
|
$$packet .= "\x00" if (length $$packet == 1); |
842
|
|
|
|
|
|
|
|
843
|
|
|
|
|
|
|
# 3.14.2.1 Reason Code (byte) |
844
|
0
|
|
|
|
|
0
|
my $offs = 0; |
845
|
0
|
|
|
|
|
0
|
my $reason_code = _decode_byte($packet, \$offs); |
846
|
0
|
|
|
|
|
0
|
my $reason = $Disconnect_reason_code{$reason_code}; |
847
|
|
|
|
|
|
|
|
848
|
|
|
|
|
|
|
# 3.14.2.2.1 Property Length (variable length int) |
849
|
0
|
|
|
|
|
0
|
my $prop_len = _decode_var_int($packet, \$offs); |
850
|
0
|
|
|
|
|
0
|
my $prop_end = $offs + $prop_len; |
851
|
|
|
|
|
|
|
|
852
|
0
|
|
|
|
|
0
|
my %prop = ( |
853
|
|
|
|
|
|
|
'reason_code' => $reason_code, |
854
|
|
|
|
|
|
|
'reason' => $reason, |
855
|
|
|
|
|
|
|
); |
856
|
|
|
|
|
|
|
|
857
|
0
|
|
|
|
|
0
|
while ($offs < $prop_end) { |
858
|
|
|
|
|
|
|
|
859
|
0
|
|
|
|
|
0
|
my $prop_id = _decode_byte($packet, \$offs); |
860
|
|
|
|
|
|
|
|
861
|
0
|
0
|
|
|
|
0
|
if ($prop_id == MQTT_SESSION_EXPIRY_INTERVAL) { |
|
|
0
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
862
|
|
|
|
|
|
|
# 3.14.2.2.2 Session Expiry Interval (long int) |
863
|
0
|
|
|
|
|
0
|
$prop{'session_expiry_interval'} = _decode_int_32($packet, \$offs); |
864
|
|
|
|
|
|
|
} |
865
|
|
|
|
|
|
|
elsif ($prop_id == MQTT_REASON_STRING) { |
866
|
|
|
|
|
|
|
# 3.14.2.2.3 Reason String (utf8 string) |
867
|
0
|
|
|
|
|
0
|
$prop{'reason_string'} = _decode_utf8_str($packet, \$offs); |
868
|
|
|
|
|
|
|
} |
869
|
|
|
|
|
|
|
elsif ($prop_id == MQTT_USER_PROPERTY) { |
870
|
|
|
|
|
|
|
# 3.14.2.2.4 User Property (utf8 string pair) |
871
|
0
|
|
|
|
|
0
|
my $key = _decode_utf8_str($packet, \$offs); |
872
|
0
|
|
|
|
|
0
|
my $val = _decode_utf8_str($packet, \$offs); |
873
|
0
|
|
|
|
|
0
|
$prop{$key} = $val; |
874
|
|
|
|
|
|
|
} |
875
|
|
|
|
|
|
|
elsif ($prop_id == MQTT_SERVER_REFERENCE) { |
876
|
|
|
|
|
|
|
# 3.14.2.2.5 Server Reference (utf8 string) |
877
|
0
|
|
|
|
|
0
|
$prop{'server_reference'} = _decode_utf8_str($packet, \$offs); |
878
|
|
|
|
|
|
|
} |
879
|
|
|
|
|
|
|
else { |
880
|
|
|
|
|
|
|
# Protocol error |
881
|
0
|
|
|
|
|
0
|
$self->_fatal("Received DISCONNECT with unknown property $prop_id"); |
882
|
|
|
|
|
|
|
} |
883
|
|
|
|
|
|
|
} |
884
|
|
|
|
|
|
|
|
885
|
0
|
|
|
|
|
0
|
$self->_reset_connection; |
886
|
|
|
|
|
|
|
|
887
|
0
|
|
|
|
|
0
|
my $disconn_cb = $self->{disconn_cb}; |
888
|
|
|
|
|
|
|
|
889
|
0
|
0
|
|
|
|
0
|
if ($disconn_cb) { |
890
|
0
|
|
|
|
|
0
|
$disconn_cb->(\%prop); |
891
|
|
|
|
|
|
|
} |
892
|
|
|
|
|
|
|
else { |
893
|
0
|
|
|
|
|
0
|
$self->_fatal("Disconnected from MQTT broker: $prop{reason}"); |
894
|
|
|
|
|
|
|
} |
895
|
|
|
|
|
|
|
} |
896
|
|
|
|
|
|
|
|
897
|
|
|
|
|
|
|
|
898
|
|
|
|
|
|
|
sub pingreq { |
899
|
0
|
|
|
0
|
0
|
0
|
my ($self) = @_; |
900
|
|
|
|
|
|
|
|
901
|
|
|
|
|
|
|
$self->{handle}->push_write( |
902
|
0
|
|
|
|
|
0
|
pack( "C C", |
903
|
|
|
|
|
|
|
MQTT_PINGREQ << 4, # 3.12.1 Packet type |
904
|
|
|
|
|
|
|
0, # 3.12.1 Remaining length |
905
|
|
|
|
|
|
|
) |
906
|
|
|
|
|
|
|
); |
907
|
|
|
|
|
|
|
} |
908
|
|
|
|
|
|
|
|
909
|
|
|
|
|
|
|
sub pingresp { |
910
|
0
|
|
|
0
|
0
|
0
|
my ($self) = @_; |
911
|
|
|
|
|
|
|
|
912
|
|
|
|
|
|
|
$self->{handle}->push_write( |
913
|
0
|
|
|
|
|
0
|
pack( "C C", |
914
|
|
|
|
|
|
|
MQTT_PINGRESP << 4, # 3.13.1 Packet type |
915
|
|
|
|
|
|
|
0, # 3.13.1 Remaining length |
916
|
|
|
|
|
|
|
) |
917
|
|
|
|
|
|
|
); |
918
|
|
|
|
|
|
|
} |
919
|
|
|
|
|
|
|
|
920
|
|
|
|
|
|
|
|
921
|
|
|
|
|
|
|
sub subscribe { |
922
|
19
|
|
|
19
|
1
|
2001545
|
my ($self, %args) = @_; |
923
|
|
|
|
|
|
|
|
924
|
19
|
|
|
|
|
74
|
my $topic = delete $args{'topic'}; |
925
|
19
|
|
|
|
|
50
|
my $topics = delete $args{'topics'}; |
926
|
19
|
|
|
|
|
45
|
my $subscr_cb = delete $args{'on_publish'}; |
927
|
19
|
|
|
|
|
37
|
my $suback_cb = delete $args{'on_suback'}; |
928
|
19
|
|
|
|
|
49
|
my $max_qos = delete $args{'maximum_qos'}; |
929
|
19
|
|
|
|
|
33
|
my $no_local = delete $args{'no_local'}; |
930
|
19
|
|
|
|
|
42
|
my $retain_asp = delete $args{'retain_as_published'}; |
931
|
19
|
|
|
|
|
66
|
my $retain_hdl = delete $args{'retain_handling'}; |
932
|
|
|
|
|
|
|
|
933
|
19
|
50
|
|
|
|
67
|
$topics = [] unless defined $topics; |
934
|
19
|
50
|
|
|
|
79
|
push (@$topics, $topic) if defined $topic; |
935
|
|
|
|
|
|
|
|
936
|
19
|
50
|
|
|
|
65
|
croak "Subscription topics were not specified" unless @$topics; |
937
|
19
|
50
|
|
|
|
56
|
croak "on_publish callback is required" unless $subscr_cb; |
938
|
|
|
|
|
|
|
|
939
|
19
|
|
|
|
|
59
|
foreach my $topic (@$topics) { |
940
|
19
|
50
|
|
|
|
77
|
croak "Undefined subscription topic" unless defined $topic; |
941
|
19
|
50
|
|
|
|
72
|
croak "Empty subscription topic" unless length $topic; |
942
|
|
|
|
|
|
|
} |
943
|
|
|
|
|
|
|
|
944
|
19
|
|
|
|
|
59
|
my $packet_id = $self->{packet_seq}++; |
945
|
19
|
50
|
|
|
|
60
|
$self->{packet_seq} = 1 if $packet_id == 0xFFFF; |
946
|
|
|
|
|
|
|
|
947
|
|
|
|
|
|
|
# Set callback for incomings PUBLISH |
948
|
19
|
|
|
|
|
43
|
my $subscr_id = $self->{subscr_seq}++; |
949
|
19
|
|
|
|
|
82
|
$self->{subscr_cb}->{$subscr_id} = $subscr_cb; |
950
|
|
|
|
|
|
|
|
951
|
|
|
|
|
|
|
# Parameters for expected SUBACK |
952
|
19
|
|
|
|
|
144
|
$self->{packet_cb}->{$packet_id} = { |
953
|
|
|
|
|
|
|
topics => [ @$topics ], # copy |
954
|
|
|
|
|
|
|
subscr_id => $subscr_id, |
955
|
|
|
|
|
|
|
suback_cb => $suback_cb, |
956
|
|
|
|
|
|
|
}; |
957
|
|
|
|
|
|
|
|
958
|
|
|
|
|
|
|
# 3.8.2.1.2 Subscription Identifier (variable len int) |
959
|
19
|
|
|
|
|
62
|
my $raw_prop = pack("C", MQTT_SUBSCRIPTION_IDENTIFIER) . |
960
|
|
|
|
|
|
|
_encode_var_int($subscr_id); |
961
|
|
|
|
|
|
|
|
962
|
|
|
|
|
|
|
# 3.8.2.1.3 User Property (utf8 string pair) |
963
|
19
|
|
|
|
|
83
|
foreach my $key (keys %args) { |
964
|
0
|
|
|
|
|
0
|
my $val = $args{$key}; |
965
|
0
|
0
|
|
|
|
0
|
next unless defined $val; |
966
|
0
|
|
|
|
|
0
|
utf8::encode( $key ); |
967
|
0
|
|
|
|
|
0
|
utf8::encode( $val ); |
968
|
0
|
|
|
|
|
0
|
$raw_prop .= pack("C n/a* n/a*", MQTT_USER_PROPERTY, $key, $val); |
969
|
|
|
|
|
|
|
} |
970
|
|
|
|
|
|
|
|
971
|
19
|
|
|
|
|
81
|
my $raw_mqtt = pack("n", $packet_id) . # 3.8.2 Packet identifier |
972
|
|
|
|
|
|
|
_encode_var_int(length $raw_prop) . # 3.8.2.1 Properties Length |
973
|
|
|
|
|
|
|
$raw_prop; # 3.8.2.1 Properties |
974
|
|
|
|
|
|
|
|
975
|
|
|
|
|
|
|
# 3.8.3.1 Subscription Options |
976
|
19
|
|
|
|
|
56
|
my $options = 0; |
977
|
19
|
100
|
|
|
|
59
|
$options |= ($max_qos & 0x03) if $max_qos; # Maximum QoS |
978
|
19
|
50
|
|
|
|
44
|
$options |= 0x04 if $no_local; # No Local |
979
|
19
|
50
|
|
|
|
38
|
$options |= 0x08 if $retain_asp; # Retain As Published |
980
|
19
|
50
|
|
|
|
40
|
$options |= ($retain_hdl & 0x03) << 4 if $retain_hdl; # Retain Handling |
981
|
|
|
|
|
|
|
|
982
|
|
|
|
|
|
|
# 3.8.3 Payload |
983
|
19
|
|
|
|
|
45
|
foreach my $topic (@$topics) { |
984
|
19
|
|
|
|
|
66
|
utf8::encode( $topic ); |
985
|
19
|
|
|
|
|
101
|
$raw_mqtt .= pack("n/a* C", $topic, $options); |
986
|
|
|
|
|
|
|
} |
987
|
|
|
|
|
|
|
|
988
|
|
|
|
|
|
|
$self->{handle}->push_write( |
989
|
19
|
|
|
|
|
65
|
pack("C", MQTT_SUBSCRIBE << 4 | 0x02) . # 3.8.1 Packet type |
990
|
|
|
|
|
|
|
_encode_var_int(length $raw_mqtt) . # 3.8.1 Packet length |
991
|
|
|
|
|
|
|
$raw_mqtt |
992
|
|
|
|
|
|
|
); |
993
|
|
|
|
|
|
|
|
994
|
19
|
|
|
|
|
8744
|
1; |
995
|
|
|
|
|
|
|
} |
996
|
|
|
|
|
|
|
|
997
|
|
|
|
|
|
|
sub _receive_suback { |
998
|
19
|
|
|
19
|
|
60
|
my ($self, $packet) = @_; |
999
|
|
|
|
|
|
|
|
1000
|
|
|
|
|
|
|
# 3.9.2 Packet id (short int) |
1001
|
19
|
|
|
|
|
35
|
my $offs = 0; |
1002
|
19
|
|
|
|
|
78
|
my $packet_id = _decode_int_16($packet, \$offs); |
1003
|
|
|
|
|
|
|
|
1004
|
|
|
|
|
|
|
# 3.9.2.1.1 Property Length (variable length int) |
1005
|
19
|
|
|
|
|
75
|
my $prop_len = _decode_var_int($packet, \$offs); |
1006
|
19
|
|
|
|
|
90
|
my $prop_end = $offs + $prop_len; |
1007
|
19
|
|
|
|
|
63
|
my %prop; |
1008
|
|
|
|
|
|
|
|
1009
|
19
|
|
|
|
|
82
|
while ($offs < $prop_end) { |
1010
|
|
|
|
|
|
|
|
1011
|
0
|
|
|
|
|
0
|
my $prop_id = _decode_byte($packet, \$offs); |
1012
|
|
|
|
|
|
|
|
1013
|
0
|
0
|
|
|
|
0
|
if ($prop_id == MQTT_REASON_STRING) { |
|
|
0
|
|
|
|
|
|
1014
|
|
|
|
|
|
|
# 3.9.2.1.2 Reason String (utf8 string) |
1015
|
0
|
|
|
|
|
0
|
$prop{'reason_string'} = _decode_utf8_str($packet, \$offs); |
1016
|
|
|
|
|
|
|
} |
1017
|
|
|
|
|
|
|
elsif ($prop_id == MQTT_USER_PROPERTY) { |
1018
|
|
|
|
|
|
|
# 3.9.2.1.3 User Property (utf8 string pair) |
1019
|
0
|
|
|
|
|
0
|
my $key = _decode_utf8_str($packet, \$offs); |
1020
|
0
|
|
|
|
|
0
|
my $val = _decode_utf8_str($packet, \$offs); |
1021
|
0
|
|
|
|
|
0
|
$prop{$key} = $val; |
1022
|
|
|
|
|
|
|
} |
1023
|
|
|
|
|
|
|
else { |
1024
|
|
|
|
|
|
|
# Protocol error |
1025
|
0
|
|
|
|
|
0
|
$self->_fatal("Received SUBACK with unexpected property $prop_id"); |
1026
|
|
|
|
|
|
|
} |
1027
|
|
|
|
|
|
|
} |
1028
|
|
|
|
|
|
|
|
1029
|
|
|
|
|
|
|
# 3.9.3 Payload |
1030
|
19
|
|
|
|
|
120
|
my @reason_codes = unpack("C*", substr($$packet, $offs)); |
1031
|
|
|
|
|
|
|
|
1032
|
19
|
|
|
|
|
79
|
my $packet_cb = delete $self->{packet_cb}->{$packet_id}; |
1033
|
19
|
50
|
|
|
|
74
|
$self->_fatal("Received unexpected SUBACK") unless $packet_cb; |
1034
|
|
|
|
|
|
|
|
1035
|
19
|
|
|
|
|
40
|
my $topics = $packet_cb->{topics}; |
1036
|
19
|
|
|
|
|
40
|
my $suback_cb = $packet_cb->{suback_cb}; |
1037
|
19
|
|
|
|
|
42
|
my $subscr_id = $packet_cb->{subscr_id}; |
1038
|
|
|
|
|
|
|
|
1039
|
19
|
|
|
|
|
33
|
my $success = 1; |
1040
|
19
|
|
|
|
|
54
|
my @properties; |
1041
|
|
|
|
|
|
|
|
1042
|
19
|
|
|
|
|
82
|
foreach my $code (@reason_codes) { |
1043
|
|
|
|
|
|
|
|
1044
|
19
|
|
|
|
|
49
|
my $topic = shift @$topics; |
1045
|
19
|
|
|
|
|
82
|
my $reason = $Subscribe_reason_code{$code}; |
1046
|
19
|
|
|
|
|
39
|
my $granted_qos; |
1047
|
|
|
|
|
|
|
|
1048
|
19
|
50
|
|
|
|
67
|
if ($code <= 2) { |
1049
|
|
|
|
|
|
|
# Success |
1050
|
19
|
|
|
|
|
65
|
$granted_qos = $code; |
1051
|
19
|
|
|
|
|
99
|
$self->{subscriptions}->{$topic} = $subscr_id; |
1052
|
|
|
|
|
|
|
} |
1053
|
|
|
|
|
|
|
else { |
1054
|
|
|
|
|
|
|
# Failure |
1055
|
0
|
|
|
|
|
0
|
$success = 0; |
1056
|
0
|
|
|
|
|
0
|
$granted_qos = undef; |
1057
|
0
|
0
|
|
|
|
0
|
unless ($suback_cb) { |
1058
|
0
|
|
|
|
|
0
|
$self->_fatal("Subscription to topic '$topic' failed: $reason"); |
1059
|
|
|
|
|
|
|
} |
1060
|
|
|
|
|
|
|
} |
1061
|
|
|
|
|
|
|
|
1062
|
19
|
50
|
|
|
|
55
|
$DEBUG && warn "Subscribed to: $topic\n"; |
1063
|
|
|
|
|
|
|
|
1064
|
19
|
|
|
|
|
189
|
push @properties, { |
1065
|
|
|
|
|
|
|
topic => $topic, |
1066
|
|
|
|
|
|
|
reason_code => $code, |
1067
|
|
|
|
|
|
|
granted_qos => $granted_qos, |
1068
|
|
|
|
|
|
|
reason => $reason, |
1069
|
|
|
|
|
|
|
%prop |
1070
|
|
|
|
|
|
|
}; |
1071
|
|
|
|
|
|
|
} |
1072
|
|
|
|
|
|
|
|
1073
|
19
|
100
|
|
|
|
135
|
$suback_cb->($success, @properties) if $suback_cb; |
1074
|
|
|
|
|
|
|
} |
1075
|
|
|
|
|
|
|
|
1076
|
|
|
|
|
|
|
|
1077
|
|
|
|
|
|
|
sub unsubscribe { |
1078
|
0
|
|
|
0
|
1
|
0
|
my ($self, %args) = @_; |
1079
|
|
|
|
|
|
|
|
1080
|
0
|
|
|
|
|
0
|
my $topic = delete $args{'topic'}; |
1081
|
0
|
|
|
|
|
0
|
my $topics = delete $args{'topics'}; |
1082
|
0
|
|
|
|
|
0
|
my $unsuback_cb = delete $args{'on_unsuback'}; |
1083
|
|
|
|
|
|
|
|
1084
|
0
|
0
|
|
|
|
0
|
$topics = [] unless defined $topics; |
1085
|
0
|
0
|
|
|
|
0
|
push (@$topics, $topic) if defined $topic; |
1086
|
|
|
|
|
|
|
|
1087
|
0
|
0
|
|
|
|
0
|
croak "Unsubscription topics were not specified" unless @$topics; |
1088
|
0
|
0
|
|
|
|
0
|
croak "on_unsuback callback is required" unless $unsuback_cb; |
1089
|
|
|
|
|
|
|
|
1090
|
0
|
|
|
|
|
0
|
foreach my $topic (@$topics) { |
1091
|
0
|
0
|
|
|
|
0
|
croak "Undefined unsubscription topic" unless defined $topic; |
1092
|
0
|
0
|
|
|
|
0
|
croak "Empty unsubscription topic" unless length $topic; |
1093
|
|
|
|
|
|
|
} |
1094
|
|
|
|
|
|
|
|
1095
|
0
|
|
|
|
|
0
|
my $packet_id = $self->{packet_seq}++; |
1096
|
0
|
0
|
|
|
|
0
|
$self->{packet_seq} = 1 if $packet_id == 0xFFFF; |
1097
|
|
|
|
|
|
|
|
1098
|
|
|
|
|
|
|
# Set callback for UNSUBACK |
1099
|
0
|
|
|
|
|
0
|
$self->{packet_cb}->{$packet_id} = { |
1100
|
|
|
|
|
|
|
topics => [ @$topics ], # copy |
1101
|
|
|
|
|
|
|
unsuback_cb => $unsuback_cb, |
1102
|
|
|
|
|
|
|
}; |
1103
|
|
|
|
|
|
|
|
1104
|
|
|
|
|
|
|
# 3.10.2.1.2 User Property (utf8 string pair) |
1105
|
0
|
|
|
|
|
0
|
my $raw_prop = ''; |
1106
|
0
|
|
|
|
|
0
|
foreach my $key (keys %args) { |
1107
|
0
|
|
|
|
|
0
|
my $val = $args{$key}; |
1108
|
0
|
0
|
|
|
|
0
|
next unless defined $val; |
1109
|
0
|
|
|
|
|
0
|
utf8::encode( $key ); |
1110
|
0
|
|
|
|
|
0
|
utf8::encode( $val ); |
1111
|
0
|
|
|
|
|
0
|
$raw_prop .= pack("C n/a* n/a*", MQTT_USER_PROPERTY, $key, $val); |
1112
|
|
|
|
|
|
|
} |
1113
|
|
|
|
|
|
|
|
1114
|
0
|
|
|
|
|
0
|
my $raw_mqtt = pack("n", $packet_id) . # 3.10.2 Packet identifier |
1115
|
|
|
|
|
|
|
_encode_var_int(length $raw_prop) . # 3.10.2.1 Property Length |
1116
|
|
|
|
|
|
|
$raw_prop; # 3.10.2.1 Properties |
1117
|
|
|
|
|
|
|
|
1118
|
|
|
|
|
|
|
# 3.10.3 Payload |
1119
|
0
|
|
|
|
|
0
|
foreach my $topic (@$topics) { |
1120
|
0
|
|
|
|
|
0
|
utf8::encode($topic); |
1121
|
0
|
|
|
|
|
0
|
$raw_mqtt .= pack("n/a*", $topic); |
1122
|
|
|
|
|
|
|
} |
1123
|
|
|
|
|
|
|
|
1124
|
|
|
|
|
|
|
$self->{handle}->push_write( |
1125
|
0
|
|
|
|
|
0
|
pack("C", MQTT_UNSUBSCRIBE << 4 | 0x02) . # 3.10.1 Packet type |
1126
|
|
|
|
|
|
|
_encode_var_int(length $raw_mqtt) . # 3.10.1 Packet length |
1127
|
|
|
|
|
|
|
$raw_mqtt |
1128
|
|
|
|
|
|
|
); |
1129
|
|
|
|
|
|
|
|
1130
|
0
|
|
|
|
|
0
|
1; |
1131
|
|
|
|
|
|
|
} |
1132
|
|
|
|
|
|
|
|
1133
|
|
|
|
|
|
|
sub _receive_unsuback { |
1134
|
0
|
|
|
0
|
|
0
|
my ($self, $packet) = @_; |
1135
|
0
|
|
|
|
|
0
|
weaken($self); |
1136
|
|
|
|
|
|
|
|
1137
|
|
|
|
|
|
|
# 3.11.2 Packet id (short int) |
1138
|
0
|
|
|
|
|
0
|
my $offs = 0; |
1139
|
0
|
|
|
|
|
0
|
my $packet_id = _decode_int_16($packet, \$offs); |
1140
|
|
|
|
|
|
|
|
1141
|
|
|
|
|
|
|
# 3.11.2.1.1 Property Length (variable length int) |
1142
|
0
|
|
|
|
|
0
|
my $prop_len = _decode_var_int($packet, \$offs); |
1143
|
0
|
|
|
|
|
0
|
my $prop_end = $offs + $prop_len; |
1144
|
0
|
|
|
|
|
0
|
my %prop; |
1145
|
|
|
|
|
|
|
|
1146
|
0
|
|
|
|
|
0
|
while ($offs < $prop_end) { |
1147
|
|
|
|
|
|
|
|
1148
|
0
|
|
|
|
|
0
|
my $prop_id = _decode_byte($packet, \$offs); |
1149
|
|
|
|
|
|
|
|
1150
|
0
|
0
|
|
|
|
0
|
if ($prop_id == MQTT_REASON_STRING) { |
|
|
0
|
|
|
|
|
|
1151
|
|
|
|
|
|
|
# 3.11.2.1.2 Reason String (utf8 string) |
1152
|
0
|
|
|
|
|
0
|
$prop{'reason_string'} = _decode_utf8_str($packet, \$offs); |
1153
|
|
|
|
|
|
|
} |
1154
|
|
|
|
|
|
|
elsif ($prop_id == MQTT_USER_PROPERTY) { |
1155
|
|
|
|
|
|
|
# 3.11.2.1.3 User Property (utf8 string pair) |
1156
|
0
|
|
|
|
|
0
|
my $key = _decode_utf8_str($packet, \$offs); |
1157
|
0
|
|
|
|
|
0
|
my $val = _decode_utf8_str($packet, \$offs); |
1158
|
0
|
|
|
|
|
0
|
$prop{$key} = $val; |
1159
|
|
|
|
|
|
|
} |
1160
|
|
|
|
|
|
|
else { |
1161
|
|
|
|
|
|
|
# Protocol error |
1162
|
0
|
|
|
|
|
0
|
$self->_fatal("Received UNSUBACK with unexpected property $prop_id"); |
1163
|
|
|
|
|
|
|
} |
1164
|
|
|
|
|
|
|
} |
1165
|
|
|
|
|
|
|
|
1166
|
|
|
|
|
|
|
# 3.11.3 Payload |
1167
|
0
|
|
|
|
|
0
|
my @reason_codes = unpack("C*", substr($$packet, $offs)); |
1168
|
|
|
|
|
|
|
|
1169
|
0
|
|
|
|
|
0
|
my $packet_cb = delete $self->{packet_cb}->{$packet_id}; |
1170
|
0
|
0
|
|
|
|
0
|
$self->_fatal("Received unexpected UNSUBACK") unless $packet_cb; |
1171
|
|
|
|
|
|
|
|
1172
|
0
|
|
|
|
|
0
|
my $topics = $packet_cb->{topics}; |
1173
|
0
|
|
|
|
|
0
|
my $unsuback_cb = $packet_cb->{unsuback_cb}; |
1174
|
|
|
|
|
|
|
|
1175
|
0
|
|
|
|
|
0
|
my $success = 1; |
1176
|
0
|
|
|
|
|
0
|
my @properties; |
1177
|
|
|
|
|
|
|
|
1178
|
0
|
|
|
|
|
0
|
foreach my $code (@reason_codes) { |
1179
|
|
|
|
|
|
|
|
1180
|
0
|
|
|
|
|
0
|
my $topic = shift @$topics; |
1181
|
0
|
|
|
|
|
0
|
my $reason = $Reason_code{$code}; |
1182
|
|
|
|
|
|
|
|
1183
|
0
|
0
|
|
|
|
0
|
if ($code == 0) { |
1184
|
|
|
|
|
|
|
# Success |
1185
|
0
|
|
|
|
|
0
|
my $subs = $self->{subscriptions}; |
1186
|
0
|
|
|
|
|
0
|
my $subscr_id = delete $subs->{$topic}; |
1187
|
0
|
0
|
|
|
|
0
|
if ($subscr_id) { |
1188
|
|
|
|
|
|
|
# Free on_publish callback if not used by another subscription |
1189
|
0
|
|
|
|
|
0
|
my @still_used = grep { $subs->{$_} == $subscr_id } keys %$subs; |
|
0
|
|
|
|
|
0
|
|
1190
|
0
|
0
|
|
|
|
0
|
unless (@still_used) { |
1191
|
|
|
|
|
|
|
# But not right now, as broker may send some messages *after* unsubscription |
1192
|
|
|
|
|
|
|
$self->{_timers}->{"unsub-$subscr_id"} = AnyEvent->timer( after => 60, cb => sub { |
1193
|
0
|
|
|
0
|
|
0
|
delete $self->{_timers}->{"unsub-$subscr_id"}; |
1194
|
0
|
|
|
|
|
0
|
delete $self->{subscr_cb}->{$subscr_id}; |
1195
|
0
|
|
|
|
|
0
|
}); |
1196
|
|
|
|
|
|
|
} |
1197
|
|
|
|
|
|
|
} |
1198
|
|
|
|
|
|
|
} |
1199
|
|
|
|
|
|
|
else { |
1200
|
|
|
|
|
|
|
# Failure |
1201
|
0
|
|
|
|
|
0
|
$success = 0; |
1202
|
0
|
0
|
|
|
|
0
|
unless ($unsuback_cb) { |
1203
|
0
|
|
|
|
|
0
|
$self->_fatal("Unsubscription to topic '$topic' failed: $reason"); |
1204
|
|
|
|
|
|
|
} |
1205
|
|
|
|
|
|
|
} |
1206
|
|
|
|
|
|
|
|
1207
|
0
|
|
|
|
|
0
|
push @properties, { |
1208
|
|
|
|
|
|
|
topic => $topic, |
1209
|
|
|
|
|
|
|
reason_code => $code, |
1210
|
|
|
|
|
|
|
reason => $reason, |
1211
|
|
|
|
|
|
|
%prop |
1212
|
|
|
|
|
|
|
}; |
1213
|
|
|
|
|
|
|
} |
1214
|
|
|
|
|
|
|
|
1215
|
0
|
0
|
|
|
|
0
|
$unsuback_cb->($success, @properties) if $unsuback_cb; |
1216
|
|
|
|
|
|
|
} |
1217
|
|
|
|
|
|
|
|
1218
|
|
|
|
|
|
|
our $AE_WAITING; |
1219
|
|
|
|
|
|
|
|
1220
|
|
|
|
|
|
|
sub publish { |
1221
|
90
|
|
|
90
|
1
|
28009929
|
my ($self, %args) = @_; |
1222
|
|
|
|
|
|
|
|
1223
|
90
|
|
|
|
|
420
|
my $topic = delete $args{'topic'}; |
1224
|
90
|
|
|
|
|
258
|
my $payload = delete $args{'payload'}; |
1225
|
90
|
|
|
|
|
236
|
my $qos = delete $args{'qos'}; |
1226
|
90
|
|
|
|
|
236
|
my $dup = delete $args{'duplicate'}; |
1227
|
90
|
|
|
|
|
221
|
my $retain = delete $args{'retain'}; |
1228
|
90
|
|
|
|
|
280
|
my $on_puback = delete $args{'on_puback'}; |
1229
|
90
|
|
|
|
|
212
|
my $buffer_id = delete $args{'buffer_id'}; |
1230
|
|
|
|
|
|
|
|
1231
|
90
|
50
|
|
|
|
344
|
croak "Message topic was not specified" unless defined $topic; |
1232
|
|
|
|
|
|
|
|
1233
|
90
|
50
|
|
|
|
293
|
$DEBUG && warn "Sent message to: $topic\n"; |
1234
|
|
|
|
|
|
|
|
1235
|
90
|
50
|
|
|
|
282
|
$payload = '' unless defined $payload; |
1236
|
90
|
100
|
|
|
|
427
|
my $payload_ref = (ref $payload eq 'SCALAR') ? $payload : \$payload; |
1237
|
|
|
|
|
|
|
|
1238
|
|
|
|
|
|
|
# 3.3.2.3.4 Topic Alias |
1239
|
90
|
|
|
|
|
183
|
my $topic_alias; |
1240
|
90
|
50
|
|
|
|
346
|
if ($self->{use_alias}) { |
1241
|
0
|
|
|
|
|
0
|
$topic_alias = $self->{client_alias}->{$topic}; |
1242
|
0
|
0
|
|
|
|
0
|
if ($topic_alias) { |
|
|
0
|
|
|
|
|
|
1243
|
|
|
|
|
|
|
# Send topic alias only |
1244
|
0
|
|
|
|
|
0
|
$topic = ''; |
1245
|
|
|
|
|
|
|
} |
1246
|
|
|
|
|
|
|
elsif ($self->{server_prop}->{'topic_alias_maximum'}) { |
1247
|
|
|
|
|
|
|
#TODO: Honor maximum |
1248
|
0
|
|
|
|
|
0
|
$topic_alias = $self->{alias_seq}++; |
1249
|
0
|
|
|
|
|
0
|
$self->{client_alias}->{$topic} = $topic_alias; |
1250
|
|
|
|
|
|
|
} |
1251
|
|
|
|
|
|
|
} |
1252
|
|
|
|
|
|
|
|
1253
|
|
|
|
|
|
|
# 3.3.1.2 QoS level |
1254
|
90
|
|
|
|
|
189
|
my $flags = 0; |
1255
|
90
|
100
|
|
|
|
378
|
$flags |= $qos << 1 if $qos; |
1256
|
90
|
50
|
|
|
|
269
|
$flags |= 0x04 if $dup; |
1257
|
90
|
50
|
|
|
|
270
|
$flags |= 0x01 if $retain; |
1258
|
|
|
|
|
|
|
|
1259
|
90
|
|
|
|
|
173
|
my $packet_id; |
1260
|
90
|
100
|
|
|
|
257
|
if ($qos) { |
1261
|
70
|
|
|
|
|
207
|
$packet_id = $self->{packet_seq}++; |
1262
|
70
|
50
|
|
|
|
232
|
$self->{packet_seq} = 1 if $packet_id == 0xFFFF; |
1263
|
|
|
|
|
|
|
} |
1264
|
|
|
|
|
|
|
|
1265
|
90
|
|
|
|
|
251
|
my $raw_prop = ''; |
1266
|
|
|
|
|
|
|
|
1267
|
90
|
100
|
|
|
|
404
|
if (utf8::is_utf8( $$payload_ref )) { |
1268
|
|
|
|
|
|
|
# 3.3.2.3.2 Payload Format Indicator (byte) |
1269
|
1
|
|
|
|
|
5
|
$raw_prop .= pack("C C", MQTT_PAYLOAD_FORMAT_INDICATOR, 0x01); |
1270
|
1
|
|
|
|
|
6
|
utf8::encode( $$payload_ref ); |
1271
|
|
|
|
|
|
|
} |
1272
|
|
|
|
|
|
|
|
1273
|
90
|
50
|
|
|
|
282
|
if (exists $args{'message_expiry_interval'}) { |
1274
|
|
|
|
|
|
|
# 3.3.2.3.3 Message Expiry Interval (long int) |
1275
|
0
|
|
|
|
|
0
|
$raw_prop .= pack("C N", MQTT_MESSAGE_EXPIRY_INTERVAL, delete $args{'message_expiry_interval'}); |
1276
|
|
|
|
|
|
|
} |
1277
|
|
|
|
|
|
|
|
1278
|
90
|
50
|
|
|
|
245
|
if ($topic_alias) { |
1279
|
|
|
|
|
|
|
# 3.3.2.3.4 Topic Alias (short int) |
1280
|
0
|
|
|
|
|
0
|
$raw_prop .= pack("C n", MQTT_TOPIC_ALIAS, $topic_alias); |
1281
|
|
|
|
|
|
|
} |
1282
|
|
|
|
|
|
|
|
1283
|
90
|
100
|
|
|
|
288
|
if (exists $args{'response_topic'}) { |
1284
|
|
|
|
|
|
|
# 3.3.2.3.5 Response Topic (utf8 string) |
1285
|
68
|
|
|
|
|
343
|
utf8::encode( $args{'response_topic'} ); |
1286
|
68
|
|
|
|
|
553
|
$raw_prop .= pack("C n/a*", MQTT_RESPONSE_TOPIC, delete $args{'response_topic'}); |
1287
|
|
|
|
|
|
|
} |
1288
|
|
|
|
|
|
|
|
1289
|
90
|
50
|
|
|
|
326
|
if (exists $args{'correlation_data'}) { |
1290
|
|
|
|
|
|
|
# 3.3.2.3.6 Correlation Data (binary data) |
1291
|
0
|
|
|
|
|
0
|
$raw_prop .= pack("C n/a*", MQTT_CORRELATION_DATA, delete $args{'correlation_data'}); |
1292
|
|
|
|
|
|
|
} |
1293
|
|
|
|
|
|
|
|
1294
|
|
|
|
|
|
|
# if (exists $args{'subscription_identifier'}) { |
1295
|
|
|
|
|
|
|
# # 3.3.2.3.8 Subscription Identifier (variable int) |
1296
|
|
|
|
|
|
|
# my $id = delete $args{'subscription_identifier'}; |
1297
|
|
|
|
|
|
|
# $raw_prop .= pack("C", MQTT_SUBSCRIPTION_IDENTIFIER) . _encode_var_int($id); |
1298
|
|
|
|
|
|
|
# } |
1299
|
|
|
|
|
|
|
|
1300
|
90
|
50
|
|
|
|
277
|
if (exists $args{'content_type'}) { |
1301
|
|
|
|
|
|
|
# 3.3.2.3.9 Content Type (utf8 string) |
1302
|
0
|
|
|
|
|
0
|
utf8::encode( $args{'content_type'} ); |
1303
|
0
|
|
|
|
|
0
|
$raw_prop .= pack("C n/a*", MQTT_CONTENT_TYPE, delete $args{'content_type'}); |
1304
|
|
|
|
|
|
|
} |
1305
|
|
|
|
|
|
|
|
1306
|
90
|
|
|
|
|
435
|
foreach my $key (keys %args) { |
1307
|
|
|
|
|
|
|
# 3.3.2.3.7 User Property (utf8 string pair) |
1308
|
20
|
|
|
|
|
99
|
my $val = $args{$key}; |
1309
|
20
|
50
|
|
|
|
77
|
next unless defined $val; |
1310
|
20
|
|
|
|
|
81
|
utf8::encode( $key ); |
1311
|
20
|
|
|
|
|
61
|
utf8::encode( $val ); |
1312
|
20
|
|
|
|
|
143
|
$raw_prop .= pack("C n/a* n/a*", MQTT_USER_PROPERTY, $key, $val); |
1313
|
|
|
|
|
|
|
} |
1314
|
|
|
|
|
|
|
|
1315
|
|
|
|
|
|
|
# 3.3.2.1 Topic name (utf8 string) |
1316
|
90
|
|
|
|
|
322
|
utf8::encode( $topic ); |
1317
|
90
|
|
|
|
|
434
|
my $raw_mqtt = pack("n/a*", $topic); |
1318
|
|
|
|
|
|
|
|
1319
|
|
|
|
|
|
|
# 3.3.2.2 Packet identifier (short int) |
1320
|
90
|
100
|
|
|
|
340
|
$raw_mqtt .= pack("n", $packet_id) if $packet_id; |
1321
|
|
|
|
|
|
|
|
1322
|
|
|
|
|
|
|
# 3.3.2.3 Properties |
1323
|
90
|
|
|
|
|
480
|
$raw_mqtt .= _encode_var_int(length $raw_prop); |
1324
|
90
|
|
|
|
|
266
|
$raw_mqtt .= $raw_prop; |
1325
|
|
|
|
|
|
|
|
1326
|
|
|
|
|
|
|
# 3.3.3 Payload |
1327
|
90
|
|
|
|
|
13306
|
$raw_mqtt .= $$payload_ref; |
1328
|
|
|
|
|
|
|
|
1329
|
90
|
|
|
|
|
404
|
$raw_mqtt = pack("C", MQTT_PUBLISH << 4 | $flags) . # 3.3.1 Packet type |
1330
|
|
|
|
|
|
|
_encode_var_int(length $raw_mqtt) . # 3.3.1 Packet length |
1331
|
|
|
|
|
|
|
$raw_mqtt; |
1332
|
|
|
|
|
|
|
|
1333
|
90
|
50
|
66
|
|
|
691
|
if ($qos && $on_puback) { |
1334
|
|
|
|
|
|
|
# Set PUBACK callback |
1335
|
0
|
|
|
|
|
0
|
$self->{packet_cb}->{$packet_id} = $on_puback; |
1336
|
|
|
|
|
|
|
} |
1337
|
|
|
|
|
|
|
|
1338
|
90
|
50
|
|
|
|
288
|
if ($buffer_id) { |
1339
|
|
|
|
|
|
|
# Do not send right now, wait until flush_buffer |
1340
|
0
|
|
0
|
|
|
0
|
my $buffer = $self->{buffers}->{$buffer_id} ||= {}; |
1341
|
0
|
|
|
|
|
0
|
$buffer->{raw_mqtt} .= $raw_mqtt; |
1342
|
0
|
0
|
|
|
|
0
|
$buffer->{packets}->{$packet_id} = 1 if $packet_id; |
1343
|
0
|
|
|
|
|
0
|
return 1; |
1344
|
|
|
|
|
|
|
} |
1345
|
|
|
|
|
|
|
|
1346
|
90
|
|
|
|
|
890
|
$self->{handle}->push_write( $raw_mqtt ); |
1347
|
|
|
|
|
|
|
|
1348
|
90
|
100
|
66
|
|
|
59791
|
if (defined $self->{handle}->{wbuf} && length $self->{handle}->{wbuf} > 0) { |
1349
|
|
|
|
|
|
|
# push_write could not send all data to the handle because the kernel |
1350
|
|
|
|
|
|
|
# write buffer is full. The size of kernel write bufer (which can be |
1351
|
|
|
|
|
|
|
# queried with 'sysctl net.ipv4.tcp_wmem') is choosed by the kernel |
1352
|
|
|
|
|
|
|
# based on available memory, and is 4MB in known production servers. |
1353
|
|
|
|
|
|
|
# This will happen after sending more that 4MB of data very quickly. |
1354
|
|
|
|
|
|
|
# As client may be syncronous, wait until entire message is sent. |
1355
|
|
|
|
|
|
|
|
1356
|
|
|
|
|
|
|
# Make AnyEvent allow one level of recursive condvar blocking |
1357
|
1
|
50
|
|
|
|
8
|
$AE_WAITING && Carp::confess "Recursive condvar blocking wait attempted"; |
1358
|
1
|
|
|
|
|
4
|
local $AE_WAITING = 1; |
1359
|
1
|
|
|
|
|
4
|
local $AnyEvent::CondVar::Base::WAITING = 0; |
1360
|
|
|
|
|
|
|
|
1361
|
1
|
|
|
|
|
74
|
my $flushed = AnyEvent->condvar; |
1362
|
1
|
|
|
|
|
17
|
$self->{handle}->on_drain( $flushed ); |
1363
|
1
|
|
|
|
|
23
|
$flushed->recv; |
1364
|
1
|
|
|
|
|
425
|
$self->{handle}->on_drain(); # clear |
1365
|
|
|
|
|
|
|
} |
1366
|
|
|
|
|
|
|
} |
1367
|
|
|
|
|
|
|
|
1368
|
|
|
|
|
|
|
sub _receive_publish { |
1369
|
84
|
|
|
84
|
|
427
|
my ($self, $packet, $flags) = @_; |
1370
|
|
|
|
|
|
|
|
1371
|
|
|
|
|
|
|
# 3.3.2.1 Topic Name (utf8 str) |
1372
|
84
|
|
|
|
|
566
|
my $topic = unpack("n/a", $$packet); |
1373
|
84
|
|
|
|
|
363
|
my $offs = 2 + length $topic; |
1374
|
84
|
|
|
|
|
408
|
utf8::decode($topic); |
1375
|
|
|
|
|
|
|
|
1376
|
84
|
50
|
|
|
|
333
|
$DEBUG && warn "Got message from: $topic\n"; |
1377
|
|
|
|
|
|
|
|
1378
|
84
|
|
|
|
|
843
|
my %prop = ( |
1379
|
|
|
|
|
|
|
'topic' => $topic, |
1380
|
|
|
|
|
|
|
'qos' => ($flags & 0x6) >> 1, |
1381
|
|
|
|
|
|
|
'dup' => ($flags & 0x8) >> 3, |
1382
|
|
|
|
|
|
|
); |
1383
|
|
|
|
|
|
|
|
1384
|
|
|
|
|
|
|
# 3.3.2.2 Packet Identifier (short int) |
1385
|
84
|
100
|
|
|
|
434
|
if ($prop{'qos'} > 0) { |
1386
|
3
|
|
|
|
|
21
|
$prop{'packet_id'} = unpack("n", substr($$packet, $offs, 2)); |
1387
|
3
|
|
|
|
|
15
|
$offs += 2; |
1388
|
|
|
|
|
|
|
} |
1389
|
|
|
|
|
|
|
|
1390
|
|
|
|
|
|
|
# 3.3.2.3.1 Properties Length (variable length int) |
1391
|
84
|
|
|
|
|
329
|
my $prop_len = _decode_var_int($packet, \$offs); |
1392
|
84
|
|
|
|
|
297
|
my $prop_end = $offs + $prop_len; |
1393
|
|
|
|
|
|
|
|
1394
|
84
|
|
|
|
|
252
|
my @subscr_ids; |
1395
|
|
|
|
|
|
|
my $prop_id; |
1396
|
|
|
|
|
|
|
|
1397
|
84
|
|
|
|
|
336
|
while ($offs < $prop_end) { |
1398
|
|
|
|
|
|
|
|
1399
|
173
|
|
|
|
|
685
|
$prop_id = unpack("C", substr($$packet, $offs, 1)); |
1400
|
173
|
|
|
|
|
446
|
$offs += 1; |
1401
|
|
|
|
|
|
|
|
1402
|
173
|
100
|
|
|
|
1276
|
if ($prop_id == MQTT_PAYLOAD_FORMAT_INDICATOR) { |
|
|
50
|
|
|
|
|
|
|
|
50
|
|
|
|
|
|
|
|
100
|
|
|
|
|
|
|
|
50
|
|
|
|
|
|
|
|
100
|
|
|
|
|
|
|
|
50
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
1403
|
|
|
|
|
|
|
# 3.3.2.3.2 Payload Format Indicator (byte) |
1404
|
1
|
|
|
|
|
8
|
$prop{'payload_format'} = unpack("C", substr($$packet, $offs, 1)); |
1405
|
1
|
|
|
|
|
6
|
$offs += 1; |
1406
|
|
|
|
|
|
|
} |
1407
|
|
|
|
|
|
|
elsif ($prop_id == MQTT_MESSAGE_EXPIRY_INTERVAL) { |
1408
|
|
|
|
|
|
|
# 3.3.2.3.3 Message Expiry Interval (long int) |
1409
|
0
|
|
|
|
|
0
|
$prop{'message_expiry_interval'} = unpack("N", substr($$packet, $offs, 4)); |
1410
|
0
|
|
|
|
|
0
|
$offs += 4; |
1411
|
|
|
|
|
|
|
} |
1412
|
|
|
|
|
|
|
elsif ($prop_id == MQTT_TOPIC_ALIAS) { |
1413
|
|
|
|
|
|
|
# 3.3.2.3.4 Topic Alias (short int) |
1414
|
0
|
|
|
|
|
0
|
my $alias = unpack("n", substr($$packet, $offs, 2)); |
1415
|
0
|
|
|
|
|
0
|
$offs += 2; |
1416
|
0
|
0
|
|
|
|
0
|
if (length $topic) { |
1417
|
0
|
|
|
|
|
0
|
$self->{server_alias}->{$alias} = $topic; |
1418
|
|
|
|
|
|
|
} |
1419
|
|
|
|
|
|
|
else { |
1420
|
0
|
|
|
|
|
0
|
$prop{'topic'} = $self->{server_alias}->{$alias}; |
1421
|
|
|
|
|
|
|
} |
1422
|
|
|
|
|
|
|
} |
1423
|
|
|
|
|
|
|
elsif ($prop_id == MQTT_RESPONSE_TOPIC) { |
1424
|
|
|
|
|
|
|
# 3.3.2.3.5 Response Topic (utf8 string) |
1425
|
1
|
|
|
|
|
8
|
my $resp_topic = unpack("n/a", substr($$packet, $offs)); |
1426
|
1
|
|
|
|
|
6
|
$offs += 2 + length $resp_topic; |
1427
|
1
|
|
|
|
|
6
|
utf8::decode( $resp_topic ); |
1428
|
1
|
|
|
|
|
6
|
$prop{'response_topic'} = $resp_topic; |
1429
|
|
|
|
|
|
|
} |
1430
|
|
|
|
|
|
|
elsif ($prop_id == MQTT_CORRELATION_DATA) { |
1431
|
|
|
|
|
|
|
# 3.3.2.3.6 Correlation Data (binary data) |
1432
|
0
|
|
|
|
|
0
|
$prop{'correlation_data'} = unpack("n/a", substr($$packet, $offs)); |
1433
|
0
|
|
|
|
|
0
|
$offs += 2 + length $prop{'correlation_data'}; |
1434
|
|
|
|
|
|
|
} |
1435
|
|
|
|
|
|
|
elsif ($prop_id == MQTT_USER_PROPERTY) { |
1436
|
|
|
|
|
|
|
# 3.3.2.3.7 User Property (utf8 string pair) |
1437
|
87
|
|
|
|
|
16621
|
my ($key, $val) = unpack("n/a n/a", substr($$packet, $offs)); |
1438
|
87
|
|
|
|
|
449
|
$offs += 4 + length($key) + length($val); |
1439
|
87
|
|
|
|
|
359
|
utf8::decode( $key ); |
1440
|
87
|
|
|
|
|
291
|
utf8::decode( $val ); |
1441
|
87
|
|
|
|
|
652
|
$prop{$key} = $val; |
1442
|
|
|
|
|
|
|
} |
1443
|
|
|
|
|
|
|
elsif ($prop_id == MQTT_SUBSCRIPTION_IDENTIFIER) { |
1444
|
|
|
|
|
|
|
# 3.3.2.3.8 Subscription Identifier (variable int) |
1445
|
84
|
|
|
|
|
282
|
push @subscr_ids, _decode_var_int($packet, \$offs); |
1446
|
|
|
|
|
|
|
} |
1447
|
|
|
|
|
|
|
elsif ($prop_id == MQTT_CONTENT_TYPE) { |
1448
|
|
|
|
|
|
|
# 3.3.2.3.9 Content Type (utf8 string) |
1449
|
0
|
|
|
|
|
0
|
my $content_type = unpack("n/a", substr($$packet, $offs)); |
1450
|
0
|
|
|
|
|
0
|
$offs += 2 + length $content_type; |
1451
|
0
|
|
|
|
|
0
|
utf8::decode( $content_type ); |
1452
|
0
|
|
|
|
|
0
|
$prop{'content_type'} = $content_type; |
1453
|
|
|
|
|
|
|
} |
1454
|
|
|
|
|
|
|
else { |
1455
|
|
|
|
|
|
|
# Protocol error |
1456
|
0
|
|
|
|
|
0
|
$self->_fatal("Received PUBLISH with unknown property $prop_id"); |
1457
|
|
|
|
|
|
|
} |
1458
|
|
|
|
|
|
|
} |
1459
|
|
|
|
|
|
|
|
1460
|
|
|
|
|
|
|
# Trim variable header from packet, the remaining is the payload |
1461
|
84
|
|
|
|
|
378
|
substr($$packet, 0, $prop_end, ''); |
1462
|
|
|
|
|
|
|
|
1463
|
84
|
100
|
|
|
|
353
|
if ($prop{'payload_format'}) { |
1464
|
|
|
|
|
|
|
# Payload is UTF-8 Encoded Character Data |
1465
|
1
|
|
|
|
|
7
|
utf8::decode( $$packet ); |
1466
|
|
|
|
|
|
|
} |
1467
|
|
|
|
|
|
|
|
1468
|
84
|
|
|
|
|
244
|
foreach (@subscr_ids) { |
1469
|
|
|
|
|
|
|
# Execute subscriptions callbacks |
1470
|
|
|
|
|
|
|
|
1471
|
84
|
|
|
|
|
708
|
$self->{subscr_cb}->{$_}->($packet, \%prop); |
1472
|
|
|
|
|
|
|
} |
1473
|
|
|
|
|
|
|
} |
1474
|
|
|
|
|
|
|
|
1475
|
|
|
|
|
|
|
|
1476
|
|
|
|
|
|
|
sub puback { |
1477
|
3
|
|
|
3
|
1
|
4000254
|
my ($self, %args) = @_; |
1478
|
|
|
|
|
|
|
|
1479
|
3
|
50
|
|
|
|
28
|
croak "Missing packet_id" unless $args{'packet_id'}; |
1480
|
|
|
|
|
|
|
|
1481
|
|
|
|
|
|
|
my $raw_mqtt = pack( |
1482
|
|
|
|
|
|
|
"C C n C", |
1483
|
|
|
|
|
|
|
MQTT_PUBACK << 4, # 3.4.1 Packet type |
1484
|
|
|
|
|
|
|
3, # 3.4.1 Remaining length |
1485
|
|
|
|
|
|
|
$args{'packet_id'}, # 3.4.2 Packet identifier |
1486
|
3
|
|
50
|
|
|
59
|
$args{'reason_code'} || 0, # 3.4.2.1 Reason code |
1487
|
|
|
|
|
|
|
); |
1488
|
|
|
|
|
|
|
|
1489
|
3
|
50
|
|
|
|
20
|
if ($args{'buffer_id'}) { |
1490
|
|
|
|
|
|
|
# Do not send right now, wait until flush_buffer |
1491
|
0
|
|
|
|
|
0
|
$self->{buffers}->{$args{'buffer_id'}}->{raw_mqtt} .= $raw_mqtt; |
1492
|
0
|
|
|
|
|
0
|
return 1; |
1493
|
|
|
|
|
|
|
} |
1494
|
|
|
|
|
|
|
|
1495
|
3
|
|
|
|
|
30
|
$self->{handle}->push_write( $raw_mqtt ); |
1496
|
|
|
|
|
|
|
|
1497
|
3
|
|
|
|
|
2010
|
1; |
1498
|
|
|
|
|
|
|
} |
1499
|
|
|
|
|
|
|
|
1500
|
|
|
|
|
|
|
sub _receive_puback { |
1501
|
70
|
|
|
70
|
|
261
|
my ($self, $packet) = @_; |
1502
|
|
|
|
|
|
|
|
1503
|
70
|
|
|
|
|
447
|
my ($packet_id, $reason_code) = unpack("n C", $$packet); |
1504
|
70
|
50
|
|
|
|
319
|
$reason_code = 0 unless defined $reason_code; |
1505
|
|
|
|
|
|
|
|
1506
|
|
|
|
|
|
|
#TODO: 3.5.2.2 Properties |
1507
|
|
|
|
|
|
|
|
1508
|
70
|
|
|
|
|
302
|
my $puback_cb = delete $self->{packet_cb}->{$packet_id}; |
1509
|
70
|
50
|
|
|
|
344
|
return unless defined $puback_cb; |
1510
|
|
|
|
|
|
|
|
1511
|
0
|
|
|
|
|
0
|
$puback_cb->($reason_code); |
1512
|
|
|
|
|
|
|
} |
1513
|
|
|
|
|
|
|
|
1514
|
|
|
|
|
|
|
sub pubrec { |
1515
|
0
|
|
|
0
|
0
|
0
|
my ($self, %args) = @_; |
1516
|
|
|
|
|
|
|
|
1517
|
0
|
0
|
|
|
|
0
|
croak "Missing packet_id" unless $args{'packet_id'}; |
1518
|
|
|
|
|
|
|
|
1519
|
|
|
|
|
|
|
my $raw_mqtt = pack( |
1520
|
|
|
|
|
|
|
"C C n C", |
1521
|
|
|
|
|
|
|
MQTT_PUBREC << 4, # 3.5.1 Packet type |
1522
|
|
|
|
|
|
|
3, # 3.5.1 Remaining length |
1523
|
|
|
|
|
|
|
$args{'packet_id'}, # 3.5.2 Packet identifier |
1524
|
0
|
|
0
|
|
|
0
|
$args{'reason_code'} || 0, # 3.5.2.1 Reason code |
1525
|
|
|
|
|
|
|
); |
1526
|
|
|
|
|
|
|
|
1527
|
|
|
|
|
|
|
#TODO: set PUBREL callback |
1528
|
|
|
|
|
|
|
|
1529
|
0
|
|
|
|
|
0
|
$self->{handle}->push_write( $raw_mqtt ); |
1530
|
|
|
|
|
|
|
|
1531
|
0
|
|
|
|
|
0
|
1; |
1532
|
|
|
|
|
|
|
} |
1533
|
|
|
|
|
|
|
|
1534
|
|
|
|
|
|
|
sub _receive_pubrec { |
1535
|
0
|
|
|
0
|
|
0
|
my ($self, $packet) = @_; |
1536
|
|
|
|
|
|
|
|
1537
|
0
|
|
|
|
|
0
|
my ($packet_id, $reason_code) = unpack("n C", $$packet); |
1538
|
0
|
0
|
|
|
|
0
|
$reason_code = 0 unless defined $reason_code; |
1539
|
|
|
|
|
|
|
|
1540
|
|
|
|
|
|
|
#TODO: 3.5.2.2 Properties |
1541
|
|
|
|
|
|
|
|
1542
|
0
|
|
|
|
|
0
|
my $pubrec_cb = delete $self->{packet_cb}->{$packet_id}; |
1543
|
0
|
0
|
|
|
|
0
|
return unless defined $pubrec_cb; |
1544
|
|
|
|
|
|
|
|
1545
|
0
|
|
|
|
|
0
|
$pubrec_cb->($packet_id, $reason_code); |
1546
|
|
|
|
|
|
|
} |
1547
|
|
|
|
|
|
|
|
1548
|
|
|
|
|
|
|
sub pubrel { |
1549
|
0
|
|
|
0
|
0
|
0
|
my ($self, %args) = @_; |
1550
|
|
|
|
|
|
|
|
1551
|
0
|
0
|
|
|
|
0
|
croak "Missing packet_id" unless $args{'packet_id'}; |
1552
|
|
|
|
|
|
|
|
1553
|
|
|
|
|
|
|
my $raw_mqtt = pack( |
1554
|
|
|
|
|
|
|
"C C n C", |
1555
|
|
|
|
|
|
|
MQTT_PUBREL << 4, # 3.6.1 Packet type |
1556
|
|
|
|
|
|
|
3, # 3.6.1 Remaining length |
1557
|
|
|
|
|
|
|
$args{'packet_id'}, # 3.6.2 Packet identifier |
1558
|
0
|
|
0
|
|
|
0
|
$args{'reason_code'} || 0, # 3.6.2.1 Reason code |
1559
|
|
|
|
|
|
|
); |
1560
|
|
|
|
|
|
|
|
1561
|
|
|
|
|
|
|
#TODO: set PUBREC callback |
1562
|
|
|
|
|
|
|
|
1563
|
0
|
|
|
|
|
0
|
$self->{handle}->push_write( $raw_mqtt ); |
1564
|
|
|
|
|
|
|
|
1565
|
0
|
|
|
|
|
0
|
1; |
1566
|
|
|
|
|
|
|
} |
1567
|
|
|
|
|
|
|
|
1568
|
|
|
|
|
|
|
sub _receive_pubrel { |
1569
|
0
|
|
|
0
|
|
0
|
my ($self, $packet) = @_; |
1570
|
|
|
|
|
|
|
|
1571
|
0
|
|
|
|
|
0
|
my ($packet_id, $reason_code) = unpack("n C", $$packet); |
1572
|
0
|
0
|
|
|
|
0
|
$reason_code = 0 unless defined $reason_code; |
1573
|
|
|
|
|
|
|
|
1574
|
|
|
|
|
|
|
#TODO: 3.6.2.2 Properties |
1575
|
|
|
|
|
|
|
|
1576
|
0
|
|
|
|
|
0
|
my $pubrel_cb = delete $self->{packet_cb}->{$packet_id}; |
1577
|
0
|
0
|
|
|
|
0
|
return unless defined $pubrel_cb; |
1578
|
|
|
|
|
|
|
|
1579
|
0
|
|
|
|
|
0
|
$pubrel_cb->($packet_id, $reason_code); |
1580
|
|
|
|
|
|
|
} |
1581
|
|
|
|
|
|
|
|
1582
|
|
|
|
|
|
|
sub pubcomp { |
1583
|
0
|
|
|
0
|
0
|
0
|
my ($self, %args) = @_; |
1584
|
|
|
|
|
|
|
|
1585
|
0
|
0
|
|
|
|
0
|
croak "Missing packet_id" unless $args{'packet_id'}; |
1586
|
|
|
|
|
|
|
|
1587
|
|
|
|
|
|
|
my $raw_mqtt = pack( |
1588
|
|
|
|
|
|
|
"C C n C", |
1589
|
|
|
|
|
|
|
MQTT_PUBCOMP << 4, # 3.7.1 Packet type |
1590
|
|
|
|
|
|
|
3, # 3.7.1 Remaining length |
1591
|
|
|
|
|
|
|
$args{'packet_id'}, # 3.7.2 Packet identifier |
1592
|
0
|
|
0
|
|
|
0
|
$args{'reason_code'} || 0, # 3.7.2.1 Reason code |
1593
|
|
|
|
|
|
|
); |
1594
|
|
|
|
|
|
|
|
1595
|
0
|
|
|
|
|
0
|
$self->{handle}->push_write( $raw_mqtt ); |
1596
|
|
|
|
|
|
|
|
1597
|
0
|
|
|
|
|
0
|
1; |
1598
|
|
|
|
|
|
|
} |
1599
|
|
|
|
|
|
|
|
1600
|
|
|
|
|
|
|
sub _receive_pubcomp { |
1601
|
0
|
|
|
0
|
|
0
|
my ($self, $packet) = @_; |
1602
|
|
|
|
|
|
|
|
1603
|
0
|
|
|
|
|
0
|
my ($packet_id, $reason_code) = unpack("n C", $$packet); |
1604
|
0
|
0
|
|
|
|
0
|
$reason_code = 0 unless defined $reason_code; |
1605
|
|
|
|
|
|
|
|
1606
|
|
|
|
|
|
|
#TODO: 3.7.2.2 Properties |
1607
|
|
|
|
|
|
|
|
1608
|
0
|
|
|
|
|
0
|
my $pubcomp_cb = delete $self->{packet_cb}->{$packet_id}; |
1609
|
0
|
0
|
|
|
|
0
|
return unless defined $pubcomp_cb; |
1610
|
|
|
|
|
|
|
|
1611
|
0
|
|
|
|
|
0
|
$pubcomp_cb->($reason_code); |
1612
|
|
|
|
|
|
|
} |
1613
|
|
|
|
|
|
|
|
1614
|
|
|
|
|
|
|
|
1615
|
|
|
|
|
|
|
sub auth { |
1616
|
0
|
|
|
0
|
0
|
0
|
my ($self, %args) = @_; |
1617
|
|
|
|
|
|
|
|
1618
|
0
|
|
|
|
|
0
|
my $reason_code = delete $args{'reason_code'}; |
1619
|
0
|
|
|
|
|
0
|
my $auth_cb = delete $args{'on_auth'}; |
1620
|
|
|
|
|
|
|
|
1621
|
|
|
|
|
|
|
# Set callback to be executed when the server answers with an AUTH packet |
1622
|
0
|
0
|
|
|
|
0
|
$self->{packet_cb}->{'auth'} = $auth_cb if $auth_cb; |
1623
|
|
|
|
|
|
|
|
1624
|
0
|
|
|
|
|
0
|
my $raw_prop = ''; |
1625
|
|
|
|
|
|
|
|
1626
|
0
|
0
|
|
|
|
0
|
if (exists $args{'authentication_method'}) { |
1627
|
|
|
|
|
|
|
# 3.15.2.2.2 Authentication Method (utf8 string) |
1628
|
0
|
|
|
|
|
0
|
utf8::encode( $args{'authentication_method'} ); |
1629
|
0
|
|
|
|
|
0
|
$raw_prop .= pack("C n/a*", MQTT_AUTHENTICATION_METHOD, delete $args{'authentication_method'}); |
1630
|
|
|
|
|
|
|
} |
1631
|
|
|
|
|
|
|
|
1632
|
0
|
0
|
|
|
|
0
|
if (exists $args{'authentication_data'}) { |
1633
|
|
|
|
|
|
|
# 3.15.2.2.3 Authentication Data (binary data) |
1634
|
0
|
|
|
|
|
0
|
$raw_prop .= pack("C n/a*", MQTT_AUTHENTICATION_DATA, delete $args{'authentication_data'}); |
1635
|
|
|
|
|
|
|
} |
1636
|
|
|
|
|
|
|
|
1637
|
0
|
0
|
|
|
|
0
|
if (exists $args{'reason_string'}) { |
1638
|
|
|
|
|
|
|
# 3.15.2.2.4 Reason String (utf8 string) |
1639
|
0
|
|
|
|
|
0
|
utf8::encode( $args{'reason_string'} ); |
1640
|
0
|
|
|
|
|
0
|
$raw_prop .= pack("C n/a*", MQTT_REASON_STRING, delete $args{'reason_string'}); |
1641
|
|
|
|
|
|
|
} |
1642
|
|
|
|
|
|
|
|
1643
|
0
|
|
|
|
|
0
|
foreach my $key (keys %args) { |
1644
|
|
|
|
|
|
|
# 3.15.2.2.5 User Property (utf8 string pair) |
1645
|
0
|
|
|
|
|
0
|
my $val = $args{$key}; |
1646
|
0
|
0
|
|
|
|
0
|
next unless defined $val; |
1647
|
0
|
|
|
|
|
0
|
utf8::encode( $key ); |
1648
|
0
|
|
|
|
|
0
|
utf8::encode( $val ); |
1649
|
0
|
|
|
|
|
0
|
$raw_prop .= pack("C n/a* n/a*", MQTT_USER_PROPERTY, $key, $val); |
1650
|
|
|
|
|
|
|
} |
1651
|
|
|
|
|
|
|
|
1652
|
|
|
|
|
|
|
# 3.15.2.1 Authenticate Reason Code (byte) |
1653
|
0
|
|
|
|
|
0
|
my $raw_mqtt = pack("C", $reason_code); |
1654
|
|
|
|
|
|
|
|
1655
|
|
|
|
|
|
|
# 3.15.2.2 Properties |
1656
|
0
|
|
|
|
|
0
|
$raw_mqtt .= _encode_var_int(length $raw_prop); |
1657
|
0
|
|
|
|
|
0
|
$raw_mqtt .= $raw_prop; |
1658
|
|
|
|
|
|
|
|
1659
|
|
|
|
|
|
|
$self->{handle}->push_write( |
1660
|
0
|
|
|
|
|
0
|
pack("C", MQTT_AUTH << 4) . # 3.15.1 Packet type |
1661
|
|
|
|
|
|
|
_encode_var_int(length $raw_mqtt) . # 3.15.1 Packet length |
1662
|
|
|
|
|
|
|
$raw_mqtt |
1663
|
|
|
|
|
|
|
); |
1664
|
|
|
|
|
|
|
|
1665
|
0
|
|
|
|
|
0
|
1; |
1666
|
|
|
|
|
|
|
} |
1667
|
|
|
|
|
|
|
|
1668
|
|
|
|
|
|
|
sub _receive_auth { |
1669
|
0
|
|
|
0
|
|
0
|
my ($self, $packet) = @_; |
1670
|
|
|
|
|
|
|
|
1671
|
|
|
|
|
|
|
# Handle abbreviated packet |
1672
|
0
|
0
|
|
|
|
0
|
$$packet = "\x00\x00" if (length $$packet == 0); |
1673
|
|
|
|
|
|
|
|
1674
|
|
|
|
|
|
|
# 3.15.2.1 Authenticate Reason Code (byte) |
1675
|
0
|
|
|
|
|
0
|
my $offs = 0; |
1676
|
0
|
|
|
|
|
0
|
my $reason_code = _decode_byte($packet, \$offs); |
1677
|
0
|
|
|
|
|
0
|
my $reason = $Reason_code{$reason_code}; |
1678
|
|
|
|
|
|
|
|
1679
|
|
|
|
|
|
|
# 3.15.2.2.1 Property Length (variable length int) |
1680
|
0
|
|
|
|
|
0
|
my $prop_len = _decode_var_int($packet, \$offs); |
1681
|
0
|
|
|
|
|
0
|
my $prop_end = $offs + $prop_len; |
1682
|
|
|
|
|
|
|
|
1683
|
0
|
|
|
|
|
0
|
my %prop = ( |
1684
|
|
|
|
|
|
|
reason_code => $reason_code, |
1685
|
|
|
|
|
|
|
reason => $reason, |
1686
|
|
|
|
|
|
|
); |
1687
|
|
|
|
|
|
|
|
1688
|
0
|
|
|
|
|
0
|
while ($offs < $prop_end) { |
1689
|
|
|
|
|
|
|
|
1690
|
0
|
|
|
|
|
0
|
my $prop_id = _decode_byte($packet, \$offs); |
1691
|
|
|
|
|
|
|
|
1692
|
0
|
0
|
|
|
|
0
|
if ($prop_id == MQTT_AUTHENTICATION_METHOD) { |
|
|
0
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
1693
|
|
|
|
|
|
|
# 3.15.2.2.2 Authentication Method (utf8 string) |
1694
|
0
|
|
|
|
|
0
|
$prop{'authentication_method'} = _decode_utf8_str($packet, \$offs); |
1695
|
|
|
|
|
|
|
} |
1696
|
|
|
|
|
|
|
elsif ($prop_id == MQTT_AUTHENTICATION_DATA) { |
1697
|
|
|
|
|
|
|
# 3.15.2.2.3 Authentication Data (binary data) |
1698
|
0
|
|
|
|
|
0
|
$prop{'authentication_data'} = _decode_binary_data($packet, \$offs); |
1699
|
|
|
|
|
|
|
} |
1700
|
|
|
|
|
|
|
elsif ($prop_id == MQTT_REASON_STRING) { |
1701
|
|
|
|
|
|
|
# 3.15.2.2.4 Reason String (utf8 string) |
1702
|
0
|
|
|
|
|
0
|
$prop{'reason_string'} = _decode_utf8_str($packet, \$offs); |
1703
|
|
|
|
|
|
|
} |
1704
|
|
|
|
|
|
|
elsif ($prop_id == MQTT_USER_PROPERTY) { |
1705
|
|
|
|
|
|
|
# 3.15.2.2.5 User Property (utf8 string pair) |
1706
|
0
|
|
|
|
|
0
|
my $key = _decode_utf8_str($packet, \$offs); |
1707
|
0
|
|
|
|
|
0
|
my $val = _decode_utf8_str($packet, \$offs); |
1708
|
0
|
|
|
|
|
0
|
$prop{$key} = $val; |
1709
|
|
|
|
|
|
|
} |
1710
|
|
|
|
|
|
|
else { |
1711
|
|
|
|
|
|
|
# Protocol error |
1712
|
0
|
|
|
|
|
0
|
$self->_fatal("Received AUTH with unexpected property $prop_id"); |
1713
|
|
|
|
|
|
|
} |
1714
|
|
|
|
|
|
|
} |
1715
|
|
|
|
|
|
|
|
1716
|
0
|
|
|
|
|
0
|
my $auth_cb = delete $self->{packet_cb}->{'auth'}; |
1717
|
|
|
|
|
|
|
|
1718
|
0
|
0
|
|
|
|
0
|
$auth_cb->(\%prop) if $auth_cb; |
1719
|
|
|
|
|
|
|
} |
1720
|
|
|
|
|
|
|
|
1721
|
|
|
|
|
|
|
|
1722
|
|
|
|
|
|
|
sub flush_buffer { |
1723
|
0
|
|
|
0
|
1
|
0
|
my ($self, %args) = @_; |
1724
|
|
|
|
|
|
|
|
1725
|
0
|
|
|
|
|
0
|
my $buffer = delete $self->{buffers}->{$args{'buffer_id'}}; |
1726
|
|
|
|
|
|
|
|
1727
|
|
|
|
|
|
|
# Nothing to do if nothing was buffered |
1728
|
0
|
0
|
|
|
|
0
|
return unless $buffer; |
1729
|
|
|
|
|
|
|
|
1730
|
0
|
|
|
|
|
0
|
$self->{handle}->push_write( $buffer->{raw_mqtt} ); |
1731
|
|
|
|
|
|
|
|
1732
|
0
|
0
|
0
|
|
|
0
|
if (defined $self->{handle}->{wbuf} && length $self->{handle}->{wbuf} > 0) { |
1733
|
|
|
|
|
|
|
|
1734
|
|
|
|
|
|
|
# Kernel write buffer is full, see publish() above |
1735
|
|
|
|
|
|
|
|
1736
|
|
|
|
|
|
|
# Make AnyEvent allow one level of recursive condvar blocking |
1737
|
0
|
0
|
|
|
|
0
|
$AE_WAITING && Carp::confess "Recursive condvar blocking wait attempted"; |
1738
|
0
|
|
|
|
|
0
|
local $AE_WAITING = 1; |
1739
|
0
|
|
|
|
|
0
|
local $AnyEvent::CondVar::Base::WAITING = 0; |
1740
|
|
|
|
|
|
|
|
1741
|
0
|
|
|
|
|
0
|
my $flushed = AnyEvent->condvar; |
1742
|
0
|
|
|
|
|
0
|
$self->{handle}->on_drain( $flushed ); |
1743
|
0
|
|
|
|
|
0
|
$flushed->recv; |
1744
|
0
|
|
|
|
|
0
|
$self->{handle}->on_drain(); # clear |
1745
|
|
|
|
|
|
|
} |
1746
|
|
|
|
|
|
|
|
1747
|
0
|
|
|
|
|
0
|
1; |
1748
|
|
|
|
|
|
|
} |
1749
|
|
|
|
|
|
|
|
1750
|
|
|
|
|
|
|
sub discard_buffer { |
1751
|
0
|
|
|
0
|
1
|
0
|
my ($self, %args) = @_; |
1752
|
|
|
|
|
|
|
|
1753
|
0
|
|
|
|
|
0
|
my $buffer = delete $self->{buffers}->{$args{'buffer_id'}}; |
1754
|
|
|
|
|
|
|
|
1755
|
|
|
|
|
|
|
# Nothing to do if nothing was buffered |
1756
|
0
|
0
|
|
|
|
0
|
return unless $buffer; |
1757
|
|
|
|
|
|
|
|
1758
|
|
|
|
|
|
|
# Remove all pending puback callbacks, as those will never be executed |
1759
|
0
|
|
|
|
|
0
|
foreach my $packet_id (keys %{$buffer->{packet_ids}}) { |
|
0
|
|
|
|
|
0
|
|
1760
|
0
|
|
|
|
|
0
|
delete $self->{packet_cb}->{$packet_id}; |
1761
|
|
|
|
|
|
|
} |
1762
|
|
|
|
|
|
|
|
1763
|
0
|
|
|
|
|
0
|
1; |
1764
|
|
|
|
|
|
|
} |
1765
|
|
|
|
|
|
|
|
1766
|
|
|
|
|
|
|
|
1767
|
|
|
|
|
|
|
sub DESTROY { |
1768
|
13
|
|
|
13
|
|
35
|
my $self = shift; |
1769
|
|
|
|
|
|
|
# Disconnect gracefully from server if already connected |
1770
|
13
|
50
|
|
|
|
196
|
return unless defined $self->{handle}; |
1771
|
0
|
|
|
|
|
|
$self->disconnect; |
1772
|
|
|
|
|
|
|
} |
1773
|
|
|
|
|
|
|
|
1774
|
|
|
|
|
|
|
1; |
1775
|
|
|
|
|
|
|
|
1776
|
|
|
|
|
|
|
__END__ |