File Coverage

lib/Beekeeper/MQTT.pm
Criterion Covered Total %
statement 523 855 61.1
branch 132 370 35.6
condition 13 38 34.2
subroutine 71 97 73.2
pod 9 17 52.9
total 748 1377 54.3


line stmt bran cond sub pod time code
1             package Beekeeper::MQTT;
2              
3 11     11   1250 use strict;
  11         25  
  11         361  
4 11     11   60 use warnings;
  11         24  
  11         492  
5              
6             our $VERSION = '0.09';
7              
8 11     11   1239 use AnyEvent;
  11         5804  
  11         252  
9 11     11   9102 use AnyEvent::Handle;
  11         103917  
  11         653  
10 11     11   8135 use Time::HiRes;
  11         17095  
  11         51  
11 11     11   1499 use List::Util 'shuffle';
  11         28  
  11         1172  
12 11     11   128 use Scalar::Util 'weaken';
  11         24  
  11         551  
13 11     11   69 use Exporter 'import';
  11         122  
  11         318  
14 11     11   184 use Carp;
  11         45  
  11         2811  
15              
16             our @EXPORT_OK;
17             our %EXPORT_TAGS;
18              
19             our $DEBUG = 0;
20              
# Build the export lists from the package symbol table: every MQTT_*
# constant goes into the 'const' tag and every _encode*/_decode* helper
# goes into the 'decode' tag. Nothing is exported by default.
EXPORT: {
    my @symbols = keys %{Beekeeper::MQTT::};

    my @const  = grep { m/^MQTT_/ } @symbols;
    my @encode = grep { m/^_(en|de)code/ } @symbols;

    @EXPORT_OK = (@const, @encode);

    %EXPORT_TAGS = (
        %EXPORT_TAGS,
        'const'  => \@const,
        'decode' => \@encode,
    );
}
31              
32             # 2.1.2 Control Packet type
33              
# Control packet type codes. They travel in the upper four bits of the
# first byte of every packet's fixed header.
use constant {
    MQTT_CONNECT     => 0x01,
    MQTT_CONNACK     => 0x02,
    MQTT_PUBLISH     => 0x03,
    MQTT_PUBACK      => 0x04,
    MQTT_PUBREC      => 0x05,
    MQTT_PUBREL      => 0x06,
    MQTT_PUBCOMP     => 0x07,
    MQTT_SUBSCRIBE   => 0x08,
    MQTT_SUBACK      => 0x09,
    MQTT_UNSUBSCRIBE => 0x0A,
    MQTT_UNSUBACK    => 0x0B,
    MQTT_PINGREQ     => 0x0C,
    MQTT_PINGRESP    => 0x0D,
    MQTT_DISCONNECT  => 0x0E,
    MQTT_AUTH        => 0x0F,
};
49              
50             # 2.2.2.2 Properties
51              
# Property identifiers. Each trailing comment gives the wire type of the
# property value followed by the packets in which the property may appear.
use constant {
    MQTT_PAYLOAD_FORMAT_INDICATOR          => 0x01, # byte          PUBLISH, Will Properties
    MQTT_MESSAGE_EXPIRY_INTERVAL           => 0x02, # long int      PUBLISH, Will Properties
    MQTT_CONTENT_TYPE                      => 0x03, # utf8 string   PUBLISH, Will Properties
    MQTT_RESPONSE_TOPIC                    => 0x08, # utf8 string   PUBLISH, Will Properties
    MQTT_CORRELATION_DATA                  => 0x09, # binary data   PUBLISH, Will Properties
    MQTT_SUBSCRIPTION_IDENTIFIER           => 0x0B, # variable int  PUBLISH, SUBSCRIBE
    MQTT_SESSION_EXPIRY_INTERVAL           => 0x11, # long int      CONNECT, CONNACK, DISCONNECT
    MQTT_ASSIGNED_CLIENT_IDENTIFIER        => 0x12, # utf8 string   CONNACK
    MQTT_SERVER_KEEP_ALIVE                 => 0x13, # short int     CONNACK
    MQTT_AUTHENTICATION_METHOD             => 0x15, # utf8 string   CONNECT, CONNACK, AUTH
    MQTT_AUTHENTICATION_DATA               => 0x16, # binary data   CONNECT, CONNACK, AUTH
    MQTT_REQUEST_PROBLEM_INFORMATION       => 0x17, # byte          CONNECT
    MQTT_WILL_DELAY_INTERVAL               => 0x18, # long int      Will Properties
    MQTT_REQUEST_RESPONSE_INFORMATION      => 0x19, # byte          CONNECT
    MQTT_RESPONSE_INFORMATION              => 0x1A, # utf8 string   CONNACK
    MQTT_SERVER_REFERENCE                  => 0x1C, # utf8 string   CONNACK, DISCONNECT
    MQTT_REASON_STRING                     => 0x1F, # utf8 string   CONNACK, PUBACK, PUBREC, PUBREL, PUBCOMP, SUBACK, UNSUBACK, DISCONNECT, AUTH
    MQTT_RECEIVE_MAXIMUM                   => 0x21, # short int     CONNECT, CONNACK
    MQTT_TOPIC_ALIAS_MAXIMUM               => 0x22, # short int     CONNECT, CONNACK
    MQTT_TOPIC_ALIAS                       => 0x23, # short int     PUBLISH
    MQTT_MAXIMUM_QOS                       => 0x24, # byte          CONNACK
    MQTT_RETAIN_AVAILABLE                  => 0x25, # byte          CONNACK
    MQTT_USER_PROPERTY                     => 0x26, # utf8 pair     CONNECT, CONNACK, PUBLISH, Will Properties, PUBACK, PUBREC, PUBREL, PUBCOMP, SUBSCRIBE, SUBACK, UNSUBSCRIBE, UNSUBACK, DISCONNECT, AUTH
    MQTT_MAXIMUM_PACKET_SIZE               => 0x27, # long int      CONNECT, CONNACK
    MQTT_WILDCARD_SUBSCRIPTION_AVAILABLE   => 0x28, # byte          CONNACK
    MQTT_SUBSCRIPTION_IDENTIFIER_AVAILABLE => 0x29, # byte          CONNACK
    MQTT_SHARED_SUBSCRIPTION_AVAILABLE     => 0x2A, # byte          CONNACK
};
79              
80             # 2.4 Reason Code
81              
# Human readable text for each reason code. The trailing comment lists the
# packets that may carry the code. Code 0x00 (and 0x01, 0x02 for SUBACK)
# has a packet specific meaning, which the derived tables below override.
my %Reason_code = (
    0x00 => 'Success',                                # CONNACK, PUBACK, PUBREC, PUBREL, PUBCOMP, UNSUBACK, AUTH
    0x04 => 'Disconnect with Will Message',           # DISCONNECT
    0x10 => 'No matching subscribers',                # PUBACK, PUBREC
    0x11 => 'No subscription existed',                # UNSUBACK
    0x18 => 'Continue authentication',                # AUTH
    0x19 => 'Re-authenticate',                        # AUTH
    0x80 => 'Unspecified error',                      # CONNACK, PUBACK, PUBREC, SUBACK, UNSUBACK, DISCONNECT
    0x81 => 'Malformed Packet',                       # CONNACK, DISCONNECT
    0x82 => 'Protocol Error',                         # CONNACK, DISCONNECT
    0x83 => 'Implementation specific error',          # CONNACK, PUBACK, PUBREC, SUBACK, UNSUBACK, DISCONNECT
    0x84 => 'Unsupported Protocol Version',           # CONNACK
    0x85 => 'Client Identifier not valid',            # CONNACK
    0x86 => 'Bad User Name or Password',              # CONNACK
    0x87 => 'Not authorized',                         # CONNACK, PUBACK, PUBREC, SUBACK, UNSUBACK, DISCONNECT
    0x88 => 'Server unavailable',                     # CONNACK
    0x89 => 'Server busy',                            # CONNACK, DISCONNECT
    0x8A => 'Banned',                                 # CONNACK
    0x8B => 'Server shutting down',                   # DISCONNECT
    0x8C => 'Bad authentication method',              # CONNACK, DISCONNECT
    0x8D => 'Keep Alive timeout',                     # DISCONNECT
    0x8E => 'Session taken over',                     # DISCONNECT
    0x8F => 'Topic Filter invalid',                   # SUBACK, UNSUBACK, DISCONNECT
    0x90 => 'Topic Name invalid',                     # CONNACK, PUBACK, PUBREC, DISCONNECT
    0x91 => 'Packet Identifier in use',               # PUBACK, PUBREC, SUBACK, UNSUBACK
    0x92 => 'Packet Identifier not found',            # PUBREL, PUBCOMP
    0x93 => 'Receive Maximum exceeded',               # DISCONNECT
    0x94 => 'Topic Alias invalid',                    # DISCONNECT
    0x95 => 'Packet too large',                       # CONNACK, DISCONNECT
    0x96 => 'Message rate too high',                  # DISCONNECT
    0x97 => 'Quota exceeded',                         # CONNACK, PUBACK, PUBREC, SUBACK, DISCONNECT
    0x98 => 'Administrative action',                  # DISCONNECT
    0x99 => 'Payload format invalid',                 # CONNACK, PUBACK, PUBREC, DISCONNECT
    0x9A => 'Retain not supported',                   # CONNACK, DISCONNECT
    0x9B => 'QoS not supported',                      # CONNACK, DISCONNECT
    0x9C => 'Use another server',                     # CONNACK, DISCONNECT
    0x9D => 'Server moved',                           # CONNACK, DISCONNECT
    0x9E => 'Shared Subscriptions not supported',     # SUBACK, DISCONNECT
    0x9F => 'Connection rate exceeded',               # CONNACK, DISCONNECT
    0xA0 => 'Maximum connect time',                   # DISCONNECT
    0xA1 => 'Subscription Identifiers not supported', # SUBACK, DISCONNECT
    0xA2 => 'Wildcard Subscriptions not supported',   # SUBACK, DISCONNECT
);

# 3.9.3 Subscribe Reason Codes: the generic table, except that the three
# lowest codes report the granted QoS level.
my %Subscribe_reason_code = %Reason_code;
@Subscribe_reason_code{ 0x00, 0x01, 0x02 } = (
    'Granted QoS 0',
    'Granted QoS 1',
    'Granted QoS 2',
);

