File Coverage

lib/Beekeeper/MQTT.pm
Criterion Covered Total %
statement 523 854 61.2
branch 132 368 35.8
condition 13 38 34.2
subroutine 71 97 73.2
pod 9 17 52.9
total 748 1374 54.4


line stmt bran cond sub pod time code
1             package Beekeeper::MQTT;
2              
3 11     11   1291 use strict;
  11         23  
  11         417  
4 11     11   60 use warnings;
  11         24  
  11         487  
5              
6             our $VERSION = '0.08';
7              
8 11     11   1175 use AnyEvent;
  11         5933  
  11         259  
9 11     11   8999 use AnyEvent::Handle;
  11         104256  
  11         644  
10 11     11   7686 use Time::HiRes;
  11         17304  
  11         51  
11 11     11   1549 use List::Util 'shuffle';
  11         28  
  11         1236  
12 11     11   92 use Scalar::Util 'weaken';
  11         29  
  11         625  
13 11     11   72 use Exporter 'import';
  11         131  
  11         332  
14 11     11   234 use Carp;
  11         42  
  11         2775  
15              
16             our @EXPORT_OK;
17             our %EXPORT_TAGS;
18              
19             our $DEBUG = 0;
20              
21             EXPORT: {
22             my (@const, @encode);
23             foreach (keys %{Beekeeper::MQTT::}) {
24             push @const, $_ if m/^MQTT_/;
25             push @encode, $_ if m/^_(en|de)code/;
26             }
27             @EXPORT_OK = (@const, @encode);
28             $EXPORT_TAGS{'const'} = \@const;
29             $EXPORT_TAGS{'decode'} = \@encode;
30             }
31              
32             # 2.1.2 Control Packet type
33              
34 11     11   112 use constant MQTT_CONNECT => 0x01;
  11         29  
  11         1074  
35 11     11   98 use constant MQTT_CONNACK => 0x02;
  11         23  
  11         643  
36 11     11   71 use constant MQTT_PUBLISH => 0x03;
  11         38  
  11         623  
37 11     11   108 use constant MQTT_PUBACK => 0x04;
  11         22  
  11         763  
38 11     11   76 use constant MQTT_PUBREC => 0x05;
  11         21  
  11         592  
39 11     11   67 use constant MQTT_PUBREL => 0x06;
  11         21  
  11         635  
40 11     11   80 use constant MQTT_PUBCOMP => 0x07;
  11         43  
  11         597  
41 11     11   69 use constant MQTT_SUBSCRIBE => 0x08;
  11         22  
  11         711  
42 11     11   83 use constant MQTT_SUBACK => 0x09;
  11         27  
  11         704  
43 11     11   75 use constant MQTT_UNSUBSCRIBE => 0x0A;
  11         27  
  11         647  
44 11     11   68 use constant MQTT_UNSUBACK => 0x0B;
  11         22  
  11         648  
45 11     11   74 use constant MQTT_PINGREQ => 0x0C;
  11         21  
  11         503  
46 11     11   66 use constant MQTT_PINGRESP => 0x0D;
  11         30  
  11         553  
47 11     11   70 use constant MQTT_DISCONNECT => 0x0E;
  11         22  
  11         583  
48 11     11   85 use constant MQTT_AUTH => 0x0F;
  11         47  
  11         619  
49              
50             # 2.2.2.2 Properties
51              
52 11     11   72 use constant MQTT_PAYLOAD_FORMAT_INDICATOR => 0x01; # byte PUBLISH, Will Properties
  11         18  
  11         553  
53 11     11   75 use constant MQTT_MESSAGE_EXPIRY_INTERVAL => 0x02; # long int PUBLISH, Will Properties
  11         18  
  11         549  
54 11     11   68 use constant MQTT_CONTENT_TYPE => 0x03; # utf8 string PUBLISH, Will Properties
  11         20  
  11         622  
55 11     11   78 use constant MQTT_RESPONSE_TOPIC => 0x08; # utf8 string PUBLISH, Will Properties
  11         37  
  11         660  
56 11     11   74 use constant MQTT_CORRELATION_DATA => 0x09; # binary data PUBLISH, Will Properties
  11         31  
  11         543  
57 11     11   65 use constant MQTT_SUBSCRIPTION_IDENTIFIER => 0x0B; # variable int PUBLISH, SUBSCRIBE
  11         35  
  11         591  
58 11     11   69 use constant MQTT_SESSION_EXPIRY_INTERVAL => 0x11; # long int CONNECT, CONNACK, DISCONNECT
  11         24  
  11         624  
59 11     11   80 use constant MQTT_ASSIGNED_CLIENT_IDENTIFIER => 0x12; # utf8 string CONNACK
  11         28  
  11         604  
60 11     11   68 use constant MQTT_SERVER_KEEP_ALIVE => 0x13; # short int CONNACK
  11         20  
  11         648  
61 11     11   71 use constant MQTT_AUTHENTICATION_METHOD => 0x15; # utf8 string CONNECT, CONNACK, AUTH
  11         22  
  11         533  
62 11     11   64 use constant MQTT_AUTHENTICATION_DATA => 0x16; # binary data CONNECT, CONNACK, AUTH
  11         20  
  11         674  
63 11     11   88 use constant MQTT_REQUEST_PROBLEM_INFORMATION => 0x17; # byte CONNECT
  11         23  
  11         680  
64 11     11   74 use constant MQTT_WILL_DELAY_INTERVAL => 0x18; # long int Will Properties
  11         21  
  11         644  
65 11     11   108 use constant MQTT_REQUEST_RESPONSE_INFORMATION => 0x19; # byte CONNECT
  11         22  
  11         635  
66 11     11   86 use constant MQTT_RESPONSE_INFORMATION => 0x1A; # utf8 string CONNACK
  11         22  
  11         585  
67 11     11   90 use constant MQTT_SERVER_REFERENCE => 0x1C; # utf8 string CONNACK, DISCONNECT
  11         28  
  11         609  
68 11     11   73 use constant MQTT_REASON_STRING => 0x1F; # utf8 string CONNACK, PUBACK, PUBREC, PUBREL, PUBCOMP, SUBACK, UNSUBACK, DISCONNECT, AUTH
  11         20  
  11         570  
69 11     11   71 use constant MQTT_RECEIVE_MAXIMUM => 0x21; # short int CONNECT, CONNACK
  11         24  
  11         498  
70 11     11   73 use constant MQTT_TOPIC_ALIAS_MAXIMUM => 0x22; # short int CONNECT, CONNACK
  11         22  
  11         588  
71 11     11   69 use constant MQTT_TOPIC_ALIAS => 0x23; # short int PUBLISH
  11         19  
  11         552  
72 11     11   77 use constant MQTT_MAXIMUM_QOS => 0x24; # byte CONNACK
  11         23  
  11         544  
73 11     11   67 use constant MQTT_RETAIN_AVAILABLE => 0x25; # byte CONNACK
  11         21  
  11         621  
74 11     11   73 use constant MQTT_USER_PROPERTY => 0x26; # utf8 pair CONNECT, CONNACK, PUBLISH, Will Properties, PUBACK, PUBREC, PUBREL, PUBCOMP, SUBSCRIBE, SUBACK, UNSUBSCRIBE, UNSUBACK, DISCONNECT, AUTH
  11         39  
  11         604  
75 11     11   87 use constant MQTT_MAXIMUM_PACKET_SIZE => 0x27; # long int CONNECT, CONNACK
  11         49  
  11         665  
76 11     11   73 use constant MQTT_WILDCARD_SUBSCRIPTION_AVAILABLE => 0x28; # byte CONNACK
  11         28  
  11         534  
77 11     11   68 use constant MQTT_SUBSCRIPTION_IDENTIFIER_AVAILABLE => 0x29; # byte CONNACK
  11         31  
  11         504  
78 11     11   303 use constant MQTT_SHARED_SUBSCRIPTION_AVAILABLE => 0x2A; # byte CONNACK
  11         26  
  11         101304  
79              
80             # 2.4 Reason Code
81              
82             my %Reason_code = (
83             0x00 => 'Success', # CONNACK, PUBACK, PUBREC, PUBREL, PUBCOMP, UNSUBACK, AUTH
84             # 0x00 => 'Normal disconnection', # DISCONNECT
85             # 0x00 => 'Granted QoS 0', # SUBACK
86             # 0x01 => 'Granted QoS 1', # SUBACK
87             # 0x02 => 'Granted QoS 2', # SUBACK
88             0x04 => 'Disconnect with Will Message', # DISCONNECT
89             0x10 => 'No matching subscribers', # PUBACK, PUBREC
90             0x11 => 'No subscription existed', # UNSUBACK
91             0x18 => 'Continue authentication', # AUTH
92             0x19 => 'Re-authenticate', # AUTH
93             0x80 => 'Unspecified error', # CONNACK, PUBACK, PUBREC, SUBACK, UNSUBACK, DISCONNECT
94             0x81 => 'Malformed Packet', # CONNACK, DISCONNECT
95             0x82 => 'Protocol Error', # CONNACK, DISCONNECT
96             0x83 => 'Implementation specific error', # CONNACK, PUBACK, PUBREC, SUBACK, UNSUBACK, DISCONNECT
97             0x84 => 'Unsupported Protocol Version', # CONNACK
98             0x85 => 'Client Identifier not valid', # CONNACK
99             0x86 => 'Bad User Name or Password', # CONNACK
100             0x87 => 'Not authorized', # CONNACK, PUBACK, PUBREC, SUBACK, UNSUBACK, DISCONNECT
101             0x88 => 'Server unavailable', # CONNACK
102             0x89 => 'Server busy', # CONNACK, DISCONNECT
103             0x8A => 'Banned', # CONNACK
104             0x8B => 'Server shutting down', # DISCONNECT
105             0x8C => 'Bad authentication method', # CONNACK, DISCONNECT
106             0x8D => 'Keep Alive timeout', # DISCONNECT
107             0x8E => 'Session taken over', # DISCONNECT
108             0x8F => 'Topic Filter invalid', # SUBACK, UNSUBACK, DISCONNECT
109             0x90 => 'Topic Name invalid', # CONNACK, PUBACK, PUBREC, DISCONNECT
110             0x91 => 'Packet Identifier in use', # PUBACK, PUBREC, SUBACK, UNSUBACK
111             0x92 => 'Packet Identifier not found', # PUBREL, PUBCOMP
112             0x93 => 'Receive Maximum exceeded', # DISCONNECT
113             0x94 => 'Topic Alias invalid', # DISCONNECT
114             0x95 => 'Packet too large', # CONNACK, DISCONNECT
115             0x96 => 'Message rate too high', # DISCONNECT
116             0x97 => 'Quota exceeded', # CONNACK, PUBACK, PUBREC, SUBACK, DISCONNECT
117             0x98 => 'Administrative action', # DISCONNECT
118             0x99 => 'Payload format invalid', # CONNACK, PUBACK, PUBREC, DISCONNECT
119             0x9A => 'Retain not supported', # CONNACK, DISCONNECT
120             0x9B => 'QoS not supported', # CONNACK, DISCONNECT
121             0x9C => 'Use another server', # CONNACK, DISCONNECT
122             0x9D => 'Server moved', # CONNACK, DISCONNECT
123             0x9E => 'Shared Subscriptions not supported', # SUBACK, DISCONNECT
124             0x9F => 'Connection rate exceeded', # CONNACK, DISCONNECT
125             0xA0 => 'Maximum connect time', # DISCONNECT
126             0xA1 => 'Subscription Identifiers not supported', # SUBACK, DISCONNECT
127             0xA2 => 'Wildcard Subscriptions not supported', # SUBACK, DISCONNECT
128             );
129              
130             # 3.9.3 Subscribe Reason Codes
131              
132             my %Subscribe_reason_code = (
133             %Reason_code,
134             0x00 => 'Granted QoS 0',
135             0x01 => 'Granted QoS 1',
136             0x02 => 'Granted QoS 2',
137             );
138              
139             # 3.14.2.1 Disconnect Reason Code
140              
141             my %Disconnect_reason_code = (
142             %Reason_code,
143             0x00 => 'Normal disconnection',
144             );
145              
146             sub _decode_byte {
147 76     76   169 my ($packet, $offs) = @_;
148              
149 76         305 my $byte = unpack("C", substr($$packet, $$offs, 1));
150 76         160 $$offs += 1;
151              
152 76         372 return $byte;
153             }
154              
155             sub _decode_int_16 {
156 19     19   42 my ($packet, $offs) = @_;
157              
158 19         96 my $int = unpack("n", substr($$packet, $$offs, 2));
159 19         64 $$offs += 2;
160              
161 19         65 return $int;
162             }
163              
164             sub _decode_int_32 {
165 0     0   0 my ($packet, $offs) = @_;
166              
167 0         0 my $int = unpack("N", substr($$packet, $$offs, 4));
168 0         0 $$offs += 4;
169              
170 0         0 return $int;
171             }
172              
173             sub _decode_utf8_str {
174 0     0   0 my ($packet, $offs) = @_;
175              
176 0         0 my $str = unpack("n/a", substr($$packet, $$offs));
177 0         0 $$offs += 2 + length($str);
178 0         0 utf8::decode($str);
179              
180 0         0 return $str;
181             }
182              
183             sub _decode_binary_data {
184 0     0   0 my ($packet, $offs) = @_;
185              
186 0         0 my $data = unpack("n/a", substr($$packet, $$offs));
187 0         0 $$offs += 2 + length($data);
188              
189 0         0 return $data;
190             }
191              
192             sub _decode_var_int {
193 206     206   437 my ($packet, $offs) = @_;
194              
195 206         403 my $int = 0;
196 206         341 my $mult = 1;
197 206         331 my $byte;
198              
199 206         382 do {
200 206         825 $byte = unpack("C", substr($$packet, $$offs, 1));
201 206         765 $int += ($byte & 0x7F) * $mult;
202 206         522 $mult *= 128;
203 206         761 $$offs++;
204             } while ($byte & 0x80);
205              
206 206         911 return $int;
207             }
208              
209             sub _encode_var_int {
210 301 100   301   1880 return pack("C", $_[0]) if ($_[0] < 128);
211 30         272 my @a = unpack("C*", pack("w", $_[0]));
212 30         90 $a[0] &= 0x7F;
213 30         67 $a[-1] |= 0x80;
214 30         17088 return pack("C*", reverse @a);
215             }
216              
217              
218             sub new {
219 19     19 1 9634 my ($class, %args) = @_;
220              
221 19         593 my $self = {
222             bus_id => undef,
223             bus_role => undef,
224             handle => undef, # the socket
225             hosts => undef, # list of all hosts in cluster
226             is_connected => undef, # true once connected
227             try_hosts => undef, # list of hosts to try to connect
228             connect_err => undef, # count of connection errors
229             timeout_tmr => undef, # timer used for connection timeout
230             reconnect_tmr => undef, # timer used for connection retry
231             connack_cb => undef, # connack callback
232             error_cb => undef, # error callback
233             client_id => undef, # client id
234             server_prop => {}, # server properties
235             server_alias => {}, # server topic aliases
236             client_alias => {}, # client topic aliases
237             subscriptions => {}, # topic subscriptions
238             subscr_cb => {}, # subscription callbacks
239             packet_cb => {}, # packet callbacks
240             buffers => {}, # raw mqtt buffers
241             packet_seq => 1, # sequence used for packet ids
242             subscr_seq => 1, # sequence used for subscription ids
243             alias_seq => 1, # sequence used for topic alias ids
244             use_alias => 0, # topic alias enabled
245             config => \%args,
246             };
247              
248 19         82 $self->{bus_id} = delete $args{'bus_id'};
249 19   33     178 $self->{bus_role} = delete $args{'bus_role'} || $self->{bus_id};
250 19         77 $self->{error_cb} = delete $args{'on_error'};
251              
252 19         128 bless $self, $class;
253 19         150 return $self;
254             }
255              
256 0     0 0 0 sub bus_id { $_[0]->{bus_id} }
257 0     0 0 0 sub bus_role { $_[0]->{bus_role} }
258              
259             sub _fatal {
260 0     0   0 my ($self, $errstr) = @_;
261 0 0       0 die "(" . __PACKAGE__ . ") $errstr\n" unless $self->{error_cb};
262 0         0 $self->{error_cb}->($errstr);
263             }
264              
265             our $BUSY_SINCE = undef;
266             our $BUSY_TIME = 0;
267              
268             sub connect {
269 19     19 1 304 my ($self, %args) = @_;
270              
271 19         235 $self->{connack_cb} = $args{'on_connack'};
272 19         791 $self->{connect_cv} = AnyEvent->condvar;
273              
274 19         107456 $self->_connect;
275              
276 19 50       225 $self->{connect_cv}->recv if $args{'blocking'};
277 19         776 $self->{connect_cv} = undef;
278              
279 19 50       122 return $args{'blocking'} ? $self->{is_connected} : 1;
280             }
281              
282             sub _connect {
283 19     19   70 my ($self) = @_;
284 19         107 weaken($self);
285              
286 19         51 my $config = $self->{config};
287              
288 19         82 my $timeout = $config->{'timeout'};
289 19 50       85 $timeout = 30 unless defined $timeout;
290              
291             # Ensure that timeout is set properly when the event loop was blocked
292 19         183 AnyEvent->now_update;
293              
294             # Connection timeout handler
295 19 50 33     427 if ($timeout && !$self->{timeout_tmr}) {
296             $self->{timeout_tmr} = AnyEvent->timer( after => $timeout, cb => sub {
297 0     0   0 $self->_reset_connection;
298 0         0 $self->{connect_cv}->send;
299 0         0 $self->_fatal("Could not connect to MQTT broker after $timeout seconds");
300 19         519 });
301             }
302              
303 19 50       680 unless ($self->{hosts}) {
304             # Initialize the list of cluster hosts
305 19   50     92 my $hosts = $config->{'host'} || 'localhost';
306 19 50       102 my @hosts = (ref $hosts eq 'ARRAY') ? @$hosts : ( $hosts );
307 19         177 $self->{hosts} = [ shuffle @hosts ];
308             }
309              
310             # Determine next host of cluster to connect to
311 19   50     172 my $try_hosts = $self->{try_hosts} ||= [];
312 19 50       78 @$try_hosts = @{$self->{hosts}} unless @$try_hosts;
  19         65  
313              
314             # TCP connection args
315 19         55 my $host = shift @$try_hosts;
316 19   50     125 my $tls = $config->{'tls'} || 0;
317 19   33     83 my $port = $config->{'port'} || ( $tls ? 8883 : 1883 );
318              
319 19         159 ($host) = ($host =~ m/^([a-zA-Z0-9\-\.]+)$/s); # untaint
320 19         129 ($port) = ($port =~ m/^([0-9]+)$/s);
321              
322             $self->{handle} = AnyEvent::Handle->new(
323             connect => [ $host, $port ],
324             tls => $tls ? 'connect' : undef,
325             keepalive => 1,
326             no_delay => 1,
327             on_connect => sub {
328 19     19   7736 my ($fh, $host, $port) = @_;
329             # Send CONNECT packet
330 19         80 $self->{server_prop}->{host} = $host;
331 19         48 $self->{server_prop}->{port} = $port;
332 19         153 $self->_send_connect;
333             },
334             on_connect_error => sub {
335 0     0   0 my ($fh, $errmsg) = @_;
336             # Some error occurred while connection, such as an unresolved hostname
337             # or connection refused. Try next host of cluster immediately, or retry
338             # in few seconds if all hosts of the cluster are unresponsive
339 0         0 $self->{connect_err}++;
340 0 0       0 warn "Could not connect to MQTT broker at $host:$port: $errmsg\n" if ($self->{connect_err} <= @{$self->{hosts}});
  0         0  
341 0 0       0 my $delay = @{$self->{try_hosts}} ? 0 : $self->{connect_err} / @{$self->{hosts}};
  0         0  
  0         0  
342             $self->{reconnect_tmr} = AnyEvent->timer(
343             after => ($delay < 10 ? $delay : 10),
344 0         0 cb => sub { $self->_connect },
345 0 0       0 );
346             },
347             on_error => sub {
348 0     0   0 my ($fh, $fatal, $errmsg) = @_;
349             # Some error occurred, such as a read error
350 0         0 $self->_reset_connection;
351 0         0 $self->_fatal("Error on connection to MQTT broker at $host:$port: $errmsg");
352             },
353             on_eof => sub {
354 0     0   0 my ($fh) = @_;
355             # The server has closed the connection without sending DISCONNECT
356 0         0 $self->_reset_connection;
357 0         0 $self->_fatal("MQTT broker at $host:$port has gone away");
358             },
359             on_read => sub {
360 272     272   2173672 my ($fh) = @_;
361              
362 272         1955 my $packet_type;
363             my $packet_flags;
364              
365 272         0 my $rbuff_len;
366 272         0 my $packet_len;
367              
368 272         0 my $mult;
369 272         0 my $offs;
370 272         0 my $byte;
371              
372 272         0 my $timing_packets;
373              
374 272 50       941 unless (defined $BUSY_SINCE) {
375             # Measure time elapsed while processing incoming packets
376 272         834 $BUSY_SINCE = Time::HiRes::time;
377 272         582 $timing_packets = 1;
378             }
379              
380             PARSE_PACKET: {
381              
382 272         397 $rbuff_len = length $fh->{rbuf};
  464         1297  
383              
384 464 100       1678 last PARSE_PACKET unless $rbuff_len >= 2;
385              
386 286 50       809 unless ($packet_type) {
387              
388 286         463 $packet_len = 0;
389 286         434 $mult = 1;
390 286         445 $offs = 1;
391              
392             PARSE_LEN: {
393 286         444 $byte = unpack "C", substr( $fh->{rbuf}, $offs++, 1 );
  580         2301  
394 580         2018 $packet_len += ($byte & 0x7f) * $mult;
395 580 100       1824 last unless ($byte & 0x80);
396 294 50       710 last PARSE_PACKET if ($offs >= $rbuff_len); # Not enough data
397 294         432 $mult *= 128;
398 294 50       676 redo if ($offs < 5);
399             }
400              
401             #TODO: Check max packet size
402              
403 286         1083 $byte = unpack('C', substr( $fh->{rbuf}, 0, 1 ));
404 286         896 $packet_type = $byte >> 4;
405 286         831 $packet_flags = $byte & 0x0F;
406             }
407              
408 286 100       1055 if ($rbuff_len < ($offs + $packet_len)) {
409             # Not enough data
410 94         207 last PARSE_PACKET;
411             }
412              
413             # Consume packet from buffer
414 192         16439 my $packet = substr($fh->{rbuf}, 0, ($offs + $packet_len), '');
415              
416             # Trim fixed header from packet
417 192         559 substr($packet, 0, $offs, '');
418              
419 192 100       1208 if ($packet_type == MQTT_PUBLISH) {
    100          
    50          
    50          
    50          
    50          
    50          
    100          
    50          
    50          
    0          
    0          
420              
421 84         556 $self->_receive_publish(\$packet, $packet_flags);
422             }
423             elsif ($packet_type == MQTT_PUBACK) {
424              
425 70         506 $self->_receive_puback(\$packet);
426             }
427             elsif ($packet_type == MQTT_PUBREC) {
428              
429 0         0 $self->_receive_pubrec(\$packet);
430             }
431             elsif ($packet_type == MQTT_PUBREL) {
432            
433 0         0 $self->_receive_pubrel(\$packet);
434             }
435             elsif ($packet_type == MQTT_PUBCOMP) {
436              
437 0         0 $self->_receive_pubcomp(\$packet);
438             }
439             elsif ($packet_type == MQTT_PINGREQ) {
440              
441 0         0 $self->pingresp;
442             }
443             elsif ($packet_type == MQTT_PINGRESP) {
444              
445             # Client takes no action on receiving PINGRESP
446             }
447             elsif ($packet_type == MQTT_SUBACK) {
448              
449 19         162 $self->_receive_suback(\$packet);
450             }
451             elsif ($packet_type == MQTT_UNSUBACK) {
452              
453 0         0 $self->_receive_unsuback(\$packet);
454             }
455             elsif ($packet_type == MQTT_CONNACK) {
456              
457 19         114 $self->_receive_connack(\$packet);
458             }
459             elsif ($packet_type == MQTT_DISCONNECT) {
460              
461 0         0 $self->_receive_disconnect(\$packet);
462             }
463             elsif ($packet_type == MQTT_AUTH) {
464              
465 0         0 $self->_receive_auth(\$packet);
466             }
467             else {
468             # Protocol error
469 0         0 $self->_fatal("Received packet with unknown type $packet_type");
470             }
471              
472             # Prepare for next frame
473 192         20080 undef $packet_type;
474              
475             # Handle could have been destroyed at this point
476 192 50       4428 redo PARSE_PACKET if defined $fh->{rbuf};
477             }
478              
479 272 50       745 if (defined $timing_packets) {
480 272         841 $BUSY_TIME += Time::HiRes::time - $BUSY_SINCE;
481 272         1005 undef $BUSY_SINCE;
482             }
483             },
484 19 50       832 );
485              
486 19         14446 1;
487             }
488              
489             sub _send_connect {
490 19     19   61 my ($self) = @_;
491              
492 19         46 my %prop = %{$self->{config}};
  19         182  
493              
494 19         74 my $username = delete $prop{'username'};
495 19         50 my $password = delete $prop{'password'};
496 19         52 my $client_id = delete $prop{'client_id'};
497 19         43 my $clean_start = delete $prop{'clean_start'};
498 19         43 my $keep_alive = delete $prop{'keep_alive'};
499 19         634 my $will = delete $prop{'will'};
500              
501 19 50       83 unless ($client_id) {
502 19         80 $client_id = '';
503 19         470 $client_id .= ('0'..'9','a'..'z','A'..'Z')[rand 62] for (1..22);
504             }
505              
506 19         348 $self->{client_id} = $client_id;
507              
508              
509             # 3.1.2.11 Properties
510              
511 19         80 my $raw_prop = '';
512              
513 19 100       90 if (exists $prop{'session_expiry_interval'}) {
514             # 3.1.2.11.2 Session Expiry Interval (long int)
515 6         43 $raw_prop .= pack("C N", MQTT_SESSION_EXPIRY_INTERVAL, delete $prop{'session_expiry_interval'});
516             }
517              
518 19 100       63 if (exists $prop{'receive_maximum'}) {
519             # 3.1.2.11.3 Receive Maximum (short int)
520 8         36 $raw_prop .= pack("C n", MQTT_RECEIVE_MAXIMUM, delete $prop{'receive_maximum'});
521             }
522              
523 19 50       68 if (exists $prop{'maximum_packet_size'}) {
524             # 3.1.2.11.4 Maximum Packet Size (long int)
525 0         0 $raw_prop .= pack("C N", MQTT_MAXIMUM_PACKET_SIZE, delete $prop{'maximum_packet_size'});
526             }
527              
528 19 100       61 if (exists $prop{'topic_alias_maximum'}) {
529             # 3.1.2.11.5 Topic Alias Maximum (short int)
530 6         24 $raw_prop .= pack("C n", MQTT_TOPIC_ALIAS_MAXIMUM, delete $prop{'topic_alias_maximum'});
531             }
532              
533 19 50       69 if (exists $prop{'request_response_information'}) {
534             # 3.1.2.11.6 Request Response Information (byte)
535 0         0 $raw_prop .= pack("C C", MQTT_REQUEST_RESPONSE_INFORMATION, delete $prop{'request_response_information'});
536             }
537              
538 19 50       61 if (exists $prop{'request_problem_information'}) {
539             # 3.1.2.11.7 Request Problem Information (byte)
540 0         0 $raw_prop .= pack("C C", MQTT_REQUEST_PROBLEM_INFORMATION, delete $prop{'request_problem_information'});
541             }
542              
543 19 50       61 if (exists $prop{'authentication_method'}) {
544             # 3.1.2.11.9 Authentication Method (utf8 string)
545 0         0 utf8::encode( $prop{'authentication_method'} );
546 0         0 $raw_prop .= pack("C n/a*", MQTT_AUTHENTICATION_METHOD, delete $prop{'authentication_method'});
547             }
548              
549 19 50       66 if (exists $prop{'authentication_data'}) {
550             # 3.1.2.11.10 Authentication Data (binary data)
551 0         0 $raw_prop .= pack("C n", MQTT_AUTHENTICATION_DATA, delete $prop{'authentication_data'});
552             }
553              
554 19         104 foreach my $key (keys %prop) {
555             # 3.1.2.11.8 User Property (utf8 string pair)
556 38         111 my $val = $prop{$key};
557 38 50       99 next unless defined $val;
558 38         142 utf8::encode( $key );
559 38         90 utf8::encode( $val );
560 38         177 $raw_prop .= pack("C n/a* n/a*", MQTT_USER_PROPERTY, $key, $val);
561             }
562              
563              
564             # 3.1.2 Variable Header
565              
566             # 3.1.2.1 Protocol Name (utf8 string)
567 19         74 my $raw_mqtt = pack("n/a*", "MQTT");
568              
569             # 3.1.2.2 Protocol Version (byte)
570 19         39 $raw_mqtt .= pack("C", 5);
571              
572             # 3.1.2.3 Connect Flags (byte)
573 19         37 my $flags = 0;
574 19 100       59 $flags |= 0x02 if $clean_start; # 3.1.2.4 Clean Start
575 19 50       159 $flags |= 0x80 if defined $username; # 3.1.2.8 User Name Flag
576 19 50       60 $flags |= 0x40 if defined $password; # 3.1.2.9 Password Flag
577              
578 19 50       53 if ($will) {
579 0         0 $flags |= 0x04; # 3.1.2.5 Will Flag
580 0         0 $flags |= $will->{'qos'} << 3; # 3.1.2.6 Will QoS
581 0 0       0 $flags |= 0x20 if $will->{'retain'}; # 3.1.2.7 Will Retain
582             }
583              
584 19         53 $raw_mqtt .= pack("C", $flags);
585              
586             # 3.1.2.10 Keep Alive (short int)
587 19   50     268 $raw_mqtt .= pack("n", $keep_alive || 0);
588              
589             # 3.1.2.11 Properties
590 19         94 $raw_mqtt .= _encode_var_int(length $raw_prop);
591 19         60 $raw_mqtt .= $raw_prop;
592              
593              
594             # 3.1.3 Payload
595              
596             # 3.1.3.1 Client Identifier (utf8 string)
597 19         74 $raw_mqtt .= pack("n/a*", $client_id);
598              
599 19 50       73 if ($will) {
600              
601             #TODO: 3.1.3.2 Will Properties
602 0         0 my $will_prop = '';
603              
604 0         0 $raw_mqtt .= _encode_var_int(length $will_prop);
605 0         0 $raw_mqtt .= $will_prop;
606              
607             # 3.1.3.3 Will Topic (utf8 string)
608 0         0 utf8::encode( $will->{'topic'});
609 0         0 $raw_mqtt .= pack("n/a*", $will->{'topic'});
610              
611             # 3.1.3.4 Will Payload (binary data)
612 0         0 $raw_mqtt .= pack("n/a*", $will->{'payload'});
613             }
614              
615 19 50       76 if (defined $username) {
616             # 3.1.3.5 Username (utf8 string)
617 19         69 utf8::encode( $username );
618 19         71 $raw_mqtt .= pack("n/a*", $username);
619             }
620            
621 19 50       86 if (defined $password) {
622             # 3.1.3.6 Password (binary data)
623 19         73 $raw_mqtt .= pack("n/a*", $password);
624             }
625              
626             $self->{handle}->push_write(
627 19         103 pack("C", MQTT_CONNECT << 4) . # 3.1.1 Packet type
628             _encode_var_int(length $raw_mqtt) . # 3.1.1 Packet length
629             $raw_mqtt
630             );
631             }
632              
633             sub _receive_connack {
634 19     19   62 my ($self, $packet) = @_;
635              
636 19         52 my $prop = $self->{server_prop};
637 19         45 my $offs = 0;
638              
639             # 3.2.2.1 Acknowledge flags (byte)
640 19         143 my $ack_flags = _decode_byte($packet, \$offs);
641 19         144 $prop->{'session_present'} = $ack_flags & 0x01;
642              
643             # 3.2.2.2 Reason code (byte)
644 19         70 my $reason_code = _decode_byte($packet, \$offs);
645 19         105 $prop->{'reason_code'} = $reason_code;
646 19         152 $prop->{'reason'} = $Reason_code{$reason_code};
647            
648             # 3.2.2.3.1 Properties Length (variable length int)
649 19         83 my $prop_len = _decode_var_int($packet, \$offs);
650 19         81 my $prop_end = $offs + $prop_len;
651              
652 19         94 while ($offs < $prop_end) {
653              
654 19         73 my $prop_id = _decode_byte($packet, \$offs);
655              
656 19 50       141 if ($prop_id == MQTT_SESSION_EXPIRY_INTERVAL) {
    50          
    50          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
    0          
657             # 3.2.2.3.2 Session Expiry Interval (long int)
658 0         0 $prop->{'session_expiry_interval'} = _decode_int_32($packet, \$offs);
659             }
660             elsif ($prop_id == MQTT_RECEIVE_MAXIMUM) {
661             # 3.2.2.3.3 Receive Maximum (short int)
662 0         0 $prop->{'receive_maximum'} = _decode_int_16($packet, \$offs);
663             }
664             elsif ($prop_id == MQTT_MAXIMUM_QOS) {
665             # 3.2.2.3.4 Maximum QoS (byte)
666 19         62 $prop->{'maximum_qos'} = _decode_byte($packet, \$offs);
667             }
668             elsif ($prop_id == MQTT_RETAIN_AVAILABLE) {
669             # 3.2.2.3.5 Retain Available (byte)
670 0         0 $prop->{'retain_available'} = _decode_byte($packet, \$offs);
671             }
672             elsif ($prop_id == MQTT_MAXIMUM_PACKET_SIZE) {
673             # 3.2.2.3.6 Maximum Packet Size (long int)
674 0         0 $prop->{'maximum_packet_size'} = _decode_int_32($packet, \$offs);
675             }
676             elsif ($prop_id == MQTT_ASSIGNED_CLIENT_IDENTIFIER) {
677             # 3.2.2.3.7 Assigned Client Identifier (utf8 string)
678 0         0 $prop->{'assigned_client_identifier'} = _decode_utf8_str($packet, \$offs);
679             }
680             elsif ($prop_id == MQTT_TOPIC_ALIAS_MAXIMUM) {
681             # 3.2.2.3.8 Topic Alias Maximum (short int)
682 0         0 $prop->{'topic_alias_maximum'} = _decode_int_16($packet, \$offs);
683             }
684             elsif ($prop_id == MQTT_REASON_STRING) {
685             # 3.2.2.3.9 Reason String (utf8 string)
686 0         0 $prop->{'reason_string'} = _decode_utf8_str($packet, \$offs);
687             }
688             elsif ($prop_id == MQTT_USER_PROPERTY) {
689             # 3.2.2.3.10 User Property (utf8 string pair)
690 0         0 my $key = _decode_utf8_str($packet, \$offs);
691 0         0 my $val = _decode_utf8_str($packet, \$offs);
692 0         0 $prop->{$key} = $val;
693             }
694             elsif ($prop_id == MQTT_WILDCARD_SUBSCRIPTION_AVAILABLE) {
695             # 3.2.2.3.11 Wildcard Subscription Available (byte)
696 0         0 $prop->{'wildcard_subscription_available'} = _decode_byte($packet, \$offs);
697             }
698             elsif ($prop_id == MQTT_SUBSCRIPTION_IDENTIFIER_AVAILABLE) {
699             # 3.2.2.3.12 Subscription Identifiers Available (byte)
700 0         0 $prop->{'subscription_identifier_available'} = _decode_byte($packet, \$offs);
701             }
702             elsif ($prop_id == MQTT_SHARED_SUBSCRIPTION_AVAILABLE) {
703             # 3.2.2.3.13 Shared Subscription Available (byte)
704 0         0 $prop->{'shared_subscription_available'} = _decode_byte($packet, \$offs);
705             }
706             elsif ($prop_id == MQTT_SERVER_KEEP_ALIVE) {
707             # 3.2.2.3.14 Server Keep Alive (short int)
708 0         0 $prop->{'server_keep_alive'} = _decode_int_16($packet, \$offs);
709             }
710             elsif ($prop_id == MQTT_RESPONSE_INFORMATION) {
711             # 3.2.2.3.15 Response Information (utf8 string)
712 0         0 $prop->{'response_information'} = _decode_utf8_str($packet, \$offs);
713             }
714             elsif ($prop_id == MQTT_SERVER_REFERENCE) {
715             # 3.2.2.3.16 Server Reference (utf8 string)
716 0         0 $prop->{'server_reference'} = _decode_utf8_str($packet, \$offs);
717             }
718             elsif ($prop_id == MQTT_AUTHENTICATION_METHOD) {
719             # 3.2.2.3.17 Authentication Method (utf8 string)
720 0         0 $prop->{'authentication_method'} = _decode_utf8_str($packet, \$offs);
721             }
722             elsif ($prop_id == MQTT_AUTHENTICATION_DATA) {
723             # 3.2.2.3.18 Authentication Data (binary data)
724 0         0 $prop->{'authentication_data'} = _decode_binary_data($packet, \$offs);
725             }
726             else {
727             # Protocol error
728 0         0 $self->_fatal("Received CONNACK with unknown property $prop_id");
729             }
730             }
731              
732 19         62 my $success = ($reason_code == 0x00);
733              
734 19 50       62 unless ( $success ) {
735             # Server will close the connection
736             # warn "Served refused CONNACK: $reason";
737             #TODO: handle
738             }
739              
740 19         49 $self->{is_connected} = 1;
741 19         168 $self->{timeout_tmr} = undef;
742 19         52 $self->{reconnect_tmr} = undef;
743 19         52 $self->{connect_err} = undef;
744              
745             #TODO: ... blocking connection
746 19 50       198 $self->{connect_cv}->send if $self->{connect_cv};
747              
748             # Execute CONNACK callback
749 19         252 my $connack_cb = $self->{connack_cb};
750 19 50       109 $connack_cb->($success, $prop) if $connack_cb;
751             }
752              
753              
754             sub disconnect {
755 13     13 1 19868601 my ($self, %args) = @_;
756              
757 13 50       70 unless (defined $self->{handle}) {
758 0         0 carp "Already disconnected from MQTT broker";
759 0         0 return;
760             }
761              
762 13         35 my $reason_code = delete $args{'reason_code'};
763              
764             # 3.14.2.2 Properties
765              
766 13         47 my $raw_prop = '';
767              
768 13 50       50 if (exists $args{'session_expiry_interval'}) {
769             # 3.14.2.2.2 Session Expiry Interval (long int)
770 0         0 utf8::encode( $args{'session_expiry_interval'} );
771 0         0 $raw_prop .= pack("C n/a*", MQTT_SESSION_EXPIRY_INTERVAL, delete $args{'session_expiry_interval'});
772             }
773              
774 13 50       43 if (exists $args{'reason_string'}) {
775             # 3.14.2.2.3 Reason String (utf8 string)
776 0         0 utf8::encode( $args{'reason_string'} );
777 0         0 $raw_prop .= pack("C n/a*", MQTT_REASON_STRING, delete $args{'reason_string'});
778             }
779              
780 13 50       48 if (exists $args{'server_reference'}) {
781             # 3.14.2.2.5 Server Reference (utf8 string)
782 0         0 utf8::encode( $args{'server_reference'} );
783 0         0 $raw_prop .= pack("C n/a*", MQTT_SERVER_REFERENCE, delete $args{'server_reference'});
784             }
785              
786 13         59 foreach my $key (keys %args) {
787             # 3.14.2.2.4 User Property (utf8 string pair)
788 0         0 my $val = $args{$key};
789 0 0       0 next unless defined $val;
790 0         0 utf8::encode( $key );
791 0         0 utf8::encode( $val );
792 0         0 $raw_prop .= pack("C n/a* n/a*", MQTT_USER_PROPERTY, $key, $val);
793             }
794              
795             # 3.14.2 Variable Header
796              
797             # 3.14.2.1 Disconnect Reason Code (byte)
798 13   50     130 my $raw_mqtt = pack("C", $reason_code || 0);
799              
800             # 3.14.2.2 Properties
801 13         123 $raw_mqtt .= _encode_var_int(length $raw_prop);
802 13         41 $raw_mqtt .= $raw_prop;
803              
804             $self->{handle}->push_write(
805 13         49 pack("C", MQTT_DISCONNECT << 4) . # 3.14.1 Packet type
806             _encode_var_int(length $raw_mqtt) . # 3.14.1 Packet length
807             $raw_mqtt
808             );
809              
810 13         6792 $self->_reset_connection;
811             }
812              
813             sub _reset_connection {
814 13     13   41 my ($self) = @_;
815              
816 13         121 $self->{handle} = undef;
817              
818 13         5186 $self->{is_connected} = undef;
819 13         31 $self->{reconnect_tmr} = undef;
820 13         33 $self->{timeout_tmr} = undef;
821 13         33 $self->{connect_err} = undef;
822              
823 13         79 $self->{server_prop} = {};
824 13         44 $self->{server_alias} = {};
825 13         40 $self->{client_alias} = {};
826 13         54 $self->{subscriptions} = {};
827 13         112 $self->{subscr_cb} = {};
828 13         41 $self->{packet_cb} = {};
829 13         33 $self->{buffers} = {};
830 13         41 $self->{packet_seq} = 1;
831 13         28 $self->{subscr_seq} = 1;
832 13         31 $self->{alias_seq} = 1;
833 13         189 $self->{use_alias} = 0;
834             }
835              
836             sub _receive_disconnect {
837 0     0   0 my ($self, $packet) = @_;
838              
839             # Handle abbreviated packet
840 0 0       0 $$packet = "\x00\x00" if (length $$packet == 0);
841              
842             # 3.14.2.1 Reason Code (byte)
843 0         0 my $offs = 0;
844 0         0 my $reason_code = _decode_byte($packet, \$offs);
845 0         0 my $reason = $Disconnect_reason_code{$reason_code};
846              
847             # 3.14.2.2.1 Property Length (variable length int)
848 0         0 my $prop_len = _decode_var_int($packet, \$offs);
849 0         0 my $prop_end = $offs + $prop_len;
850              
851 0         0 my %prop = (
852             'reason_code' => $reason_code,
853             'reason' => $reason,
854             );
855              
856 0         0 while ($offs < $prop_end) {
857              
858 0         0 my $prop_id = _decode_byte($packet, \$offs);
859              
860 0 0       0 if ($prop_id == MQTT_SESSION_EXPIRY_INTERVAL) {
    0          
    0          
    0          
861             # 3.14.2.2.2 Session Expiry Interval (long int)
862 0         0 $prop{'session_expiry_interval'} = _decode_int_32($packet, \$offs);
863             }
864             elsif ($prop_id == MQTT_REASON_STRING) {
865             # 3.14.2.2.3 Reason String (utf8 string)
866 0         0 $prop{'reason_string'} = _decode_utf8_str($packet, \$offs);
867             }
868             elsif ($prop_id == MQTT_USER_PROPERTY) {
869             # 3.14.2.2.4 User Property (utf8 string pair)
870 0         0 my $key = _decode_utf8_str($packet, \$offs);
871 0         0 my $val = _decode_utf8_str($packet, \$offs);
872 0         0 $prop{$key} = $val;
873             }
874             elsif ($prop_id == MQTT_SERVER_REFERENCE) {
875             # 3.14.2.2.5 Server Reference (utf8 string)
876 0         0 $prop{'server_reference'} = _decode_utf8_str($packet, \$offs);
877             }
878             else {
879             # Protocol error
880 0         0 $self->_fatal("Received DISCONNECT with unknown property $prop_id");
881             }
882             }
883              
884 0         0 $self->_reset_connection;
885              
886 0         0 my $disconn_cb = $self->{disconn_cb};
887              
888 0 0       0 if ($disconn_cb) {
889 0         0 $disconn_cb->(\%prop);
890             }
891             else {
892 0         0 $self->_fatal("Disconnected from MQTT broker: $prop{reason}");
893             }
894             }
895              
896              
897             sub pingreq {
898 0     0 0 0 my ($self) = @_;
899              
900             $self->{handle}->push_write(
901 0         0 pack( "C C",
902             MQTT_PINGREQ << 4, # 3.12.1 Packet type
903             0, # 3.12.1 Remaining length
904             )
905             );
906             }
907              
908             sub pingresp {
909 0     0 0 0 my ($self) = @_;
910              
911             $self->{handle}->push_write(
912 0         0 pack( "C C",
913             MQTT_PINGRESP << 4, # 3.13.1 Packet type
914             0, # 3.13.1 Remaining length
915             )
916             );
917             }
918              
919              
920             sub subscribe {
921 19     19 1 2001458 my ($self, %args) = @_;
922              
923 19         79 my $topic = delete $args{'topic'};
924 19         62 my $topics = delete $args{'topics'};
925 19         44 my $subscr_cb = delete $args{'on_publish'};
926 19         45 my $suback_cb = delete $args{'on_suback'};
927 19         40 my $max_qos = delete $args{'maximum_qos'};
928 19         63 my $no_local = delete $args{'no_local'};
929 19         39 my $retain_asp = delete $args{'retain_as_published'};
930 19         43 my $retain_hdl = delete $args{'retain_handling'};
931              
932 19 50       94 $topics = [] unless defined $topics;
933 19 50       90 push (@$topics, $topic) if defined $topic;
934              
935 19 50       62 croak "Subscription topics were not specified" unless @$topics;
936 19 50       82 croak "on_publish callback is required" unless $subscr_cb;
937              
938 19         69 foreach my $topic (@$topics) {
939 19 50       73 croak "Undefined subscription topic" unless defined $topic;
940 19 50       146 croak "Empty subscription topic" unless length $topic;
941             }
942              
943 19         57 my $packet_id = $self->{packet_seq}++;
944 19 50       84 $self->{packet_seq} = 1 if $packet_id == 0xFFFF;
945              
946             # Set callback for incomings PUBLISH
947 19         57 my $subscr_id = $self->{subscr_seq}++;
948 19         105 $self->{subscr_cb}->{$subscr_id} = $subscr_cb;
949              
950             # Parameters for expected SUBACK
951 19         206 $self->{packet_cb}->{$packet_id} = {
952             topics => [ @$topics ], # copy
953             subscr_id => $subscr_id,
954             suback_cb => $suback_cb,
955             };
956              
957             # 3.8.2.1.2 Subscription Identifier (variable len int)
958 19         69 my $raw_prop = pack("C", MQTT_SUBSCRIPTION_IDENTIFIER) .
959             _encode_var_int($subscr_id);
960              
961             # 3.8.2.1.3 User Property (utf8 string pair)
962 19         90 foreach my $key (keys %args) {
963 0         0 my $val = $args{$key};
964 0 0       0 next unless defined $val;
965 0         0 utf8::encode( $key );
966 0         0 utf8::encode( $val );
967 0         0 $raw_prop .= pack("C n/a* n/a*", MQTT_USER_PROPERTY, $key, $val);
968             }
969              
970 19         121 my $raw_mqtt = pack("n", $packet_id) . # 3.8.2 Packet identifier
971             _encode_var_int(length $raw_prop) . # 3.8.2.1 Properties Length
972             $raw_prop; # 3.8.2.1 Properties
973              
974             # 3.8.3.1 Subscription Options
975 19         45 my $options = 0;
976 19 100       67 $options |= ($max_qos & 0x03) if $max_qos; # Maximum QoS
977 19 50       57 $options |= 0x04 if $no_local; # No Local
978 19 50       46 $options |= 0x08 if $retain_asp; # Retain As Published
979 19 50       59 $options |= ($retain_hdl & 0x03) << 4 if $retain_hdl; # Retain Handling
980              
981             # 3.8.3 Payload
982 19         57 foreach my $topic (@$topics) {
983 19         143 utf8::encode( $topic );
984 19         112 $raw_mqtt .= pack("n/a* C", $topic, $options);
985             }
986              
987             $self->{handle}->push_write(
988 19         5567 pack("C", MQTT_SUBSCRIBE << 4 | 0x02) . # 3.8.1 Packet type
989             _encode_var_int(length $raw_mqtt) . # 3.8.1 Packet length
990             $raw_mqtt
991             );
992              
993 19         9372 1;
994             }
995              
996             sub _receive_suback {
997 19     19   67 my ($self, $packet) = @_;
998              
999             # 3.9.2 Packet id (short int)
1000 19         44 my $offs = 0;
1001 19         76 my $packet_id = _decode_int_16($packet, \$offs);
1002              
1003             # 3.9.2.1.1 Property Length (variable length int)
1004 19         76 my $prop_len = _decode_var_int($packet, \$offs);
1005 19         69 my $prop_end = $offs + $prop_len;
1006 19         46 my %prop;
1007              
1008 19         83 while ($offs < $prop_end) {
1009              
1010 0         0 my $prop_id = _decode_byte($packet, \$offs);
1011              
1012 0 0       0 if ($prop_id == MQTT_REASON_STRING) {
    0          
1013             # 3.9.2.1.2 Reason String (utf8 string)
1014 0         0 $prop{'reason_string'} = _decode_utf8_str($packet, \$offs);
1015             }
1016             elsif ($prop_id == MQTT_USER_PROPERTY) {
1017             # 3.9.2.1.3 User Property (utf8 string pair)
1018 0         0 my $key = _decode_utf8_str($packet, \$offs);
1019 0         0 my $val = _decode_utf8_str($packet, \$offs);
1020 0         0 $prop{$key} = $val;
1021             }
1022             else {
1023             # Protocol error
1024 0         0 $self->_fatal("Received SUBACK with unexpected property $prop_id");
1025             }
1026             }
1027              
1028             # 3.9.3 Payload
1029 19         118 my @reason_codes = unpack("C*", substr($$packet, $offs));
1030              
1031 19         5328 my $packet_cb = delete $self->{packet_cb}->{$packet_id};
1032 19 50       81 $self->_fatal("Received unexpected SUBACK") unless $packet_cb;
1033              
1034 19         57 my $topics = $packet_cb->{topics};
1035 19         41 my $suback_cb = $packet_cb->{suback_cb};
1036 19         49 my $subscr_id = $packet_cb->{subscr_id};
1037              
1038 19         37 my $success = 1;
1039 19         35 my @properties;
1040              
1041 19         81 foreach my $code (@reason_codes) {
1042              
1043 19         51 my $topic = shift @$topics;
1044 19         77 my $reason = $Subscribe_reason_code{$code};
1045 19         39 my $granted_qos;
1046              
1047 19 50       85 if ($code <= 2) {
1048             # Success
1049 19         53 $granted_qos = $code;
1050 19         89 $self->{subscriptions}->{$topic} = $subscr_id;
1051             }
1052             else {
1053             # Failure
1054 0         0 $success = 0;
1055 0         0 $granted_qos = undef;
1056 0 0       0 unless ($suback_cb) {
1057 0         0 $self->_fatal("Subscription to topic '$topic' failed: $reason");
1058             }
1059             }
1060              
1061 19 50       57 $DEBUG && warn "Subscribed to: $topic\n";
1062              
1063 19         171 push @properties, {
1064             topic => $topic,
1065             reason_code => $code,
1066             granted_qos => $granted_qos,
1067             reason => $reason,
1068             %prop
1069             };
1070             }
1071              
1072 19 100       142 $suback_cb->($success, @properties) if $suback_cb;
1073             }
1074              
1075              
1076             sub unsubscribe {
1077 0     0 1 0 my ($self, %args) = @_;
1078              
1079 0         0 my $topic = delete $args{'topic'};
1080 0         0 my $topics = delete $args{'topics'};
1081 0         0 my $unsuback_cb = delete $args{'on_unsuback'};
1082              
1083 0 0       0 $topics = [] unless defined $topics;
1084 0 0       0 push (@$topics, $topic) if defined $topic;
1085              
1086 0 0       0 croak "Unsubscription topics were not specified" unless @$topics;
1087 0 0       0 croak "on_unsuback callback is required" unless $unsuback_cb;
1088              
1089 0         0 foreach my $topic (@$topics) {
1090 0 0       0 croak "Undefined unsubscription topic" unless defined $topic;
1091 0 0       0 croak "Empty unsubscription topic" unless length $topic;
1092             }
1093              
1094 0         0 my $packet_id = $self->{packet_seq}++;
1095 0 0       0 $self->{packet_seq} = 1 if $packet_id == 0xFFFF;
1096              
1097             # Set callback for UNSUBACK
1098 0         0 $self->{packet_cb}->{$packet_id} = {
1099             topics => [ @$topics ], # copy
1100             unsuback_cb => $unsuback_cb,
1101             };
1102              
1103             # 3.10.2.1.2 User Property (utf8 string pair)
1104 0         0 my $raw_prop = '';
1105 0         0 foreach my $key (keys %args) {
1106 0         0 my $val = $args{$key};
1107 0 0       0 next unless defined $val;
1108 0         0 utf8::encode( $key );
1109 0         0 utf8::encode( $val );
1110 0         0 $raw_prop .= pack("C n/a* n/a*", MQTT_USER_PROPERTY, $key, $val);
1111             }
1112              
1113 0         0 my $raw_mqtt = pack("n", $packet_id) . # 3.10.2 Packet identifier
1114             _encode_var_int(length $raw_prop) . # 3.10.2.1 Property Length
1115             $raw_prop; # 3.10.2.1 Properties
1116              
1117             # 3.10.3 Payload
1118 0         0 foreach my $topic (@$topics) {
1119 0         0 utf8::encode($topic);
1120 0         0 $raw_mqtt .= pack("n/a*", $topic);
1121             }
1122              
1123             $self->{handle}->push_write(
1124 0         0 pack("C", MQTT_UNSUBSCRIBE << 4 | 0x02) . # 3.10.1 Packet type
1125             _encode_var_int(length $raw_mqtt) . # 3.10.1 Packet length
1126             $raw_mqtt
1127             );
1128              
1129 0         0 1;
1130             }
1131              
1132             sub _receive_unsuback {
1133 0     0   0 my ($self, $packet) = @_;
1134 0         0 weaken($self);
1135              
1136             # 3.11.2 Packet id (short int)
1137 0         0 my $offs = 0;
1138 0         0 my $packet_id = _decode_int_16($packet, \$offs);
1139              
1140             # 3.11.2.1.1 Property Length (variable length int)
1141 0         0 my $prop_len = _decode_var_int($packet, \$offs);
1142 0         0 my $prop_end = $offs + $prop_len;
1143 0         0 my %prop;
1144              
1145 0         0 while ($offs < $prop_end) {
1146              
1147 0         0 my $prop_id = _decode_byte($packet, \$offs);
1148              
1149 0 0       0 if ($prop_id == MQTT_REASON_STRING) {
    0          
1150             # 3.11.2.1.2 Reason String (utf8 string)
1151 0         0 $prop{'reason_string'} = _decode_utf8_str($packet, \$offs);
1152             }
1153             elsif ($prop_id == MQTT_USER_PROPERTY) {
1154             # 3.11.2.1.3 User Property (utf8 string pair)
1155 0         0 my $key = _decode_utf8_str($packet, \$offs);
1156 0         0 my $val = _decode_utf8_str($packet, \$offs);
1157 0         0 $prop{$key} = $val;
1158             }
1159             else {
1160             # Protocol error
1161 0         0 $self->_fatal("Received UNSUBACK with unexpected property $prop_id");
1162             }
1163             }
1164              
1165             # 3.11.3 Payload
1166 0         0 my @reason_codes = unpack("C*", substr($$packet, $offs));
1167              
1168 0         0 my $packet_cb = delete $self->{packet_cb}->{$packet_id};
1169 0 0       0 $self->_fatal("Received unexpected UNSUBACK") unless $packet_cb;
1170              
1171 0         0 my $topics = $packet_cb->{topics};
1172 0         0 my $unsuback_cb = $packet_cb->{unsuback_cb};
1173              
1174 0         0 my $success = 1;
1175 0         0 my @properties;
1176              
1177 0         0 foreach my $code (@reason_codes) {
1178              
1179 0         0 my $topic = shift @$topics;
1180 0         0 my $reason = $Reason_code{$code};
1181              
1182 0 0       0 if ($code == 0) {
1183             # Success
1184 0         0 my $subs = $self->{subscriptions};
1185 0         0 my $subscr_id = delete $subs->{$topic};
1186 0 0       0 if ($subscr_id) {
1187             # Free on_publish callback if not used by another subscription
1188 0         0 my @still_used = grep { $subs->{$_} == $subscr_id } keys %$subs;
  0         0  
1189 0 0       0 unless (@still_used) {
1190             # But not right now, as broker may send some messages *after* unsubscription
1191             $self->{_timers}->{"unsub-$subscr_id"} = AnyEvent->timer( after => 60, cb => sub {
1192 0     0   0 delete $self->{_timers}->{"unsub-$subscr_id"};
1193 0         0 delete $self->{subscr_cb}->{$subscr_id};
1194 0         0 });
1195             }
1196             }
1197             }
1198             else {
1199             # Failure
1200 0         0 $success = 0;
1201 0 0       0 unless ($unsuback_cb) {
1202 0         0 $self->_fatal("Unsubscription to topic '$topic' failed: $reason");
1203             }
1204             }
1205              
1206 0         0 push @properties, {
1207             topic => $topic,
1208             reason_code => $code,
1209             reason => $reason,
1210             %prop
1211             };
1212             }
1213              
1214 0 0       0 $unsuback_cb->($success, @properties) if $unsuback_cb;
1215             }
1216              
1217             our $AE_WAITING;
1218              
1219             sub publish {
1220 90     90 1 28003241 my ($self, %args) = @_;
1221              
1222 90         331 my $topic = delete $args{'topic'};
1223 90         237 my $payload = delete $args{'payload'};
1224 90         279 my $qos = delete $args{'qos'};
1225 90         210 my $dup = delete $args{'duplicate'};
1226 90         202 my $retain = delete $args{'retain'};
1227 90         170 my $on_puback = delete $args{'on_puback'};
1228 90         5295 my $buffer_id = delete $args{'buffer_id'};
1229              
1230 90 50       286 croak "Message topic was not specified" unless defined $topic;
1231              
1232 90 50       271 $DEBUG && warn "Sent message to: $topic\n";
1233              
1234 90 50       240 $payload = '' unless defined $payload;
1235 90 100       382 my $payload_ref = (ref $payload eq 'SCALAR') ? $payload : \$payload;
1236              
1237             # 3.3.2.3.4 Topic Alias
1238 90         159 my $topic_alias;
1239 90 50       332 if ($self->{use_alias}) {
1240 0         0 $topic_alias = $self->{client_alias}->{$topic};
1241 0 0       0 if ($topic_alias) {
    0          
1242             # Send topic alias only
1243 0         0 $topic = '';
1244             }
1245             elsif ($self->{server_prop}->{'topic_alias_maximum'}) {
1246             #TODO: Honor maximum
1247 0         0 $topic_alias = $self->{alias_seq}++;
1248 0         0 $self->{client_alias}->{$topic} = $topic_alias;
1249             }
1250             }
1251              
1252             # 3.3.1.2 QoS level
1253 90         172 my $flags = 0;
1254 90 100       324 $flags |= $qos << 1 if $qos;
1255 90 50       203 $flags |= 0x04 if $dup;
1256 90 50       205 $flags |= 0x01 if $retain;
1257              
1258 90         136 my $packet_id;
1259 90 100       239 if ($qos) {
1260 70         171 $packet_id = $self->{packet_seq}++;
1261 70 50       192 $self->{packet_seq} = 1 if $packet_id == 0xFFFF;
1262             }
1263              
1264 90         260 my $raw_prop = '';
1265              
1266 90 100       397 if (utf8::is_utf8( $$payload_ref )) {
1267             # 3.3.2.3.2 Payload Format Indicator (byte)
1268 1         3 $raw_prop .= pack("C C", MQTT_PAYLOAD_FORMAT_INDICATOR, 0x01);
1269 1         4 utf8::encode( $$payload_ref );
1270             }
1271              
1272 90 50       262 if (exists $args{'message_expiry_interval'}) {
1273             # 3.3.2.3.3 Message Expiry Interval (long int)
1274 0         0 $raw_prop .= pack("C N", MQTT_MESSAGE_EXPIRY_INTERVAL, delete $args{'message_expiry_interval'});
1275             }
1276              
1277 90 50       244 if ($topic_alias) {
1278             # 3.3.2.3.4 Topic Alias (short int)
1279 0         0 $raw_prop .= pack("C n", MQTT_TOPIC_ALIAS, $topic_alias);
1280             }
1281              
1282 90 100       267 if (exists $args{'response_topic'}) {
1283             # 3.3.2.3.5 Response Topic (utf8 string)
1284 68         286 utf8::encode( $args{'response_topic'} );
1285 68         618 $raw_prop .= pack("C n/a*", MQTT_RESPONSE_TOPIC, delete $args{'response_topic'});
1286             }
1287              
1288 90 50       376 if (exists $args{'correlation_data'}) {
1289             # 3.3.2.3.6 Correlation Data (binary data)
1290 0         0 $raw_prop .= pack("C n/a*", MQTT_CORRELATION_DATA, delete $args{'correlation_data'});
1291             }
1292              
1293             # if (exists $args{'subscription_identifier'}) {
1294             # # 3.3.2.3.8 Subscription Identifier (variable int)
1295             # my $id = delete $args{'subscription_identifier'};
1296             # $raw_prop .= pack("C", MQTT_SUBSCRIPTION_IDENTIFIER) . _encode_var_int($id);
1297             # }
1298              
1299 90 50       270 if (exists $args{'content_type'}) {
1300             # 3.3.2.3.9 Content Type (utf8 string)
1301 0         0 utf8::encode( $args{'content_type'} );
1302 0         0 $raw_prop .= pack("C n/a*", MQTT_CONTENT_TYPE, delete $args{'content_type'});
1303             }
1304              
1305 90         429 foreach my $key (keys %args) {
1306             # 3.3.2.3.7 User Property (utf8 string pair)
1307 20         72 my $val = $args{$key};
1308 20 50       63 next unless defined $val;
1309 20         78 utf8::encode( $key );
1310 20         60 utf8::encode( $val );
1311 20         120 $raw_prop .= pack("C n/a* n/a*", MQTT_USER_PROPERTY, $key, $val);
1312             }
1313              
1314             # 3.3.2.1 Topic name (utf8 string)
1315 90         304 utf8::encode( $topic );
1316 90         418 my $raw_mqtt = pack("n/a*", $topic);
1317              
1318             # 3.3.2.2 Packet identifier (short int)
1319 90 100       312 $raw_mqtt .= pack("n", $packet_id) if $packet_id;
1320              
1321             # 3.3.2.3 Properties
1322 90         454 $raw_mqtt .= _encode_var_int(length $raw_prop);
1323 90         219 $raw_mqtt .= $raw_prop;
1324              
1325             # 3.3.3 Payload
1326 90         14230 $raw_mqtt .= $$payload_ref;
1327              
1328 90         348 $raw_mqtt = pack("C", MQTT_PUBLISH << 4 | $flags) . # 3.3.1 Packet type
1329             _encode_var_int(length $raw_mqtt) . # 3.3.1 Packet length
1330             $raw_mqtt;
1331              
1332 90 50 66     597 if ($qos && $on_puback) {
1333             # Set PUBACK callback
1334 0         0 $self->{packet_cb}->{$packet_id} = $on_puback;
1335             }
1336              
1337 90 50       237 if ($buffer_id) {
1338             # Do not send right now, wait until flush_buffer
1339 0   0     0 my $buffer = $self->{buffers}->{$buffer_id} ||= {};
1340 0         0 $buffer->{raw_mqtt} .= $raw_mqtt;
1341 0 0       0 $buffer->{packets}->{$packet_id} = 1 if $packet_id;
1342 0         0 return 1;
1343             }
1344              
1345 90         790 $self->{handle}->push_write( $raw_mqtt );
1346              
1347 90 100 66     57343 if (defined $self->{handle}->{wbuf} && length $self->{handle}->{wbuf} > 0) {
1348             # push_write could not send all data to the handle because the kernel
1349             # write buffer is full. The size of kernel write bufer (which can be
1350             # queried with 'sysctl net.ipv4.tcp_wmem') is choosed by the kernel
1351             # based on available memory, and is 4MB in known production servers.
1352             # This will happen after sending more that 4MB of data very quickly.
1353             # As client may be syncronous, wait until entire message is sent.
1354              
1355             # Make AnyEvent allow one level of recursive condvar blocking
1356 1 50       10 $AE_WAITING && Carp::confess "Recursive condvar blocking wait attempted";
1357 1         5 local $AE_WAITING = 1;
1358 1         4 local $AnyEvent::CondVar::Base::WAITING = 0;
1359              
1360 1         79 my $flushed = AnyEvent->condvar;
1361 1         17 $self->{handle}->on_drain( $flushed );
1362 1         22 $flushed->recv;
1363 1         3868 $self->{handle}->on_drain(); # clear
1364             }
1365             }
1366              
1367             sub _receive_publish {
1368 84     84   335 my ($self, $packet, $flags) = @_;
1369              
1370             # 3.3.2.1 Topic Name (utf8 str)
1371 84         531 my $topic = unpack("n/a", $$packet);
1372 84         363 my $offs = 2 + length $topic;
1373 84         376 utf8::decode($topic);
1374              
1375 84 50       267 $DEBUG && warn "Got message from: $topic\n";
1376              
1377 84         745 my %prop = (
1378             'topic' => $topic,
1379             'qos' => ($flags & 0x6) >> 1,
1380             'dup' => ($flags & 0x8) >> 3,
1381             );
1382              
1383             # 3.3.2.2 Packet Identifier (short int)
1384 84 100       341 if ($prop{'qos'} > 0) {
1385 3         16 $prop{'packet_id'} = unpack("n", substr($$packet, $offs, 2));
1386 3         12 $offs += 2;
1387             }
1388              
1389             # 3.3.2.3.1 Properties Length (variable length int)
1390 84         329 my $prop_len = _decode_var_int($packet, \$offs);
1391 84         263 my $prop_end = $offs + $prop_len;
1392              
1393 84         221 my @subscr_ids;
1394             my $prop_id;
1395              
1396 84         286 while ($offs < $prop_end) {
1397              
1398 173         583 $prop_id = unpack("C", substr($$packet, $offs, 1));
1399 173         394 $offs += 1;
1400              
1401 173 100       1203 if ($prop_id == MQTT_PAYLOAD_FORMAT_INDICATOR) {
    50          
    50          
    100          
    50          
    100          
    50          
    0          
1402             # 3.3.2.3.2 Payload Format Indicator (byte)
1403 1         29 $prop{'payload_format'} = unpack("C", substr($$packet, $offs, 1));
1404 1         8 $offs += 1;
1405             }
1406             elsif ($prop_id == MQTT_MESSAGE_EXPIRY_INTERVAL) {
1407             # 3.3.2.3.3 Message Expiry Interval (long int)
1408 0         0 $prop{'message_expiry_interval'} = unpack("N", substr($$packet, $offs, 4));
1409 0         0 $offs += 4;
1410             }
1411             elsif ($prop_id == MQTT_TOPIC_ALIAS) {
1412             # 3.3.2.3.4 Topic Alias (short int)
1413 0         0 my $alias = unpack("n", substr($$packet, $offs, 2));
1414 0         0 $offs += 2;
1415 0 0       0 if (length $topic) {
1416 0         0 $self->{server_alias}->{$alias} = $topic;
1417             }
1418             else {
1419 0         0 $prop{'topic'} = $self->{server_alias}->{$alias};
1420             }
1421             }
1422             elsif ($prop_id == MQTT_RESPONSE_TOPIC) {
1423             # 3.3.2.3.5 Response Topic (utf8 string)
1424 1         7 my $resp_topic = unpack("n/a", substr($$packet, $offs));
1425 1         6 $offs += 2 + length $resp_topic;
1426 1         7 utf8::decode( $resp_topic );
1427 1         6 $prop{'response_topic'} = $resp_topic;
1428             }
1429             elsif ($prop_id == MQTT_CORRELATION_DATA) {
1430             # 3.3.2.3.6 Correlation Data (binary data)
1431 0         0 $prop{'correlation_data'} = unpack("n/a", substr($$packet, $offs));
1432 0         0 $offs += 2 + length $prop{'correlation_data'};
1433             }
1434             elsif ($prop_id == MQTT_USER_PROPERTY) {
1435             # 3.3.2.3.7 User Property (utf8 string pair)
1436 87         18090 my ($key, $val) = unpack("n/a n/a", substr($$packet, $offs));
1437 87         354 $offs += 4 + length($key) + length($val);
1438 87         327 utf8::decode( $key );
1439 87         242 utf8::decode( $val );
1440 87         490 $prop{$key} = $val;
1441             }
1442             elsif ($prop_id == MQTT_SUBSCRIPTION_IDENTIFIER) {
1443             # 3.3.2.3.8 Subscription Identifier (variable int)
1444 84         238 push @subscr_ids, _decode_var_int($packet, \$offs);
1445             }
1446             elsif ($prop_id == MQTT_CONTENT_TYPE) {
1447             # 3.3.2.3.9 Content Type (utf8 string)
1448 0         0 my $content_type = unpack("n/a", substr($$packet, $offs));
1449 0         0 $offs += 2 + length $content_type;
1450 0         0 utf8::decode( $content_type );
1451 0         0 $prop{'content_type'} = $content_type;
1452             }
1453             else {
1454             # Protocol error
1455 0         0 $self->_fatal("Received PUBLISH with unknown property $prop_id");
1456             }
1457             }
1458              
1459             # Trim variable header from packet, the remaining is the payload
1460 84         273 substr($$packet, 0, $prop_end, '');
1461              
1462 84 100       259 if ($prop{'payload_format'}) {
1463             # Payload is UTF-8 Encoded Character Data
1464 1         6 utf8::decode( $$packet );
1465             }
1466              
1467 84         250 foreach (@subscr_ids) {
1468             # Execute subscriptions callbacks
1469              
1470 84         591 $self->{subscr_cb}->{$_}->($packet, \%prop);
1471             }
1472             }
1473              
1474              
1475             sub puback {
1476 3     3 1 4003507 my ($self, %args) = @_;
1477              
1478 3 50       19 croak "Missing packet_id" unless $args{'packet_id'};
1479              
1480             my $raw_mqtt = pack(
1481             "C C n C",
1482             MQTT_PUBACK << 4, # 3.4.1 Packet type
1483             3, # 3.4.1 Remaining length
1484             $args{'packet_id'}, # 3.4.2 Packet identifier
1485 3   50     69 $args{'reason_code'} || 0, # 3.4.2.1 Reason code
1486             );
1487              
1488 3 50       14 if ($args{'buffer_id'}) {
1489             # Do not send right now, wait until flush_buffer
1490 0         0 $self->{buffers}->{$args{'buffer_id'}}->{raw_mqtt} .= $raw_mqtt;
1491 0         0 return 1;
1492             }
1493              
1494 3         27 $self->{handle}->push_write( $raw_mqtt );
1495              
1496 3         1701 1;
1497             }
1498              
1499             sub _receive_puback {
1500 70     70   233 my ($self, $packet) = @_;
1501              
1502 70         400 my ($packet_id, $reason_code) = unpack("n C", $$packet);
1503 70 50       305 $reason_code = 0 unless defined $reason_code;
1504              
1505             #TODO: 3.5.2.2 Properties
1506              
1507 70         281 my $puback_cb = delete $self->{packet_cb}->{$packet_id};
1508 70 50       280 return unless defined $puback_cb;
1509              
1510 0         0 $puback_cb->($reason_code);
1511             }
1512              
1513             sub pubrec {
1514 0     0 0 0 my ($self, %args) = @_;
1515              
1516 0 0       0 croak "Missing packet_id" unless $args{'packet_id'};
1517              
1518             my $raw_mqtt = pack(
1519             "C C n C",
1520             MQTT_PUBREC << 4, # 3.5.1 Packet type
1521             3, # 3.5.1 Remaining length
1522             $args{'packet_id'}, # 3.5.2 Packet identifier
1523 0   0     0 $args{'reason_code'} || 0, # 3.5.2.1 Reason code
1524             );
1525              
1526             #TODO: set PUBREL callback
1527              
1528 0         0 $self->{handle}->push_write( $raw_mqtt );
1529              
1530 0         0 1;
1531             }
1532              
1533             sub _receive_pubrec {
1534 0     0   0 my ($self, $packet) = @_;
1535              
1536 0         0 my ($packet_id, $reason_code) = unpack("n C", $$packet);
1537 0 0       0 $reason_code = 0 unless defined $reason_code;
1538              
1539             #TODO: 3.5.2.2 Properties
1540              
1541 0         0 my $pubrec_cb = delete $self->{packet_cb}->{$packet_id};
1542 0 0       0 return unless defined $pubrec_cb;
1543              
1544 0         0 $pubrec_cb->($packet_id, $reason_code);
1545             }
1546              
1547             sub pubrel {
1548 0     0 0 0 my ($self, %args) = @_;
1549              
1550 0 0       0 croak "Missing packet_id" unless $args{'packet_id'};
1551              
1552             my $raw_mqtt = pack(
1553             "C C n C",
1554             MQTT_PUBREL << 4, # 3.6.1 Packet type
1555             3, # 3.6.1 Remaining length
1556             $args{'packet_id'}, # 3.6.2 Packet identifier
1557 0   0     0 $args{'reason_code'} || 0, # 3.6.2.1 Reason code
1558             );
1559              
1560             #TODO: set PUBREC callback
1561              
1562 0         0 $self->{handle}->push_write( $raw_mqtt );
1563              
1564 0         0 1;
1565             }
1566              
1567             sub _receive_pubrel {
1568 0     0   0 my ($self, $packet) = @_;
1569              
1570 0         0 my ($packet_id, $reason_code) = unpack("n C", $$packet);
1571 0 0       0 $reason_code = 0 unless defined $reason_code;
1572              
1573             #TODO: 3.6.2.2 Properties
1574              
1575 0         0 my $pubrel_cb = delete $self->{packet_cb}->{$packet_id};
1576 0 0       0 return unless defined $pubrel_cb;
1577              
1578 0         0 $pubrel_cb->($packet_id, $reason_code);
1579             }
1580              
1581             sub pubcomp {
1582 0     0 0 0 my ($self, %args) = @_;
1583              
1584 0 0       0 croak "Missing packet_id" unless $args{'packet_id'};
1585              
1586             my $raw_mqtt = pack(
1587             "C C n C",
1588             MQTT_PUBCOMP << 4, # 3.7.1 Packet type
1589             3, # 3.7.1 Remaining length
1590             $args{'packet_id'}, # 3.7.2 Packet identifier
1591 0   0     0 $args{'reason_code'} || 0, # 3.7.2.1 Reason code
1592             );
1593              
1594 0         0 $self->{handle}->push_write( $raw_mqtt );
1595              
1596 0         0 1;
1597             }
1598              
1599             sub _receive_pubcomp {
1600 0     0   0 my ($self, $packet) = @_;
1601              
1602 0         0 my ($packet_id, $reason_code) = unpack("n C", $$packet);
1603 0 0       0 $reason_code = 0 unless defined $reason_code;
1604              
1605             #TODO: 3.7.2.2 Properties
1606              
1607 0         0 my $pubcomp_cb = delete $self->{packet_cb}->{$packet_id};
1608 0 0       0 return unless defined $pubcomp_cb;
1609              
1610 0         0 $pubcomp_cb->($reason_code);
1611             }
1612              
1613              
1614             sub auth {
1615 0     0 0 0 my ($self, %args) = @_;
1616              
1617 0         0 my $reason_code = delete $args{'reason_code'};
1618 0         0 my $auth_cb = delete $args{'on_auth'};
1619              
1620             # Set callback to be executed when the server answers with an AUTH packet
1621 0 0       0 $self->{packet_cb}->{'auth'} = $auth_cb if $auth_cb;
1622              
1623 0         0 my $raw_prop = '';
1624              
1625 0 0       0 if (exists $args{'authentication_method'}) {
1626             # 3.15.2.2.2 Authentication Method (utf8 string)
1627 0         0 utf8::encode( $args{'authentication_method'} );
1628 0         0 $raw_prop .= pack("C n/a*", MQTT_AUTHENTICATION_METHOD, delete $args{'authentication_method'});
1629             }
1630              
1631 0 0       0 if (exists $args{'authentication_data'}) {
1632             # 3.15.2.2.3 Authentication Data (binary data)
1633 0         0 $raw_prop .= pack("C n/a*", MQTT_AUTHENTICATION_DATA, delete $args{'authentication_data'});
1634             }
1635              
1636 0 0       0 if (exists $args{'reason_string'}) {
1637             # 3.15.2.2.4 Reason String (utf8 string)
1638 0         0 utf8::encode( $args{'reason_string'} );
1639 0         0 $raw_prop .= pack("C n/a*", MQTT_REASON_STRING, delete $args{'reason_string'});
1640             }
1641              
1642 0         0 foreach my $key (keys %args) {
1643             # 3.15.2.2.5 User Property (utf8 string pair)
1644 0         0 my $val = $args{$key};
1645 0 0       0 next unless defined $val;
1646 0         0 utf8::encode( $key );
1647 0         0 utf8::encode( $val );
1648 0         0 $raw_prop .= pack("C n/a* n/a*", MQTT_USER_PROPERTY, $key, $val);
1649             }
1650              
1651             # 3.15.2.1 Authenticate Reason Code (byte)
1652 0         0 my $raw_mqtt = pack("C", $reason_code);
1653              
1654             # 3.15.2.2 Properties
1655 0         0 $raw_mqtt .= _encode_var_int(length $raw_prop);
1656 0         0 $raw_mqtt .= $raw_prop;
1657              
1658             $self->{handle}->push_write(
1659 0         0 pack("C", MQTT_AUTH << 4) . # 3.15.1 Packet type
1660             _encode_var_int(length $raw_mqtt) . # 3.15.1 Packet length
1661             $raw_mqtt
1662             );
1663              
1664 0         0 1;
1665             }
1666              
1667             sub _receive_auth {
1668 0     0   0 my ($self, $packet) = @_;
1669              
1670             # Handle abbreviated packet
1671 0 0       0 $$packet = "\x00\x00" if (length $$packet == 0);
1672              
1673             # 3.15.2.1 Authenticate Reason Code (byte)
1674 0         0 my $offs = 0;
1675 0         0 my $reason_code = _decode_byte($packet, \$offs);
1676 0         0 my $reason = $Reason_code{$reason_code};
1677              
1678             # 3.15.2.2.1 Property Length (variable length int)
1679 0         0 my $prop_len = _decode_var_int($packet, \$offs);
1680 0         0 my $prop_end = $offs + $prop_len;
1681              
1682 0         0 my %prop = (
1683             reason_code => $reason_code,
1684             reason => $reason,
1685             );
1686              
1687 0         0 while ($offs < $prop_end) {
1688              
1689 0         0 my $prop_id = _decode_byte($packet, \$offs);
1690              
1691 0 0       0 if ($prop_id == MQTT_AUTHENTICATION_METHOD) {
    0          
    0          
    0          
1692             # 3.15.2.2.2 Authentication Method (utf8 string)
1693 0         0 $prop{'authentication_method'} = _decode_utf8_str($packet, \$offs);
1694             }
1695             elsif ($prop_id == MQTT_AUTHENTICATION_DATA) {
1696             # 3.15.2.2.3 Authentication Data (binary data)
1697 0         0 $prop{'authentication_data'} = _decode_binary_data($packet, \$offs);
1698             }
1699             elsif ($prop_id == MQTT_REASON_STRING) {
1700             # 3.15.2.2.4 Reason String (utf8 string)
1701 0         0 $prop{'reason_string'} = _decode_utf8_str($packet, \$offs);
1702             }
1703             elsif ($prop_id == MQTT_USER_PROPERTY) {
1704             # 3.15.2.2.5 User Property (utf8 string pair)
1705 0         0 my $key = _decode_utf8_str($packet, \$offs);
1706 0         0 my $val = _decode_utf8_str($packet, \$offs);
1707 0         0 $prop{$key} = $val;
1708             }
1709             else {
1710             # Protocol error
1711 0         0 $self->_fatal("Received AUTH with unexpected property $prop_id");
1712             }
1713             }
1714              
1715 0         0 my $auth_cb = delete $self->{packet_cb}->{'auth'};
1716              
1717 0 0       0 $auth_cb->(\%prop) if $auth_cb;
1718             }
1719              
1720              
1721             sub flush_buffer {
1722 0     0 1 0 my ($self, %args) = @_;
1723              
1724 0         0 my $buffer = delete $self->{buffers}->{$args{'buffer_id'}};
1725              
1726             # Nothing to do if nothing was buffered
1727 0 0       0 return unless $buffer;
1728              
1729 0         0 $self->{handle}->push_write( $buffer->{raw_mqtt} );
1730              
1731 0 0 0     0 if (defined $self->{handle}->{wbuf} && length $self->{handle}->{wbuf} > 0) {
1732              
1733             # Kernel write buffer is full, see publish() above
1734              
1735             # Make AnyEvent allow one level of recursive condvar blocking
1736 0 0       0 $AE_WAITING && Carp::confess "Recursive condvar blocking wait attempted";
1737 0         0 local $AE_WAITING = 1;
1738 0         0 local $AnyEvent::CondVar::Base::WAITING = 0;
1739              
1740 0         0 my $flushed = AnyEvent->condvar;
1741 0         0 $self->{handle}->on_drain( $flushed );
1742 0         0 $flushed->recv;
1743 0         0 $self->{handle}->on_drain(); # clear
1744             }
1745              
1746 0         0 1;
1747             }
1748              
1749             sub discard_buffer {
1750 0     0 1 0 my ($self, %args) = @_;
1751              
1752 0         0 my $buffer = delete $self->{buffers}->{$args{'buffer_id'}};
1753              
1754             # Nothing to do if nothing was buffered
1755 0 0       0 return unless $buffer;
1756              
1757             # Remove all pending puback callbacks, as those will never be executed
1758 0         0 foreach my $packet_id (keys %{$buffer->{packet_ids}}) {
  0         0  
1759 0         0 delete $self->{packet_cb}->{$packet_id};
1760             }
1761              
1762 0         0 1;
1763             }
1764              
1765              
1766             sub DESTROY {
1767 13     13   41 my $self = shift;
1768             # Disconnect gracefully from server if already connected
1769 13 50       198 return unless defined $self->{handle};
1770 0           $self->disconnect;
1771             }
1772              
1773             1;
1774              
1775             __END__