# 3.14.2.1 Disconnect Reason Code: the generic table, except that code
# 0x00 reports a normal disconnection.
my %Disconnect_reason_code = %Reason_code;
$Disconnect_reason_code{0x00} = 'Normal disconnection';
145              
# Reads one unsigned byte from a raw packet.
#   $_[0] - reference to the packet string
#   $_[1] - reference to the read offset; advanced by 1
# Returns the byte as an integer.
sub _decode_byte {
    my ($buf_ref, $pos_ref) = @_;

    my ($value) = unpack("C", substr($$buf_ref, $$pos_ref, 1));

    $$pos_ref += 1;

    return $value;
}
154              
# Reads a big-endian unsigned 16 bit integer from a raw packet.
#   $_[0] - reference to the packet string
#   $_[1] - reference to the read offset; advanced by 2
# Returns the decoded integer.
sub _decode_int_16 {
    my ($buf_ref, $pos_ref) = @_;

    my ($value) = unpack("n", substr($$buf_ref, $$pos_ref, 2));

    $$pos_ref += 2;

    return $value;
}
163              
# Reads a big-endian unsigned 32 bit integer from a raw packet.
#   $_[0] - reference to the packet string
#   $_[1] - reference to the read offset; advanced by 4
# Returns the decoded integer.
sub _decode_int_32 {
    my ($buf_ref, $pos_ref) = @_;

    my ($value) = unpack("N", substr($$buf_ref, $$pos_ref, 4));

    $$pos_ref += 4;

    return $value;
}
172              
# Reads a length prefixed UTF-8 string (16 bit big-endian byte count
# followed by that many bytes) from a raw packet.
#   $_[0] - reference to the packet string
#   $_[1] - reference to the read offset; advanced past prefix and bytes
# Returns the string decoded from UTF-8 into Perl characters.
sub _decode_utf8_str {
    my ($buf_ref, $pos_ref) = @_;

    my ($text) = unpack("n/a", substr($$buf_ref, $$pos_ref));

    # The offset moves by the encoded byte length, so it has to be
    # computed before the bytes are turned into characters
    $$pos_ref += length($text) + 2;

    utf8::decode($text);

    return $text;
}
182              
# Reads a length prefixed binary blob (16 bit big-endian byte count
# followed by that many bytes) from a raw packet.
#   $_[0] - reference to the packet string
#   $_[1] - reference to the read offset; advanced past prefix and bytes
# Returns the raw bytes, without any character decoding.
sub _decode_binary_data {
    my ($buf_ref, $pos_ref) = @_;

    my ($blob) = unpack("n/a", substr($$buf_ref, $$pos_ref));

    $$pos_ref += length($blob) + 2;

    return $blob;
}
191              
# Reads a variable byte integer from a raw packet: seven payload bits per
# byte, least significant group first, bit 0x80 set on every byte that is
# followed by another one.
#   $_[0] - reference to the packet string
#   $_[1] - reference to the read offset; advanced past the bytes read
# Returns the decoded integer.
sub _decode_var_int {
    my ($buf_ref, $pos_ref) = @_;

    my $value  = 0;
    my $weight = 1;

    while (1) {
        my $octet = unpack("C", substr($$buf_ref, $$pos_ref, 1));
        $$pos_ref++;

        $value  += ($octet & 0x7F) * $weight;
        $weight *= 128;

        # A clear high bit marks the last byte of the number
        last unless ($octet & 0x80);
    }

    return $value;
}
208              
# Encodes a non negative integer as a variable byte integer: seven payload
# bits per byte, least significant group first, bit 0x80 set on every
# byte that is followed by another one.
#   $_[0] - the integer to encode
# Returns the encoded bytes as a string.
sub _encode_var_int {
    my ($value) = @_;

    # Values below 128 fit in a single byte
    return pack("C", $value) if ($value < 128);

    my @octets;

    do {
        my $octet = $value & 0x7F;
        $value >>= 7;
        $octet |= 0x80 if $value;    # more groups follow
        push @octets, $octet;
    } while ($value);

    return pack("C*", @octets);
}
216              
217              
# Constructor. Creates a disconnected client object.
#
# Named arguments:
#   bus_id   - stored as the bus identifier
#   bus_role - stored as the bus role; falls back to bus_id when false
#   on_error - callback stored as the error handler
# Every other argument is kept in the 'config' hash of the object.
#
# Returns the new object blessed into $class.
sub new {
    my ($class, %args) = @_;

    # Pull out the options that belong to the object itself, whatever is
    # left in %args becomes the connection configuration
    my $bus_id   = delete $args{'bus_id'};
    my $bus_role = delete $args{'bus_role'} || $bus_id;
    my $error_cb = delete $args{'on_error'};

    my %state = (
        bus_id        => $bus_id,
        bus_role      => $bus_role,
        handle        => undef,     # the socket
        hosts         => undef,     # list of all hosts in cluster
        is_connected  => undef,     # true once connected
        try_hosts     => undef,     # list of hosts to try to connect
        connect_err   => undef,     # count of connection errors
        timeout_tmr   => undef,     # timer used for connection timeout
        reconnect_tmr => undef,     # timer used for connection retry
        connack_cb    => undef,     # connack callback
        error_cb      => $error_cb, # error callback
        client_id     => undef,     # client id
        server_prop   => {},        # server properties
        server_alias  => {},        # server topic aliases
        client_alias  => {},        # client topic aliases
        subscriptions => {},        # topic subscriptions
        subscr_cb     => {},        # subscription callbacks
        packet_cb     => {},        # packet callbacks
        buffers       => {},        # raw mqtt buffers
        packet_seq    => 1,         # sequence used for packet ids
        subscr_seq    => 1,         # sequence used for subscription ids
        alias_seq     => 1,         # sequence used for topic alias ids
        use_alias     => 0,         # topic alias enabled
        config        => \%args,
    );

    return bless \%state, $class;
}
255              
# Accessor: returns the bus identifier given to the constructor.
sub bus_id {
    my ($self) = @_;
    return $self->{bus_id};
}
# Accessor: returns the bus role stored by the constructor.
sub bus_role {
    my ($self) = @_;
    return $self->{bus_role};
}
258              
# Reports an unrecoverable error.
#   $errstr - description of the error
# Hands $errstr to the error callback when one was configured and returns
# whatever that callback returns; otherwise dies with the message
# prefixed by the package name.
sub _fatal {
    my ($self, $errstr) = @_;

    my $handler = $self->{error_cb};

    unless ($handler) {
        die "(" . __PACKAGE__ . ") $errstr\n";
    }

    return $handler->($errstr);
}
264              
# Bookkeeping of the time spent handling incoming packets:
#   $BUSY_SINCE - time at which the read handler started working,
#                 undef while it is not running
#   $BUSY_TIME  - seconds accumulated by the read handler so far
our ($BUSY_SINCE, $BUSY_TIME) = (undef, 0);
267              
# Starts connecting to the broker.
#
# Named arguments:
#   on_connack - callback stored as the CONNACK handler
#   blocking   - when true, wait on the connection condvar before returning
#
# Returns the value of 'is_connected' for blocking calls, and 1 for
# non blocking calls.
sub connect {
    my ($self, %args) = @_;

    my $blocking = $args{'blocking'};

    $self->{connack_cb} = $args{'on_connack'};
    $self->{connect_cv} = AnyEvent->condvar;

    $self->_connect;

    if ($blocking) {
        $self->{connect_cv}->recv;
    }

    # The condvar is only meaningful for the duration of this call
    $self->{connect_cv} = undef;

    return 1 unless $blocking;
    return $self->{is_connected};
}
281              
# Opens the TCP (optionally TLS) connection to the next host of the
# cluster and installs the socket callbacks.
#
# Reads from $self->{config}:
#   timeout - seconds to wait before giving up (default 30, 0 disables)
#   host    - hostname, or array ref of hostnames (default 'localhost')
#   port    - TCP port (default 8883 with tls, 1883 without)
#   tls     - true to use TLS
#
# Hosts are tried in random order; a failed attempt schedules another
# call to this method. Once connected, the CONNECT packet is sent and
# incoming data is split into MQTT packets which are dispatched to the
# matching _receive_* method.
#
# Always returns 1.
sub _connect {
    my ($self) = @_;
    weaken($self);

    my $config = $self->{config};

    my $timeout = $config->{'timeout'};
    $timeout = 30 unless defined $timeout;

    # Ensure that timeout is set properly when the event loop was blocked
    AnyEvent->now_update;

    # Connection timeout handler
    if ($timeout && !$self->{timeout_tmr}) {
        $self->{timeout_tmr} = AnyEvent->timer( after => $timeout, cb => sub {
            $self->_reset_connection;
            # connect() clears connect_cv before returning, so on non blocking
            # connections there is no condvar left to signal when this fires
            $self->{connect_cv}->send if $self->{connect_cv};
            $self->_fatal("Could not connect to MQTT broker after $timeout seconds");
        });
    }

    unless ($self->{hosts}) {
        # Initialize the list of cluster hosts
        my $hosts = $config->{'host'} || 'localhost';
        my @hosts = (ref $hosts eq 'ARRAY') ? @$hosts : ( $hosts );
        $self->{hosts} = [ shuffle @hosts ];
    }

    # Determine next host of cluster to connect to
    my $try_hosts = $self->{try_hosts} ||= [];
    @$try_hosts = @{$self->{hosts}} unless @$try_hosts;

    # TCP connection args
    my $host = shift @$try_hosts;
    my $tls  = $config->{'tls'}  || 0;
    my $port = $config->{'port'} || ( $tls ? 8883 : 1883 );

    ($host) = ($host =~ m/^([a-zA-Z0-9\-\.]+)$/s); # untaint
    ($port) = ($port =~ m/^([0-9]+)$/s);

    $self->{handle} = AnyEvent::Handle->new(
        connect   => [ $host, $port ],
        tls       => $tls ? 'connect' : undef,
        keepalive => 1,
        no_delay  => 1,
        on_connect => sub {
            my ($fh, $host, $port) = @_;
            # Send CONNECT packet
            $self->{server_prop}->{host} = $host;
            $self->{server_prop}->{port} = $port;
            $self->_send_connect;
        },
        on_connect_error => sub {
            my ($fh, $errmsg) = @_;
            # Some error occurred while connection, such as an unresolved hostname
            # or connection refused. Try next host of cluster immediately, or retry
            # in few seconds if all hosts of the cluster are unresponsive
            $self->{connect_err}++;
            # Warn only once per host, not on every retry round
            warn "Could not connect to MQTT broker at $host:$port: $errmsg\n" if ($self->{connect_err} <= @{$self->{hosts}});
            # The delay grows with the number of failed rounds, capped at 10 seconds
            my $delay = @{$self->{try_hosts}} ? 0 : $self->{connect_err} / @{$self->{hosts}};
            $self->{reconnect_tmr} = AnyEvent->timer(
                after => ($delay < 10 ? $delay : 10),
                cb    => sub { $self->_connect },
            );
        },
        on_error => sub {
            my ($fh, $fatal, $errmsg) = @_;
            # Some error occurred, such as a read error
            $self->_reset_connection;
            $self->_fatal("Error on connection to MQTT broker at $host:$port: $errmsg");
        },
        on_eof => sub {
            my ($fh) = @_;
            # The server has closed the connection without sending DISCONNECT
            $self->_reset_connection;
            $self->_fatal("MQTT broker at $host:$port has gone away");
        },
        on_read => sub {
            my ($fh) = @_;

            my $packet_type;
            my $packet_flags;

            my $rbuff_len;
            my $packet_len;

            my $mult;
            my $offs;
            my $byte;

            my $timing_packets;

            unless (defined $BUSY_SINCE) {
                # Measure time elapsed while processing incoming packets
                $BUSY_SINCE = Time::HiRes::time;
                $timing_packets = 1;
            }

            PARSE_PACKET: {

                $rbuff_len = length $fh->{rbuf};

                # The shortest packet is one type byte plus one length byte
                last PARSE_PACKET unless $rbuff_len >= 2;

                unless ($packet_type) {

                    $packet_len = 0;
                    $mult = 1;
                    $offs = 1;

                    # Decode the remaining length (variable byte integer that
                    # follows the first byte), reading at most four bytes
                    PARSE_LEN: {
                        $byte = unpack "C", substr( $fh->{rbuf}, $offs++, 1 );
                        $packet_len += ($byte & 0x7f) * $mult;
                        last unless ($byte & 0x80);
                        last PARSE_PACKET if ($offs >= $rbuff_len); # Not enough data
                        $mult *= 128;
                        redo if ($offs < 5);
                    }

                    #TODO: Check max packet size

                    # First byte: packet type in the high nibble, flags in the low one
                    $byte = unpack('C', substr( $fh->{rbuf}, 0, 1 ));
                    $packet_type = $byte >> 4;
                    $packet_flags = $byte & 0x0F;
                }

                if ($rbuff_len < ($offs + $packet_len)) {
                    # Not enough data
                    last PARSE_PACKET;
                }

                # Consume packet from buffer
                my $packet = substr($fh->{rbuf}, 0, ($offs + $packet_len), '');

                # Trim fixed header from packet
                substr($packet, 0, $offs, '');

                if ($packet_type == MQTT_PUBLISH) {

                    $self->_receive_publish(\$packet, $packet_flags);
                }
                elsif ($packet_type == MQTT_PUBACK) {

                    $self->_receive_puback(\$packet);
                }
                elsif ($packet_type == MQTT_PUBREC) {

                    $self->_receive_pubrec(\$packet);
                }
                elsif ($packet_type == MQTT_PUBREL) {

                    $self->_receive_pubrel(\$packet);
                }
                elsif ($packet_type == MQTT_PUBCOMP) {

                    $self->_receive_pubcomp(\$packet);
                }
                elsif ($packet_type == MQTT_PINGREQ) {

                    $self->pingresp;
                }
                elsif ($packet_type == MQTT_PINGRESP) {

                    # Client takes no action on receiving PINGRESP
                }
                elsif ($packet_type == MQTT_SUBACK) {

                    $self->_receive_suback(\$packet);
                }
                elsif ($packet_type == MQTT_UNSUBACK) {

                    $self->_receive_unsuback(\$packet);
                }
                elsif ($packet_type == MQTT_CONNACK) {

                    $self->_receive_connack(\$packet);
                }
                elsif ($packet_type == MQTT_DISCONNECT) {

                    $self->_receive_disconnect(\$packet);
                }
                elsif ($packet_type == MQTT_AUTH) {

                    $self->_receive_auth(\$packet);
                }
                else {
                    # Protocol error
                    $self->_fatal("Received packet with unknown type $packet_type");
                }

                # Prepare for next frame
                undef $packet_type;

                # Handle could have been destroyed at this point
                redo PARSE_PACKET if defined $fh->{rbuf};
            }

            # Only the invocation that started the clock stops it
            if (defined $timing_packets) {
                $BUSY_TIME += Time::HiRes::time - $BUSY_SINCE;
                undef $BUSY_SINCE;
            }
        },
    );

    1;
}
488              
# Builds the CONNECT packet (MQTT 5, section 3.1) from the connection
# configuration and writes it to the socket.
#
# Reads from $self->{config}:
#   username, password - credentials; each one is sent when defined
#   client_id          - client identifier; 22 random alphanumeric
#                        characters are generated when it is not set
#   clean_start        - true to set the Clean Start flag
#   keep_alive         - keep alive interval (default 0)
#   will               - hash ref with 'topic', 'payload', 'qos', 'retain'
#   session_expiry_interval, receive_maximum, maximum_packet_size,
#   topic_alias_maximum, request_response_information,
#   request_problem_information, authentication_method,
#   authentication_data - sent as the CONNECT properties of the same name
# Any other defined config entry is sent as a user property.
#
# Stores the client identifier in $self->{client_id}.
sub _send_connect {
    my ($self) = @_;

    # Work on a shallow copy, entries are removed as they get encoded
    my %prop = %{$self->{config}};

    my $username    = delete $prop{'username'};
    my $password    = delete $prop{'password'};
    my $client_id   = delete $prop{'client_id'};
    my $clean_start = delete $prop{'clean_start'};
    my $keep_alive  = delete $prop{'keep_alive'};
    my $will        = delete $prop{'will'};

    unless ($client_id) {
        $client_id = '';
        $client_id .= ('0'..'9','a'..'z','A'..'Z')[rand 62] for (1..22);
    }

    $self->{client_id} = $client_id;


    # 3.1.2.11 Properties

    my $raw_prop = '';

    if (exists $prop{'session_expiry_interval'}) {
        # 3.1.2.11.2  Session Expiry Interval  (long int)
        $raw_prop .= pack("C N", MQTT_SESSION_EXPIRY_INTERVAL, delete $prop{'session_expiry_interval'});
    }

    if (exists $prop{'receive_maximum'}) {
        # 3.1.2.11.3  Receive Maximum  (short int)
        $raw_prop .= pack("C n", MQTT_RECEIVE_MAXIMUM, delete $prop{'receive_maximum'});
    }

    if (exists $prop{'maximum_packet_size'}) {
        # 3.1.2.11.4  Maximum Packet Size  (long int)
        $raw_prop .= pack("C N", MQTT_MAXIMUM_PACKET_SIZE, delete $prop{'maximum_packet_size'});
    }

    if (exists $prop{'topic_alias_maximum'}) {
        # 3.1.2.11.5  Topic Alias Maximum  (short int)
        $raw_prop .= pack("C n", MQTT_TOPIC_ALIAS_MAXIMUM, delete $prop{'topic_alias_maximum'});
    }

    if (exists $prop{'request_response_information'}) {
        # 3.1.2.11.6  Request Response Information  (byte)
        $raw_prop .= pack("C C", MQTT_REQUEST_RESPONSE_INFORMATION, delete $prop{'request_response_information'});
    }

    if (exists $prop{'request_problem_information'}) {
        # 3.1.2.11.7  Request Problem Information  (byte)
        $raw_prop .= pack("C C", MQTT_REQUEST_PROBLEM_INFORMATION, delete $prop{'request_problem_information'});
    }

    if (exists $prop{'authentication_method'}) {
        # 3.1.2.11.9  Authentication Method  (utf8 string)
        utf8::encode( $prop{'authentication_method'} );
        $raw_prop .= pack("C n/a*", MQTT_AUTHENTICATION_METHOD, delete $prop{'authentication_method'});
    }

    if (exists $prop{'authentication_data'}) {
        # 3.1.2.11.10  Authentication Data  (binary data)
        # Binary data is a 16 bit length followed by the bytes themselves
        $raw_prop .= pack("C n/a*", MQTT_AUTHENTICATION_DATA, delete $prop{'authentication_data'});
    }

    # NOTE(review): every config entry not consumed above is sent here,
    # which includes connection options such as 'host' or 'port' when they
    # are set - confirm that this is intended
    foreach my $key (keys %prop) {
        # 3.1.2.11.8  User Property  (utf8 string pair)
        my $val = $prop{$key};
        next unless defined $val;
        utf8::encode( $key );
        utf8::encode( $val );
        $raw_prop .= pack("C n/a* n/a*", MQTT_USER_PROPERTY, $key, $val);
    }


    # 3.1.2 Variable Header

    # 3.1.2.1  Protocol Name  (utf8 string)
    my $raw_mqtt = pack("n/a*", "MQTT");

    # 3.1.2.2  Protocol Version  (byte)
    $raw_mqtt .= pack("C", 5);

    # 3.1.2.3  Connect Flags  (byte)
    my $flags = 0;
    $flags |= 0x02 if $clean_start;       # 3.1.2.4  Clean Start
    $flags |= 0x80 if defined $username;  # 3.1.2.8  User Name Flag
    $flags |= 0x40 if defined $password;  # 3.1.2.9  Password Flag

    if ($will) {
        $flags |= 0x04;                                 # 3.1.2.5  Will Flag
        # Will QoS is a two bit field, keep it from spilling into other flags
        $flags |= (($will->{'qos'} || 0) & 0x03) << 3;  # 3.1.2.6  Will QoS
        $flags |= 0x20 if $will->{'retain'};            # 3.1.2.7  Will Retain
    }

    $raw_mqtt .= pack("C", $flags);

    # 3.1.2.10  Keep Alive  (short int)
    $raw_mqtt .= pack("n", $keep_alive || 0);

    # 3.1.2.11  Properties
    $raw_mqtt .= _encode_var_int(length $raw_prop);
    $raw_mqtt .= $raw_prop;


    # 3.1.3 Payload

    # 3.1.3.1  Client Identifier  (utf8 string)
    # Encode a copy, $self->{client_id} keeps the character string
    my $raw_client_id = $client_id;
    utf8::encode( $raw_client_id );
    $raw_mqtt .= pack("n/a*", $raw_client_id);

    if ($will) {

        #TODO: 3.1.3.2  Will Properties
        my $will_prop = '';

        $raw_mqtt .= _encode_var_int(length $will_prop);
        $raw_mqtt .= $will_prop;

        # 3.1.3.3  Will Topic  (utf8 string)
        # $will points into the stored config: encode a copy, otherwise the
        # topic would be encoded once more on every reconnection
        my $will_topic = $will->{'topic'};
        utf8::encode( $will_topic );
        $raw_mqtt .= pack("n/a*", $will_topic);

        # 3.1.3.4  Will Payload  (binary data)
        $raw_mqtt .= pack("n/a*", $will->{'payload'});
    }

    if (defined $username) {
        # 3.1.3.5  Username  (utf8 string)
        utf8::encode( $username );
        $raw_mqtt .= pack("n/a*", $username);
    }

    if (defined $password) {
        # 3.1.3.6  Password  (binary data)
        $raw_mqtt .= pack("n/a*", $password);
    }

    $self->{handle}->push_write(
        pack("C", MQTT_CONNECT << 4)      .  # 3.1.1  Packet type
        _encode_var_int(length $raw_mqtt) .  # 3.1.1  Packet length
        $raw_mqtt
    );
}
632              
# Process the CONNACK packet sent by the broker in response to CONNECT.
#
# The acknowledge flags, the reason code and all the properties carried by
# the packet are stored into $self->{server_prop}. After that the client is
# flagged as connected, pending timers are discarded, a blocking connect
# (if any) is released and 'connack_cb' is called as ($success, $server_prop).
#
# $packet is handed as is to the _decode_* helpers, along with a reference
# to the current read offset.
#
# NOTE(review): 'is_connected' is set even when the broker refused the
# connection; proper handling of that case is still pending (see TODO below).

sub _receive_connack {
    my ($self, $packet) = @_;

    my $server_prop = $self->{server_prop};
    my $pos = 0;

    # 3.2.2.1 Acknowledge flags (byte), bit 0 tells if a session is present
    $server_prop->{'session_present'} = _decode_byte($packet, \$pos) & 0x01;

    # 3.2.2.2 Reason code (byte)
    my $reason_code = _decode_byte($packet, \$pos);
    $server_prop->{'reason_code'} = $reason_code;
    $server_prop->{'reason'}      = $Reason_code{$reason_code};

    # Single valued properties: identifier, [ name, decoding function ]
    my %decoder_for = (
        # 3.2.2.3.2 Session Expiry Interval (long int)
        MQTT_SESSION_EXPIRY_INTERVAL,           [ 'session_expiry_interval',           \&_decode_int_32      ],
        # 3.2.2.3.3 Receive Maximum (short int)
        MQTT_RECEIVE_MAXIMUM,                   [ 'receive_maximum',                   \&_decode_int_16      ],
        # 3.2.2.3.4 Maximum QoS (byte)
        MQTT_MAXIMUM_QOS,                       [ 'maximum_qos',                       \&_decode_byte        ],
        # 3.2.2.3.5 Retain Available (byte)
        MQTT_RETAIN_AVAILABLE,                  [ 'retain_available',                  \&_decode_byte        ],
        # 3.2.2.3.6 Maximum Packet Size (long int)
        MQTT_MAXIMUM_PACKET_SIZE,               [ 'maximum_packet_size',               \&_decode_int_32      ],
        # 3.2.2.3.7 Assigned Client Identifier (utf8 string)
        MQTT_ASSIGNED_CLIENT_IDENTIFIER,        [ 'assigned_client_identifier',        \&_decode_utf8_str    ],
        # 3.2.2.3.8 Topic Alias Maximum (short int)
        MQTT_TOPIC_ALIAS_MAXIMUM,               [ 'topic_alias_maximum',               \&_decode_int_16      ],
        # 3.2.2.3.9 Reason String (utf8 string)
        MQTT_REASON_STRING,                     [ 'reason_string',                     \&_decode_utf8_str    ],
        # 3.2.2.3.11 Wildcard Subscription Available (byte)
        MQTT_WILDCARD_SUBSCRIPTION_AVAILABLE,   [ 'wildcard_subscription_available',   \&_decode_byte        ],
        # 3.2.2.3.12 Subscription Identifiers Available (byte)
        MQTT_SUBSCRIPTION_IDENTIFIER_AVAILABLE, [ 'subscription_identifier_available', \&_decode_byte        ],
        # 3.2.2.3.13 Shared Subscription Available (byte)
        MQTT_SHARED_SUBSCRIPTION_AVAILABLE,     [ 'shared_subscription_available',     \&_decode_byte        ],
        # 3.2.2.3.14 Server Keep Alive (short int)
        MQTT_SERVER_KEEP_ALIVE,                 [ 'server_keep_alive',                 \&_decode_int_16      ],
        # 3.2.2.3.15 Response Information (utf8 string)
        MQTT_RESPONSE_INFORMATION,              [ 'response_information',              \&_decode_utf8_str    ],
        # 3.2.2.3.16 Server Reference (utf8 string)
        MQTT_SERVER_REFERENCE,                  [ 'server_reference',                  \&_decode_utf8_str    ],
        # 3.2.2.3.17 Authentication Method (utf8 string)
        MQTT_AUTHENTICATION_METHOD,             [ 'authentication_method',             \&_decode_utf8_str    ],
        # 3.2.2.3.18 Authentication Data (binary data)
        MQTT_AUTHENTICATION_DATA,               [ 'authentication_data',               \&_decode_binary_data ],
    );

    # 3.2.2.3.1 Properties Length (variable length int)
    my $prop_len = _decode_var_int($packet, \$pos);
    my $prop_end = $pos + $prop_len;

    while ($pos < $prop_end) {

        my $prop_id = _decode_byte($packet, \$pos);

        if ($prop_id == MQTT_USER_PROPERTY) {
            # 3.2.2.3.10 User Property (utf8 string pair)
            my $key = _decode_utf8_str($packet, \$pos);
            my $val = _decode_utf8_str($packet, \$pos);
            $server_prop->{$key} = $val;
            next;
        }

        my $entry = $decoder_for{$prop_id};

        unless ($entry) {
            # Protocol error
            $self->_fatal("Received CONNACK with unknown property $prop_id");
            next;
        }

        my ($name, $decode) = @$entry;
        $server_prop->{$name} = $decode->($packet, \$pos);
    }

    my $success = ($reason_code == 0x00);

    unless ( $success ) {
        # Server will close the connection
        #TODO: handle
    }

    $self->{is_connected}  = 1;
    $self->{timeout_tmr}   = undef;
    $self->{reconnect_tmr} = undef;
    $self->{connect_err}   = undef;

    #TODO: ... blocking connection
    if ($self->{connect_cv}) {
        $self->{connect_cv}->send;
    }

    # Execute CONNACK callback
    my $connack_cb = $self->{connack_cb};
    if ($connack_cb) {
        $connack_cb->($success, $server_prop);
    }
}
752              
753              
# Send a DISCONNECT packet to the broker and reset the local connection state.
#
# Named arguments (all optional):
#   reason_code             - Disconnect reason code (byte), defaults to 0
#   session_expiry_interval - sent as a four byte integer property
#   reason_string           - sent as an utf8 string property
#   server_reference        - sent as an utf8 string property
# Any other argument with a defined value is sent as a User Property.
#
# If there is no active handle it just carps and returns.

sub disconnect {
    my ($self, %args) = @_;

    unless (defined $self->{handle}) {
        carp "Already disconnected from MQTT broker";
        return;
    }

    my $reason_code = delete $args{'reason_code'};

    # 3.14.2.2 Properties

    my $raw_prop = '';

    if (exists $args{'session_expiry_interval'}) {
        # 3.14.2.2.2 Session Expiry Interval (long int)
        # This is a four byte integer: it must not be sent as a length
        # prefixed string, which the broker would reject as malformed
        $raw_prop .= pack("C N", MQTT_SESSION_EXPIRY_INTERVAL, delete $args{'session_expiry_interval'});
    }

    if (exists $args{'reason_string'}) {
        # 3.14.2.2.3 Reason String (utf8 string)
        utf8::encode( $args{'reason_string'} );
        $raw_prop .= pack("C n/a*", MQTT_REASON_STRING, delete $args{'reason_string'});
    }

    if (exists $args{'server_reference'}) {
        # 3.14.2.2.5 Server Reference (utf8 string)
        utf8::encode( $args{'server_reference'} );
        $raw_prop .= pack("C n/a*", MQTT_SERVER_REFERENCE, delete $args{'server_reference'});
    }

    foreach my $key (keys %args) {
        # 3.14.2.2.4 User Property (utf8 string pair)
        my $val = $args{$key};
        next unless defined $val;
        utf8::encode( $key );
        utf8::encode( $val );
        $raw_prop .= pack("C n/a* n/a*", MQTT_USER_PROPERTY, $key, $val);
    }

    # 3.14.2 Variable Header

    # 3.14.2.1 Disconnect Reason Code (byte)
    my $raw_mqtt = pack("C", $reason_code || 0);

    # 3.14.2.2 Properties
    $raw_mqtt .= _encode_var_int(length $raw_prop);
    $raw_mqtt .= $raw_prop;

    $self->{handle}->push_write(
        pack("C", MQTT_DISCONNECT << 4)    .  # 3.14.1 Packet type
        _encode_var_int(length $raw_mqtt)  .  # 3.14.1 Packet length
        $raw_mqtt
    );

    # Drop the handle and forget all the state tied to this connection
    $self->_reset_connection;
}
812              
# Forget everything tied to the current connection: the handle, connection
# status and timers, the lookup tables and the sequence counters.
# Other entries of $self are left untouched.

sub _reset_connection {
    my ($self) = @_;

    # Drop the reference to the handle before anything else
    $self->{handle} = undef;

    # Connection status and timers
    $self->{$_} = undef foreach qw( is_connected reconnect_tmr timeout_tmr connect_err );

    # Lookup tables, each one gets its own new empty hash
    $self->{$_} = {} foreach qw(
        server_prop  server_alias  client_alias
        subscriptions  subscr_cb  packet_cb  buffers
    );

    # Sequences are restarted from 1
    $self->{$_} = 1 foreach qw( packet_seq subscr_seq alias_seq );

    $self->{use_alias} = 0;
}
835              
# Process a DISCONNECT packet sent by the broker.
#
# Reason code and properties are decoded into a hash, the connection state
# is reset, and then the 'disconn_cb' callback is called with a reference to
# that hash. When no such callback was set, _fatal is called instead.
#
# Note that $$packet is padded in place when an abbreviated packet is received.

sub _receive_disconnect {
    my ($self, $packet) = @_;

    # Handle abbreviated packet, which lacks properties or even reason code
    if    (length $$packet == 0) { $$packet  = "\x00\x00" }
    elsif (length $$packet == 1) { $$packet .= "\x00"     }

    my $pos = 0;

    # 3.14.2.1 Reason Code (byte)
    my $reason_code = _decode_byte($packet, \$pos);

    my %info = (
        'reason_code' => $reason_code,
        'reason'      => $Disconnect_reason_code{$reason_code},
    );

    # Properties holding a single utf8 string
    my %string_prop = (
        MQTT_REASON_STRING,    'reason_string',      # 3.14.2.2.3 Reason String
        MQTT_SERVER_REFERENCE, 'server_reference',   # 3.14.2.2.5 Server Reference
    );

    # 3.14.2.2.1 Property Length (variable length int)
    my $prop_len = _decode_var_int($packet, \$pos);
    my $prop_end = $pos + $prop_len;

    while ($pos < $prop_end) {

        my $prop_id = _decode_byte($packet, \$pos);

        if ($prop_id == MQTT_SESSION_EXPIRY_INTERVAL) {
            # 3.14.2.2.2 Session Expiry Interval (long int)
            $info{'session_expiry_interval'} = _decode_int_32($packet, \$pos);
        }
        elsif ($prop_id == MQTT_USER_PROPERTY) {
            # 3.14.2.2.4 User Property (utf8 string pair)
            my $key = _decode_utf8_str($packet, \$pos);
            my $val = _decode_utf8_str($packet, \$pos);
            $info{$key} = $val;
        }
        elsif (my $name = $string_prop{$prop_id}) {
            $info{$name} = _decode_utf8_str($packet, \$pos);
        }
        else {
            # Protocol error
            $self->_fatal("Received DISCONNECT with unknown property $prop_id");
        }
    }

    $self->_reset_connection;

    my $on_disconnect = $self->{disconn_cb};

    return $on_disconnect->(\%info) if $on_disconnect;

    $self->_fatal("Disconnected from MQTT broker: $info{reason}");
}
896              
897              
# Send a PINGREQ packet to the broker.

sub pingreq {
    my ($self) = @_;

    # 3.12.1 Fixed header: packet type, followed by a zero remaining length
    my $raw_mqtt = pack("C C", MQTT_PINGREQ << 4, 0);

    $self->{handle}->push_write( $raw_mqtt );
}
908              
# Send a PINGRESP packet through the connection handle.

sub pingresp {
    my ($self) = @_;

    # 3.13.1 Fixed header: packet type, followed by a zero remaining length
    my $raw_mqtt = pack("C C", MQTT_PINGRESP << 4, 0);

    $self->{handle}->push_write( $raw_mqtt );
}
919              
920              
# Send a SUBSCRIBE packet for one or more topic filters.
#
# Named arguments:
#   topic               - single topic filter
#   topics              - array ref of topic filters (may be combined with 'topic')
#   on_publish          - required, callback for messages of this subscription
#   on_suback           - optional, callback for the SUBACK of this request
#   maximum_qos         - subscription option, two lower bits are used
#   no_local            - subscription option (boolean)
#   retain_as_published - subscription option (boolean)
#   retain_handling     - subscription option, two lower bits are used
# Any other argument with a defined value is sent as a User Property.
#
# Croaks on missing topics or callback. Returns 1 after queueing the packet.
# The array and the strings passed in 'topics' are never modified.

sub subscribe {
    my ($self, %args) = @_;

    my $topic      = delete $args{'topic'};
    my $topics     = delete $args{'topics'};
    my $subscr_cb  = delete $args{'on_publish'};
    my $suback_cb  = delete $args{'on_suback'};
    my $max_qos    = delete $args{'maximum_qos'};
    my $no_local   = delete $args{'no_local'};
    my $retain_asp = delete $args{'retain_as_published'};
    my $retain_hdl = delete $args{'retain_handling'};

    # Work on a private copy of the list, as pushing into (or encoding the
    # elements of) the caller array would leak changes back to the caller
    my @topics = defined $topics ? @$topics : ();
    push @topics, $topic if defined $topic;

    croak "Subscription topics were not specified" unless @topics;
    croak "on_publish callback is required"        unless $subscr_cb;

    foreach my $topic (@topics) {
        croak "Undefined subscription topic" unless defined $topic;
        croak "Empty subscription topic"     unless length $topic;
    }

    my $packet_id = $self->{packet_seq}++;
    $self->{packet_seq} = 1 if $packet_id == 0xFFFF;

    # Set callback for incoming PUBLISH
    my $subscr_id = $self->{subscr_seq}++;
    $self->{subscr_cb}->{$subscr_id} = $subscr_cb;

    # Parameters for expected SUBACK
    $self->{packet_cb}->{$packet_id} = {
        topics    => [ @topics ],  # copy, consumed when SUBACK is received
        subscr_id => $subscr_id,
        suback_cb => $suback_cb,
    };

    # 3.8.2.1.2 Subscription Identifier (variable len int)
    my $raw_prop = pack("C", MQTT_SUBSCRIPTION_IDENTIFIER) .
                   _encode_var_int($subscr_id);

    # 3.8.2.1.3 User Property (utf8 string pair)
    foreach my $key (keys %args) {
        my $val = $args{$key};
        next unless defined $val;
        utf8::encode( $key );
        utf8::encode( $val );
        $raw_prop .= pack("C n/a* n/a*", MQTT_USER_PROPERTY, $key, $val);
    }

    my $raw_mqtt = pack("n", $packet_id)           .  # 3.8.2 Packet identifier
                   _encode_var_int(length $raw_prop) .  # 3.8.2.1 Properties Length
                   $raw_prop;                           # 3.8.2.1 Properties

    # 3.8.3.1 Subscription Options
    my $options = 0;
    $options |= ($max_qos & 0x03)         if $max_qos;     # Maximum QoS
    $options |= 0x04                      if $no_local;    # No Local
    $options |= 0x08                      if $retain_asp;  # Retain As Published
    $options |= ($retain_hdl & 0x03) << 4 if $retain_hdl;  # Retain Handling

    # 3.8.3 Payload
    foreach my $topic (@topics) {
        # Encode a copy, the loop variable is an alias of the list element
        my $raw_topic = $topic;
        utf8::encode( $raw_topic );
        $raw_mqtt .= pack("n/a* C", $raw_topic, $options);
    }

    $self->{handle}->push_write(
        pack("C", MQTT_SUBSCRIBE << 4 | 0x02) .  # 3.8.1 Packet type
        _encode_var_int(length $raw_mqtt)     .  # 3.8.1 Packet length
        $raw_mqtt
    );

    1;
}
996              
# Process a SUBACK packet sent by the broker.
#
# The packet identifier is used to retrieve (and discard) the parameters
# stored by the request that is being acknowledged. Reason codes of the
# payload are matched, in order, against the topics of that request.
# Topics acknowledged with a code of 2 or less are recorded into
# $self->{subscriptions}.
#
# The 'on_suback' callback, if any, is called as ($success, @properties),
# being each property a hash ref describing the result for a single topic.
# When there is no callback a failed subscription is reported via _fatal.

sub _receive_suback {
    my ($self, $packet) = @_;

    my $pos = 0;

    # 3.9.2 Packet id (short int)
    my $packet_id = _decode_int_16($packet, \$pos);

    # 3.9.2.1.1 Property Length (variable length int)
    my $prop_len = _decode_var_int($packet, \$pos);
    my $prop_end = $pos + $prop_len;

    my %ack_prop;

    while ($pos < $prop_end) {

        my $prop_id = _decode_byte($packet, \$pos);

        if ($prop_id == MQTT_USER_PROPERTY) {
            # 3.9.2.1.3 User Property (utf8 string pair)
            my $key = _decode_utf8_str($packet, \$pos);
            my $val = _decode_utf8_str($packet, \$pos);
            $ack_prop{$key} = $val;
        }
        elsif ($prop_id == MQTT_REASON_STRING) {
            # 3.9.2.1.2 Reason String (utf8 string)
            $ack_prop{'reason_string'} = _decode_utf8_str($packet, \$pos);
        }
        else {
            # Protocol error
            $self->_fatal("Received SUBACK with unexpected property $prop_id");
        }
    }

    # 3.9.3 Payload: the remaining bytes are the reason codes
    my @reason_codes = unpack("C*", substr($$packet, $pos));

    my $request = delete $self->{packet_cb}->{$packet_id};
    $self->_fatal("Received unexpected SUBACK") unless $request;

    my ($topics, $suback_cb, $subscr_id) = @$request{qw( topics suback_cb subscr_id )};

    my $all_granted = 1;
    my @results;

    foreach my $code (@reason_codes) {

        my $topic  = shift @$topics;
        my $reason = $Subscribe_reason_code{$code};

        my $granted_qos;

        if ($code > 2) {
            # Failure
            $all_granted = 0;
            $self->_fatal("Subscription to topic '$topic' failed: $reason") unless $suback_cb;
        }
        else {
            # Success, the code is the QoS that was granted
            $granted_qos = $code;
            $self->{subscriptions}->{$topic} = $subscr_id;
        }

        $DEBUG && warn "Subscribed to: $topic\n";

        push @results, {
            topic       => $topic,
            reason_code => $code,
            granted_qos => $granted_qos,
            reason      => $reason,
            %ack_prop
        };
    }

    $suback_cb->($all_granted, @results) if $suback_cb;
}
1075              
1076              
# Send an UNSUBSCRIBE packet for one or more topic filters.
#
# Named arguments:
#   topic       - single topic filter
#   topics      - array ref of topic filters (may be combined with 'topic')
#   on_unsuback - required, callback for the UNSUBACK of this request
# Any other argument with a defined value is sent as a User Property.
#
# Croaks on missing topics or callback. Returns 1 after queueing the packet.
# The array and the strings passed in 'topics' are never modified.

sub unsubscribe {
    my ($self, %args) = @_;

    my $topic       = delete $args{'topic'};
    my $topics      = delete $args{'topics'};
    my $unsuback_cb = delete $args{'on_unsuback'};

    # Work on a private copy of the list, as pushing into (or encoding the
    # elements of) the caller array would leak changes back to the caller
    my @topics = defined $topics ? @$topics : ();
    push @topics, $topic if defined $topic;

    croak "Unsubscription topics were not specified" unless @topics;
    croak "on_unsuback callback is required"         unless $unsuback_cb;

    foreach my $topic (@topics) {
        croak "Undefined unsubscription topic" unless defined $topic;
        croak "Empty unsubscription topic"     unless length $topic;
    }

    my $packet_id = $self->{packet_seq}++;
    $self->{packet_seq} = 1 if $packet_id == 0xFFFF;

    # Set callback for UNSUBACK
    $self->{packet_cb}->{$packet_id} = {
        topics      => [ @topics ],  # copy, consumed when UNSUBACK is received
        unsuback_cb => $unsuback_cb,
    };

    # 3.10.2.1.2 User Property (utf8 string pair)
    my $raw_prop = '';
    foreach my $key (keys %args) {
        my $val = $args{$key};
        next unless defined $val;
        utf8::encode( $key );
        utf8::encode( $val );
        $raw_prop .= pack("C n/a* n/a*", MQTT_USER_PROPERTY, $key, $val);
    }

    my $raw_mqtt = pack("n", $packet_id)           .  # 3.10.2 Packet identifier
                   _encode_var_int(length $raw_prop) .  # 3.10.2.1 Property Length
                   $raw_prop;                           # 3.10.2.1 Properties

    # 3.10.3 Payload
    foreach my $topic (@topics) {
        # Encode a copy, the loop variable is an alias of the list element
        my $raw_topic = $topic;
        utf8::encode( $raw_topic );
        $raw_mqtt .= pack("n/a*", $raw_topic);
    }

    $self->{handle}->push_write(
        pack("C", MQTT_UNSUBSCRIBE << 4 | 0x02) .  # 3.10.1 Packet type
        _encode_var_int(length $raw_mqtt)       .  # 3.10.1 Packet length
        $raw_mqtt
    );

    1;
}
1132              
# Process an UNSUBACK packet sent by the broker.
#
# The packet identifier is used to retrieve (and discard) the parameters
# stored by the request that is being acknowledged. Reason codes of the
# payload are matched, in order, against the topics of that request.
# Topics acknowledged with code 0 are removed from $self->{subscriptions},
# and the on_publish callback they were using is scheduled for removal
# unless another subscription still refers to it.
#
# The 'on_unsuback' callback, if any, is called as ($success, @properties),
# being each property a hash ref describing the result for a single topic.
# When there is no callback a failed unsubscription is reported via _fatal.

sub _receive_unsuback {
    my ($self, $packet) = @_;

    # The timer callback created below must not hold a strong reference
    weaken($self);

    my $pos = 0;

    # 3.11.2 Packet id (short int)
    my $packet_id = _decode_int_16($packet, \$pos);

    # 3.11.2.1.1 Property Length (variable length int)
    my $prop_len = _decode_var_int($packet, \$pos);
    my $prop_end = $pos + $prop_len;

    my %ack_prop;

    while ($pos < $prop_end) {

        my $prop_id = _decode_byte($packet, \$pos);

        if ($prop_id == MQTT_USER_PROPERTY) {
            # 3.11.2.1.3 User Property (utf8 string pair)
            my $key = _decode_utf8_str($packet, \$pos);
            my $val = _decode_utf8_str($packet, \$pos);
            $ack_prop{$key} = $val;
        }
        elsif ($prop_id == MQTT_REASON_STRING) {
            # 3.11.2.1.2 Reason String (utf8 string)
            $ack_prop{'reason_string'} = _decode_utf8_str($packet, \$pos);
        }
        else {
            # Protocol error
            $self->_fatal("Received UNSUBACK with unexpected property $prop_id");
        }
    }

    # 3.11.3 Payload: the remaining bytes are the reason codes
    my @reason_codes = unpack("C*", substr($$packet, $pos));

    my $request = delete $self->{packet_cb}->{$packet_id};
    $self->_fatal("Received unexpected UNSUBACK") unless $request;

    my ($topics, $unsuback_cb) = @$request{qw( topics unsuback_cb )};

    my $all_removed = 1;
    my @results;

    foreach my $code (@reason_codes) {

        my $topic  = shift @$topics;
        my $reason = $Reason_code{$code};

        if ($code != 0) {
            # Failure
            $all_removed = 0;
            $self->_fatal("Unsubscription to topic '$topic' failed: $reason") unless $unsuback_cb;
        }
        else {
            # Success
            my $active    = $self->{subscriptions};
            my $subscr_id = delete $active->{$topic};

            # The on_publish callback can be freed only if no other
            # subscription is sharing the same subscription identifier
            my $orphaned = $subscr_id && !grep { $active->{$_} == $subscr_id } keys %$active;

            if ($orphaned) {
                # But not right now, as broker may send some messages *after* unsubscription
                my $timer_id = "unsub-$subscr_id";
                $self->{_timers}->{$timer_id} = AnyEvent->timer( after => 60, cb => sub {
                    delete $self->{_timers}->{$timer_id};
                    delete $self->{subscr_cb}->{$subscr_id};
                });
            }
        }

        push @results, {
            topic       => $topic,
            reason_code => $code,
            reason      => $reason,
            %ack_prop
        };
    }

    $unsuback_cb->($all_removed, @results) if $unsuback_cb;
}
1217              
# Guard against nested blocking waits inside publish
our $AE_WAITING;

# Send a PUBLISH packet, or append it to a buffer for later sending.
#
# Named arguments:
#   topic                   - required, croaks if not defined
#   payload                 - string, or reference to a string; defaults to ''.
#                             A payload flagged as utf8 is sent with the Payload
#                             Format Indicator set, and it is encoded *in place*
#                             when it was passed by reference
#   qos                     - QoS level; a packet id is allocated when it is not 0
#   duplicate               - sets the DUP flag
#   retain                  - sets the RETAIN flag
#   on_puback               - callback stored for the PUBACK (only if qos is set)
#   buffer_id               - if set the packet is not sent but appended to
#                             that buffer, and 1 is returned
#   message_expiry_interval - sent as a four byte integer property
#   response_topic          - sent as an utf8 string property
#   correlation_data        - sent as a binary data property
#   content_type            - sent as an utf8 string property
# Any other argument with a defined value is sent as a User Property.
#
# If the handle could not write the whole packet at once this call blocks,
# running the event loop, until the write buffer is drained.

sub publish {
    my ($self, %args) = @_;

    my $topic     = delete $args{'topic'};
    my $payload   = delete $args{'payload'};
    my $qos       = delete $args{'qos'};
    my $dup       = delete $args{'duplicate'};
    my $retain    = delete $args{'retain'};
    my $on_puback = delete $args{'on_puback'};
    my $buffer_id = delete $args{'buffer_id'};

    croak "Message topic was not specified" unless defined $topic;

    $DEBUG && warn "Sent message to: $topic\n";

    $payload = '' unless defined $payload;
    my $payload_ref = (ref $payload eq 'SCALAR') ? $payload : \$payload;

    # 3.3.2.3.4 Topic Alias
    my $topic_alias;
    if ($self->{use_alias}) {
        $topic_alias = $self->{client_alias}->{$topic};
        if ($topic_alias) {
            # Send topic alias only
            $topic = '';
        }
        elsif ($self->{server_prop}->{'topic_alias_maximum'}) {
            #TODO: Honor maximum
            $topic_alias = $self->{alias_seq}++;
            $self->{client_alias}->{$topic} = $topic_alias;
        }
    }

    # 3.3.1 Fixed header flags: DUP is bit 3, QoS are bits 2-1, RETAIN is bit 0
    my $flags = 0;
    $flags |= $qos << 1 if $qos;     # 3.3.1.2 QoS level
    $flags |= 0x08      if $dup;     # 3.3.1.1 DUP (0x04 belongs to the QoS level)
    $flags |= 0x01      if $retain;  # 3.3.1.3 RETAIN

    my $packet_id;
    if ($qos) {
        $packet_id = $self->{packet_seq}++;
        $self->{packet_seq} = 1 if $packet_id == 0xFFFF;
    }

    my $raw_prop = '';

    if (utf8::is_utf8( $$payload_ref )) {
        # 3.3.2.3.2 Payload Format Indicator (byte)
        $raw_prop .= pack("C C", MQTT_PAYLOAD_FORMAT_INDICATOR, 0x01);
        utf8::encode( $$payload_ref );
    }

    if (exists $args{'message_expiry_interval'}) {
        # 3.3.2.3.3 Message Expiry Interval (long int)
        $raw_prop .= pack("C N", MQTT_MESSAGE_EXPIRY_INTERVAL, delete $args{'message_expiry_interval'});
    }

    if ($topic_alias) {
        # 3.3.2.3.4 Topic Alias (short int)
        $raw_prop .= pack("C n", MQTT_TOPIC_ALIAS, $topic_alias);
    }

    if (exists $args{'response_topic'}) {
        # 3.3.2.3.5 Response Topic (utf8 string)
        utf8::encode( $args{'response_topic'} );
        $raw_prop .= pack("C n/a*", MQTT_RESPONSE_TOPIC, delete $args{'response_topic'});
    }

    if (exists $args{'correlation_data'}) {
        # 3.3.2.3.6 Correlation Data (binary data)
        $raw_prop .= pack("C n/a*", MQTT_CORRELATION_DATA, delete $args{'correlation_data'});
    }

    # if (exists $args{'subscription_identifier'}) {
    #     # 3.3.2.3.8 Subscription Identifier (variable int)
    #     my $id = delete $args{'subscription_identifier'};
    #     $raw_prop .= pack("C", MQTT_SUBSCRIPTION_IDENTIFIER) . _encode_var_int($id);
    # }

    if (exists $args{'content_type'}) {
        # 3.3.2.3.9 Content Type (utf8 string)
        utf8::encode( $args{'content_type'} );
        $raw_prop .= pack("C n/a*", MQTT_CONTENT_TYPE, delete $args{'content_type'});
    }

    foreach my $key (keys %args) {
        # 3.3.2.3.7 User Property (utf8 string pair)
        my $val = $args{$key};
        next unless defined $val;
        utf8::encode( $key );
        utf8::encode( $val );
        $raw_prop .= pack("C n/a* n/a*", MQTT_USER_PROPERTY, $key, $val);
    }

    # 3.3.2.1 Topic name (utf8 string)
    utf8::encode( $topic );
    my $raw_mqtt = pack("n/a*", $topic);

    # 3.3.2.2 Packet identifier (short int)
    $raw_mqtt .= pack("n", $packet_id) if $packet_id;

    # 3.3.2.3 Properties
    $raw_mqtt .= _encode_var_int(length $raw_prop);
    $raw_mqtt .= $raw_prop;

    # 3.3.3 Payload
    $raw_mqtt .= $$payload_ref;

    $raw_mqtt = pack("C", MQTT_PUBLISH << 4 | $flags) .  # 3.3.1 Packet type
                _encode_var_int(length $raw_mqtt)     .  # 3.3.1 Packet length
                $raw_mqtt;

    if ($qos && $on_puback) {
        # Set PUBACK callback
        $self->{packet_cb}->{$packet_id} = $on_puback;
    }

    if ($buffer_id) {
        # Do not send right now, wait until flush_buffer
        my $buffer = $self->{buffers}->{$buffer_id} ||= {};
        $buffer->{raw_mqtt} .= $raw_mqtt;
        $buffer->{packets}->{$packet_id} = 1 if $packet_id;
        return 1;
    }

    $self->{handle}->push_write( $raw_mqtt );

    if (defined $self->{handle}->{wbuf} && length $self->{handle}->{wbuf} > 0) {
        # push_write could not send all data to the handle because the kernel
        # write buffer is full. The size of the kernel write buffer (which can
        # be queried with 'sysctl net.ipv4.tcp_wmem') is chosen by the kernel
        # based on available memory, and is 4MB in known production servers.
        # This will happen after sending more than 4MB of data very quickly.
        # As client may be synchronous, wait until entire message is sent.

        # Make AnyEvent allow one level of recursive condvar blocking
        $AE_WAITING && Carp::confess "Recursive condvar blocking wait attempted";
        local $AE_WAITING = 1;
        local $AnyEvent::CondVar::Base::WAITING = 0;

        my $flushed = AnyEvent->condvar;
        $self->{handle}->on_drain( $flushed );
        $flushed->recv;
        $self->{handle}->on_drain(); # clear
    }
}
1367              
# Parse an incoming PUBLISH packet and hand it to the subscription callbacks.
#
# Parameters:
#   $packet - reference to the packet bytes starting at the Topic Name
#             (variable header followed by the payload). It is modified in
#             place: the variable header is stripped, leaving only the payload.
#   $flags  - fixed header flag bits; only the QoS (0x6) and DUP (0x8) bits
#             are read here.
#
# For every Subscription Identifier carried by the packet, the callback stored
# in $self->{subscr_cb} is called with ($packet, \%prop). %prop holds 'topic',
# 'qos', 'dup', 'packet_id' (only when QoS > 0) and the decoded properties.
# User properties are stored in %prop under their own key names.
#
# Parsing stops without calling any callback when an unknown property is found.
sub _receive_publish {
    my ($self, $packet, $flags) = @_;

    # 3.3.2.1 Topic Name (utf8 str)
    my $topic = unpack("n/a", $$packet);
    my $offs  = 2 + length $topic;   # byte length, taken before decoding
    utf8::decode($topic);

    $DEBUG && warn "Got message from: $topic\n";

    my %prop = (
        'topic' => $topic,
        'qos'   => ($flags & 0x6) >> 1,
        'dup'   => ($flags & 0x8) >> 3,
    );

    # 3.3.2.2 Packet Identifier (short int), present only when QoS > 0
    if ($prop{'qos'} > 0) {
        $prop{'packet_id'} = unpack("n", substr($$packet, $offs, 2));
        $offs += 2;
    }

    # 3.3.2.3.1 Properties Length (variable length int)
    my $prop_len = _decode_var_int($packet, \$offs);
    my $prop_end = $offs + $prop_len;

    my @subscr_ids;
    my $prop_id;

    while ($offs < $prop_end) {

        $prop_id = unpack("C", substr($$packet, $offs, 1));
        $offs += 1;

        if ($prop_id == MQTT_PAYLOAD_FORMAT_INDICATOR) {
            # 3.3.2.3.2 Payload Format Indicator (byte)
            $prop{'payload_format'} = unpack("C", substr($$packet, $offs, 1));
            $offs += 1;
        }
        elsif ($prop_id == MQTT_MESSAGE_EXPIRY_INTERVAL) {
            # 3.3.2.3.3 Message Expiry Interval (long int)
            $prop{'message_expiry_interval'} = unpack("N", substr($$packet, $offs, 4));
            $offs += 4;
        }
        elsif ($prop_id == MQTT_TOPIC_ALIAS) {
            # 3.3.2.3.4 Topic Alias (short int)
            my $alias = unpack("n", substr($$packet, $offs, 2));
            $offs += 2;
            if (length $topic) {
                # Topic and alias sent together: remember the mapping
                $self->{server_alias}->{$alias} = $topic;
            }
            else {
                # Empty topic: resolve it from a previously received alias
                $prop{'topic'} = $self->{server_alias}->{$alias};
            }
        }
        elsif ($prop_id == MQTT_RESPONSE_TOPIC) {
            # 3.3.2.3.5 Response Topic (utf8 string)
            my $resp_topic = unpack("n/a", substr($$packet, $offs));
            $offs += 2 + length $resp_topic;
            utf8::decode( $resp_topic );
            $prop{'response_topic'} = $resp_topic;
        }
        elsif ($prop_id == MQTT_CORRELATION_DATA) {
            # 3.3.2.3.6 Correlation Data (binary data)
            $prop{'correlation_data'} = unpack("n/a", substr($$packet, $offs));
            $offs += 2 + length $prop{'correlation_data'};
        }
        elsif ($prop_id == MQTT_USER_PROPERTY) {
            # 3.3.2.3.7 User Property (utf8 string pair)
            my ($key, $val) = unpack("n/a n/a", substr($$packet, $offs));
            $offs += 4 + length($key) + length($val);   # byte lengths, before decoding
            utf8::decode( $key );
            utf8::decode( $val );
            $prop{$key} = $val;
        }
        elsif ($prop_id == MQTT_SUBSCRIPTION_IDENTIFIER) {
            # 3.3.2.3.8 Subscription Identifier (variable int)
            push @subscr_ids, _decode_var_int($packet, \$offs);
        }
        elsif ($prop_id == MQTT_CONTENT_TYPE) {
            # 3.3.2.3.9 Content Type (utf8 string)
            my $content_type = unpack("n/a", substr($$packet, $offs));
            $offs += 2 + length $content_type;
            utf8::decode( $content_type );
            $prop{'content_type'} = $content_type;
        }
        else {
            # Protocol error. The size of an unknown property is not known, so
            # the rest of the header cannot be parsed: stop here in case
            # _fatal returns instead of dying.
            $self->_fatal("Received PUBLISH with unknown property $prop_id");
            return;
        }
    }

    # Trim variable header from packet, the remaining is the payload
    substr($$packet, 0, $prop_end, '');

    if ($prop{'payload_format'}) {
        # Payload is UTF-8 Encoded Character Data
        utf8::decode( $$packet );
    }

    foreach my $subscr_id (@subscr_ids) {
        # Execute subscriptions callbacks

        my $subscr_cb = $self->{subscr_cb}->{$subscr_id};

        unless ($subscr_cb) {
            # No callback registered for this identifier: skip the message
            # instead of dying on an undefined code reference
            $DEBUG && warn "Got message for unknown subscription $subscr_id\n";
            next;
        }

        $subscr_cb->($packet, \%prop);
    }

    return;
}
1474              
1475              
# Acknowledge a received QoS 1 PUBLISH by sending a PUBACK packet.
#
# Named arguments:
#   packet_id   - identifier of the PUBLISH being acknowledged (required)
#   reason_code - PUBACK reason code, 0 when omitted
#   buffer_id   - when given, the packet is appended to that buffer instead
#                 of being written to the handle
#
# Croaks when packet_id is missing. Returns 1.
sub puback {
    my ($self, %args) = @_;

    my $packet_id = $args{'packet_id'}
        or croak "Missing packet_id";

    my $reason_code = $args{'reason_code'} || 0;
    my $buffer_id   = $args{'buffer_id'};

    my $raw_mqtt = pack(
        "C C n C",
        MQTT_PUBACK << 4,   # 3.4.1  Packet type
        3,                  # 3.4.1  Remaining length
        $packet_id,         # 3.4.2  Packet identifier
        $reason_code,       # 3.4.2.1 Reason code
    );

    if ($buffer_id) {
        # Do not send right now, wait until flush_buffer
        $self->{buffers}->{$buffer_id}->{raw_mqtt} .= $raw_mqtt;
    }
    else {
        $self->{handle}->push_write( $raw_mqtt );
    }

    return 1;
}
1499              
# Handle an incoming PUBACK packet.
#
# $packet is a reference to the bytes holding the Packet Identifier and an
# optional Reason Code (treated as 0 when absent). The callback stored in
# $self->{packet_cb} for that identifier, if any, is removed and called with
# the reason code.
sub _receive_puback {
    my ($self, $packet) = @_;

    my ($id, $code) = unpack("n C", ${$packet});
    $code = 0 if !defined $code;

    #TODO: 3.4.2.2 Properties

    my $on_puback = delete $self->{packet_cb}->{$id};

    return $on_puback->($code) if defined $on_puback;
    return;
}
1513              
# Send a PUBREC packet (first acknowledgement of a received QoS 2 PUBLISH).
#
# Named arguments:
#   packet_id   - identifier of the PUBLISH being acknowledged (required)
#   reason_code - PUBREC reason code, 0 when omitted
#
# Croaks when packet_id is missing. Returns 1.
sub pubrec {
    my ($self, %args) = @_;

    my ($packet_id, $reason_code) = @args{'packet_id', 'reason_code'};

    croak "Missing packet_id" unless $packet_id;

    # 3.5.1 Fixed header: packet type and remaining length
    my $fixed_header = pack("C C", MQTT_PUBREC << 4, 3);

    # 3.5.2 Variable header: packet identifier and reason code
    my $variable_header = pack("n C", $packet_id, $reason_code || 0);

    #TODO: set PUBREL callback

    $self->{handle}->push_write( $fixed_header . $variable_header );

    return 1;
}
1533              
# Handle an incoming PUBREC packet.
#
# $packet is a reference to the bytes holding the Packet Identifier and an
# optional Reason Code (treated as 0 when absent). The callback stored in
# $self->{packet_cb} for that identifier, if any, is removed and called with
# ($packet_id, $reason_code).
sub _receive_pubrec {
    my ($self, $packet) = @_;

    my @fields = unpack("n C", $$packet);

    my $packet_id   = $fields[0];
    my $reason_code = defined $fields[1] ? $fields[1] : 0;

    #TODO: 3.5.2.2 Properties

    my $on_pubrec = delete $self->{packet_cb}->{$packet_id};

    if (defined $on_pubrec) {
        return $on_pubrec->($packet_id, $reason_code);
    }

    return;
}
1547              
# Send a PUBREL packet (release of a QoS 2 message, sent in response to a
# received PUBREC).
#
# Named arguments:
#   packet_id   - identifier of the message being released (required)
#   reason_code - PUBREL reason code, 0 when omitted
#
# Croaks when packet_id is missing. Returns 1.
sub pubrel {
    my ($self, %args) = @_;

    croak "Missing packet_id" unless $args{'packet_id'};

    my $raw_mqtt = pack(
        "C C n C",
        # The reserved flag bits of PUBREL must be 0,0,1,0 [MQTT-3.6.1-1];
        # any other value is a malformed packet and the server closes the
        # connection.
        (MQTT_PUBREL << 4) | 0x02,  # 3.6.1 Packet type and reserved flags
        3,                          # 3.6.1 Remaining length
        $args{'packet_id'},         # 3.6.2 Packet identifier
        $args{'reason_code'} || 0,  # 3.6.2.1 Reason code
    );

    #TODO: set PUBCOMP callback

    $self->{handle}->push_write( $raw_mqtt );

    return 1;
}
1567              
# Handle an incoming PUBREL packet.
#
# $packet is a reference to the bytes holding the Packet Identifier and an
# optional Reason Code (treated as 0 when absent). The callback stored in
# $self->{packet_cb} for that identifier, if any, is removed and called with
# ($packet_id, $reason_code).
sub _receive_pubrel {
    my ($self, $packet) = @_;

    my ($id, $code) = unpack("n C", ${$packet});
    $code = 0 if !defined $code;

    #TODO: 3.6.2.2 Properties

    my $on_pubrel = delete $self->{packet_cb}->{$id};
    return if !defined $on_pubrel;

    return $on_pubrel->($id, $code);
}
1581              
# Send a PUBCOMP packet (final acknowledgement of a QoS 2 exchange).
#
# Named arguments:
#   packet_id   - identifier of the message being completed (required)
#   reason_code - PUBCOMP reason code, 0 when omitted
#
# Croaks when packet_id is missing. Returns 1.
sub pubcomp {
    my ($self, %args) = @_;

    croak "Missing packet_id" unless $args{'packet_id'};

    my @fields = (
        MQTT_PUBCOMP << 4,          # 3.7.1 Packet type
        3,                          # 3.7.1 Remaining length
        $args{'packet_id'},         # 3.7.2 Packet identifier
        $args{'reason_code'} || 0,  # 3.7.2.1 Reason code
    );

    $self->{handle}->push_write( pack("C C n C", @fields) );

    return 1;
}
1599              
# Handle an incoming PUBCOMP packet.
#
# $packet is a reference to the bytes holding the Packet Identifier and an
# optional Reason Code (treated as 0 when absent). The callback stored in
# $self->{packet_cb} for that identifier, if any, is removed and called with
# the reason code.
sub _receive_pubcomp {
    my ($self, $packet) = @_;

    my ($id, $code) = unpack("n C", ${$packet});

    unless (defined $code) {
        $code = 0;
    }

    #TODO: 3.7.2.2 Properties

    my $on_pubcomp = delete $self->{packet_cb}->{$id};

    if (defined $on_pubcomp) {
        return $on_pubcomp->($code);
    }

    return;
}
1613              
1614              
# Send an AUTH packet to the server.
#
# Named arguments:
#   reason_code           - Authenticate Reason Code byte
#   on_auth               - callback to run when the server answers with AUTH
#   authentication_method - utf8 string
#   authentication_data   - binary data
#   reason_string         - utf8 string
# Any other argument with a defined value is sent as a User Property.
#
# Returns 1.
sub auth {
    my ($self, %args) = @_;

    my $reason_code = delete $args{'reason_code'};
    my $on_auth     = delete $args{'on_auth'};

    # Set callback to be executed when the server answers with an AUTH packet
    if ($on_auth) {
        $self->{packet_cb}->{'auth'} = $on_auth;
    }

    # Properties with a dedicated identifier: [ argument, property id, is utf8 text ]
    my @known_props = (
        [ 'authentication_method', MQTT_AUTHENTICATION_METHOD, 1 ],  # 3.15.2.2.2 (utf8 string)
        [ 'authentication_data',   MQTT_AUTHENTICATION_DATA,   0 ],  # 3.15.2.2.3 (binary data)
        [ 'reason_string',         MQTT_REASON_STRING,         1 ],  # 3.15.2.2.4 (utf8 string)
    );

    my $raw_prop = '';

    foreach my $spec (@known_props) {
        my ($name, $prop_id, $is_text) = @$spec;
        next unless exists $args{$name};

        my $value = delete $args{$name};
        utf8::encode( $value ) if $is_text;
        $raw_prop .= pack("C n/a*", $prop_id, $value);
    }

    # 3.15.2.2.5 User Property (utf8 string pair): everything still in %args
    foreach my $name (keys %args) {
        my $value = $args{$name};
        next unless defined $value;

        utf8::encode( $name );
        utf8::encode( $value );
        $raw_prop .= pack("C n/a* n/a*", MQTT_USER_PROPERTY, $name, $value);
    }

    # 3.15.2.1 Authenticate Reason Code (byte), then 3.15.2.2 Properties
    my $raw_mqtt = pack("C", $reason_code)
                 . _encode_var_int(length $raw_prop)
                 . $raw_prop;

    # 3.15.1 Fixed header: packet type and packet length
    my $fixed_header = pack("C", MQTT_AUTH << 4)
                     . _encode_var_int(length $raw_mqtt);

    $self->{handle}->push_write( $fixed_header . $raw_mqtt );

    return 1;
}
1667              
# Handle an incoming AUTH packet.
#
# $packet is a reference to the bytes holding the Reason Code and the
# properties. An empty packet is replaced in place by "\x00\x00" (reason
# code 0, no properties).
#
# The callback stored under $self->{packet_cb}->{'auth'}, if any, is removed
# and called with a hash reference holding 'reason_code', 'reason' (looked up
# in %Reason_code) and the decoded properties. User properties are stored
# under their own key names.
#
# Parsing stops without calling the callback when an unexpected property is
# found.
sub _receive_auth {
    my ($self, $packet) = @_;

    # Handle abbreviated packet
    $$packet = "\x00\x00" if (length $$packet == 0);

    # 3.15.2.1 Authenticate Reason Code (byte)
    my $offs = 0;
    my $reason_code = _decode_byte($packet, \$offs);
    my $reason      = $Reason_code{$reason_code};

    # 3.15.2.2.1 Property Length (variable length int)
    my $prop_len = _decode_var_int($packet, \$offs);
    my $prop_end = $offs + $prop_len;

    my %prop = (
        reason_code => $reason_code,
        reason      => $reason,
    );

    while ($offs < $prop_end) {

        my $prop_id = _decode_byte($packet, \$offs);

        if ($prop_id == MQTT_AUTHENTICATION_METHOD) {
            # 3.15.2.2.2 Authentication Method (utf8 string)
            $prop{'authentication_method'} = _decode_utf8_str($packet, \$offs);
        }
        elsif ($prop_id == MQTT_AUTHENTICATION_DATA) {
            # 3.15.2.2.3 Authentication Data (binary data)
            $prop{'authentication_data'} = _decode_binary_data($packet, \$offs);
        }
        elsif ($prop_id == MQTT_REASON_STRING) {
            # 3.15.2.2.4 Reason String (utf8 string)
            $prop{'reason_string'} = _decode_utf8_str($packet, \$offs);
        }
        elsif ($prop_id == MQTT_USER_PROPERTY) {
            # 3.15.2.2.5 User Property (utf8 string pair)
            my $key = _decode_utf8_str($packet, \$offs);
            my $val = _decode_utf8_str($packet, \$offs);
            $prop{$key} = $val;
        }
        else {
            # Protocol error. The size of an unexpected property is not known,
            # so the remaining bytes cannot be parsed: stop here in case
            # _fatal returns instead of dying.
            $self->_fatal("Received AUTH with unexpected property $prop_id");
            return;
        }
    }

    my $auth_cb = delete $self->{packet_cb}->{'auth'};

    $auth_cb->(\%prop) if $auth_cb;
}
1720              
1721              
# Send to the server everything accumulated in a buffer, then drop the buffer.
#
# Named arguments:
#   buffer_id - identifier of the buffer to send
#
# Returns nothing when no such buffer exists, 1 otherwise. When the handle
# could not write all the data at once, this blocks until the handle reports
# that its write buffer has drained.
sub flush_buffer {
    my ($self, %args) = @_;

    my $pending = delete $self->{buffers}->{$args{'buffer_id'}};

    # Nothing to do if nothing was buffered
    return unless $pending;

    $self->{handle}->push_write( $pending->{raw_mqtt} );

    my $all_sent = !defined $self->{handle}->{wbuf}
                || length($self->{handle}->{wbuf}) == 0;

    return 1 if $all_sent;

    # Kernel write buffer is full, see publish() above

    # Make AnyEvent allow one level of recursive condvar blocking
    Carp::confess "Recursive condvar blocking wait attempted" if $AE_WAITING;
    local $AE_WAITING = 1;
    local $AnyEvent::CondVar::Base::WAITING = 0;

    my $drained = AnyEvent->condvar;
    $self->{handle}->on_drain( $drained );
    $drained->recv;
    $self->{handle}->on_drain(); # clear

    return 1;
}
1749              
# Drop a buffer without sending its contents.
#
# Named arguments:
#   buffer_id - identifier of the buffer to discard
#
# The callbacks registered in $self->{packet_cb} for the packets held in the
# buffer are removed too, as those packets will never be acknowledged.
#
# Returns nothing when no such buffer exists, 1 otherwise.
sub discard_buffer {
    my ($self, %args) = @_;

    my $buffer = delete $self->{buffers}->{$args{'buffer_id'}};

    # Nothing to do if nothing was buffered
    return unless $buffer;

    # Remove all pending puback callbacks, as those will never be executed.
    # publish() records the buffered packet ids under the 'packets' key.
    foreach my $packet_id (keys %{ $buffer->{packets} || {} }) {
        delete $self->{packet_cb}->{$packet_id};
    }

    return 1;
}
1765              
1766              
# Destructor: disconnect from the server when a handle is still set.
sub DESTROY {
    my ($self) = @_;

    # Disconnect gracefully from server if already connected
    if (defined $self->{handle}) {
        return $self->disconnect;
    }

    return;
}
1773              
1774             1;
1775              
1776             __END__