File Coverage

blib/lib/Net/MQTT/Simple.pm
Criterion Covered Total %
statement 24 214 11.2
branch 6 128 4.6
condition 0 54 0.0
subroutine 6 32 18.7
pod 11 11 100.0
total 47 439 10.7


line stmt bran cond sub pod time code
1             package Net::MQTT::Simple;
2              
3 2     2   67204 use strict;
  2         6  
  2         56  
4 2     2   10 use warnings;
  2         4  
  2         50  
5              
6 2     2   1131 use IO::Socket::IP;
  2         70851  
  2         11  
7              
8             our $VERSION = '1.29';
9              
10             # Please note that these are not documented and are subject to change:
11             our $KEEPALIVE_INTERVAL = 60;
12             our $PING_TIMEOUT = 10;
13             our $RECONNECT_INTERVAL = 5;
14             our $MAX_LENGTH = 2097152; # 2 MB
15             our $READ_BYTES = 16 * 1024; # 16 kB per IO::Socket::SSL recommendation
16             our $WRITE_BYTES = 16 * 1024; # 16 kB per IO::Socket::SSL maximum
17             our $PROTOCOL_LEVEL = 0x04; # 0x03 in v3.1, 0x04 in v3.1.1
18             our $PROTOCOL_NAME = "MQTT"; # MQIsdp in v3.1, MQTT in v3.1.1
19              
20             my $global;
21              
22 0     0   0 sub _default_port { 1883 }
23 0     0   0 sub _socket_class { 'IO::Socket::IP' }
24 0     0   0 sub _socket_error { "$@" }
25 0     0   0 sub _secure { 0 }
26              
27 0     0   0 sub _client_identifier { my ($class) = @_; return "Net::MQTT::Simple[" . $class->{random_id} . "]"; }
  0         0  
28              
29             # Carp might not be available either.
30             sub _croak {
31 0     0   0 die sprintf "%s at %s line %d.\n", "@_", (caller 1)[1, 2];
32             }
33              
34             sub filter_as_regex {
35 555     555 1 309654 my ($filter) = @_;
36              
37 555 100       1512 return "^(?!\\\$)" if $filter eq '#'; # Match everything except /^\$/
38 535 100       1035 return "^/" if $filter eq '/#'; # Parent (empty topic) is invalid
39              
40 518         1055 $filter = quotemeta $filter;
41              
42 518         2526 $filter =~ s{ \z (?
43 518         1309 $filter =~ s{ \\ \/ \\ \# } {}x;
44 518         1251 $filter =~ s{ \\ \+ } {[^/]*+}xg;
45 518         1137 $filter =~ s{ ^ (?= \[ \^ / \] \* ) } {(?!\\\$)}x; # No /^\$/ if /^\+/
46              
47 518         1411 return "^$filter";
48             }
49              
50             sub import {
51 2     2   39 my ($class, $server) = @_;
52 2 50       11 @_ <= 2 or _croak "Too many arguments for use " . __PACKAGE__;
53              
54 2 50       56 $server or return;
55              
56 0           $global = $class->new($server);
57              
58 2     2   1846 no strict 'refs';
  2         4  
  2         5469  
59 0           *{ (caller)[0] . "::publish" } = \&publish;
  0            
60 0           *{ (caller)[0] . "::retain" } = \&retain;
  0            
61             }
62              
63             sub new {
64 0     0 1   my ($class, $server, $sockopts) = @_;
65 0 0 0       @_ == 2 or @_ == 3 or _croak "Wrong number of arguments for $class->new";
66              
67 0           my $port = $class->_default_port;
68              
69             # Add port for bare IPv6 address
70 0 0 0       $server = "[$server]:$port" if $server =~ /:.*:/ and not $server =~ /\[/;
71              
72             # Add port for bare IPv4 address or bracketed IPv6 address
73 0 0 0       $server .= ":$port" if $server !~ /:/ or $server =~ /^\[.*\]$/;
74              
75             # Create a random ID for the instance of the object
76 0           my $random_id = join "", map chr 65 + int rand 26, 1 .. 10;
77            
78 0   0       return bless {
79             server => $server,
80             last_connect => 0,
81             sockopts => $sockopts // {},
82             random_id => $random_id
83             }, $class;
84             }
85              
86             sub last_will {
87 0     0 1   my ($self, $topic, $message, $retain) = @_;
88              
89 0           my %old;
90 0 0         %old = %{ $self->{will} } if $self->{will};
  0            
91              
92 0 0         _croak "Wrong number of arguments for last_will" if @_ > 4;
93              
94 0 0         if (@_ >= 2) {
95 0 0 0       if (not defined $topic and not defined $message) {
96 0           delete $self->{will};
97 0           delete $self->{encoded_will};
98              
99 0           return;
100             } else {
101             $self->{will} = {
102             topic => $topic // $old{topic} // '',
103             message => $message // $old{message} // '',
104 0   0       retain => !!$retain // $old{retain} // 0,
      0        
      0        
      0        
      0        
      0        
105             };
106 0 0         _croak("Topic is empty") if not length $self->{will}->{topic};
107              
108 0           my $e = $self->{encoded_will} = { %{ $self->{will} } };
  0            
109 0           utf8::encode($e->{topic});
110 0 0         utf8::downgrade($e->{message}, 1) or do {
111 0           my ($file, $line, $method) = (caller 1)[1, 2, 3];
112 0           warn "Wide character in $method at $file line $line.\n";
113 0           utf8::encode($e->{message});
114             };
115             }
116             }
117              
118 0           return @{ $self->{will} }{qw/topic message retain/};
  0            
119             }
120              
121             sub login {
122 0     0 1   my ($self, $username, $password) = @_;
123              
124              
125 0 0         if (@_ > 1) {
126             _croak "Password login is disabled for insecure connections"
127             if defined $password
128 0 0 0       and not $self->_secure || $ENV{MQTT_SIMPLE_ALLOW_INSECURE_LOGIN};
      0        
129              
130 0           utf8::encode($username);
131 0           $self->{username} = $username;
132 0           $self->{password} = $password;
133             }
134              
135 0           return $username;
136             }
137              
138             sub _connect {
139 0     0     my ($self) = @_;
140              
141 0 0 0       return if $self->{socket} and $self->{socket}->connected;
142              
143 0 0         if ($self->{last_connect} > time() - $RECONNECT_INTERVAL) {
144 0           select undef, undef, undef, .01;
145 0           return;
146             }
147              
148             # Reset state
149 0           $self->{last_connect} = time;
150 0           $self->{buffer} = "";
151 0           delete $self->{ping};
152              
153             # Connect
154 0           my $socket_class = $self->_socket_class;
155             my %socket_options = (
156             PeerAddr => $self->{server},
157 0           %{ $self->{sockopts} }
  0            
158             );
159 0 0         $self->{socket} = $socket_class->new( %socket_options )
160             or warn "$0: connect: " . $self->_socket_error . "\n";
161              
162             # Say hello
163 0           local $self->{skip_connect} = 1; # avoid infinite recursion :-)
164 0           $self->_send_connect;
165 0           $self->_send_subscribe;
166             }
167              
168             sub _prepend_variable_length {
169             # Copied from Net::MQTT::Constants
170 0     0     my ($data) = @_;
171 0           my $v = length $data;
172 0           my $o = "";
173 0           my $d;
174 0           do {
175 0           $d = $v % 128;
176 0           $v = int($v/128);
177 0 0         $d |= 0x80 if $v;
178 0           $o .= pack "C", $d;
179             } while $d & 0x80;
180 0           return "$o$data";
181             }
182              
183             sub _send {
184 0     0     my ($self, $data) = @_;
185              
186 0 0         $self->_connect unless exists $self->{skip_connect};
187 0           delete $self->{skip_connect};
188              
189 0 0         my $socket = $self->{socket} or return;
190              
191 0           while (my $chunk = substr $data, 0, $WRITE_BYTES, "") {
192 0 0         syswrite $socket, $chunk
193             or $self->_drop_connection; # reconnect on next message
194             }
195              
196 0           $self->{last_send} = time;
197             }
198              
199             sub _send_connect {
200 0     0     my ($self) = @_;
201              
202 0           my $will = $self->{encoded_will};
203 0           my $flags = 0x02;
204 0 0         $flags |= 0x04 if $will;
205 0 0 0       $flags |= 0x20 if $will and $will->{retain};
206              
207 0 0         $flags |= 0x80 if defined $self->{username};
208 0 0 0       $flags |= 0x40 if defined $self->{username} and defined $self->{password};
209              
210             $self->_send("\x10" . _prepend_variable_length(pack(
211             "x C/a* C C n n/a*"
212             . ($flags & 0x04 ? "n/a* n/a*" : "")
213             . ($flags & 0x80 ? "n/a*" : "")
214             . ($flags & 0x40 ? "n/a*" : ""),
215             $PROTOCOL_NAME,
216             $PROTOCOL_LEVEL,
217             $flags,
218             $KEEPALIVE_INTERVAL,
219             $self->_client_identifier,
220             ($flags & 0x04 ? ($will->{topic}, $will->{message}) : ()),
221             ($flags & 0x80 ? $self->{username} : ()),
222 0 0         ($flags & 0x40 ? $self->{password} : ()),
    0          
    0          
    0          
    0          
    0          
223             )));
224             }
225              
226             sub _send_subscribe {
227 0     0     my ($self, @topics) = @_;
228              
229 0 0         if (not @topics) {
230 0 0         @topics = keys %{ $self->{sub} } or return;
  0            
231             }
232 0 0         return if not @topics;
233              
234 0           utf8::encode($_) for @topics;
235              
236             # Hardcoded "packet identifier" \0\x01 for now (was \0\0 but the spec
237             # disallows it for subscribe packets and mosquitto started enforcing that.)
238 0           $self->_send("\x82" . _prepend_variable_length("\0\x01" .
239             pack("(n/a* x)*", @topics) # x = QoS 0
240             ));
241             }
242              
243             sub _send_unsubscribe {
244 0     0     my ($self, @topics) = @_;
245              
246 0 0         return if not @topics;
247              
248 0           utf8::encode($_) for @topics;
249              
250             # Hardcoded "packet identifier" \0\0x01 for now; see above.
251 0           $self->_send("\xa2" . _prepend_variable_length("\0\x01" .
252             pack("(n/a*)*", @topics)
253             ));
254             }
255              
256             sub _parse {
257 0     0     my ($self) = @_;
258              
259 0           my $bufref = \$self->{buffer};
260              
261 0 0         return if length $$bufref < 2;
262              
263 0           my $offset = 1;
264              
265 0           my $length = do {
266 0           my $multiplier = 1;
267 0           my $v = 0;
268 0           my $d;
269 0           do {
270 0 0         return if $offset >= length $$bufref; # not enough data yet
271 0           $d = unpack "C", substr $$bufref, $offset++, 1;
272 0           $v += ($d & 0x7f) * $multiplier;
273 0           $multiplier *= 128;
274             } while ($d & 0x80);
275 0           $v;
276             };
277              
278 0 0         if ($length > $MAX_LENGTH) {
279             # On receiving an enormous packet, just disconnect to avoid exhausting
280             # RAM on tiny systems.
281             # TODO: just slurp and drop the data
282 0           $self->_drop_connection;
283 0           return;
284             }
285              
286 0 0         return if length($$bufref) < $offset + $length; # not enough data yet
287              
288 0           my $first_byte = unpack "C", substr $$bufref, 0, 1;
289              
290 0           my $packet = {
291             type => ($first_byte & 0xF0) >> 4,
292             dup => ($first_byte & 0x08) >> 3,
293             qos => ($first_byte & 0x06) >> 1,
294             retain => ($first_byte & 0x01),
295             data => substr($$bufref, $offset, $length),
296             };
297              
298 0           substr $$bufref, 0, $offset + $length, ""; # remove the parsed bits.
299              
300 0           return $packet;
301             }
302              
303             sub _incoming_publish {
304 0     0     my ($self, $packet) = @_;
305              
306             # Because QoS is not supported, no packed ID in the data. It would
307             # have been 16 bits between $topic and $message.
308 0           my ($topic, $message) = unpack "n/a a*", $packet->{data};
309              
310 0           utf8::decode($topic);
311              
312 0           for my $cb (@{ $self->{callbacks} }) {
  0            
313 0 0         if ($topic =~ /$cb->{regex}/) {
314 0           $cb->{callback}->($topic, $message, $packet->{retain});
315 0           return;
316             }
317             }
318             }
319              
320             sub _publish {
321 0     0     my ($self, $retain, $topic, $message) = @_;
322              
323 0 0 0       $message //= "" if $retain;
324              
325 0           utf8::encode($topic);
326 0 0         utf8::downgrade($message, 1) or do {
327 0           my ($file, $line, $method) = (caller 1)[1, 2, 3];
328 0           warn "Wide character in $method at $file line $line.\n";
329 0           utf8::encode($message);
330             };
331              
332 0 0         $self->_send(
333             ($retain ? "\x31" : "\x30")
334             . _prepend_variable_length(
335             pack("n/a*", $topic) . $message
336             )
337             );
338             }
339              
340             sub publish {
341 0     0 1   my $method = UNIVERSAL::isa($_[0], __PACKAGE__);
342 0 0         @_ == ($method ? 3 : 2) or _croak "Wrong number of arguments for publish";
    0          
343              
344 0 0         ($method ? shift : $global)->_publish(0, @_);
345             }
346              
347             sub retain {
348 0     0 1   my $method = UNIVERSAL::isa($_[0], __PACKAGE__);
349 0 0         @_ == ($method ? 3 : 2) or _croak "Wrong number of arguments for retain";
    0          
350              
351 0 0         ($method ? shift : $global)->_publish(1, @_);
352             }
353              
354             sub run {
355 0     0 1   my ($self, @subscribe_args) = @_;
356              
357 0 0         $self->subscribe(@subscribe_args) if @subscribe_args;
358              
359 0           until ($self->{stop_loop}) {
360 0           my @timeouts;
361             push @timeouts, $KEEPALIVE_INTERVAL - (time() - $self->{last_send})
362 0 0         if exists $self->{last_send};
363             push @timeouts, $PING_TIMEOUT - (time() - $self->{ping})
364 0 0         if exists $self->{ping};
365              
366             my $timeout = @timeouts
367 0 0         ? (sort { $a <=> $b } @timeouts)[0]
  0            
368             : 1; # default to 1
369              
370 0           $self->tick($timeout);
371             }
372              
373 0           delete $self->{stop_loop};
374             }
375              
376             sub subscribe {
377 0     0 1   my ($self, @kv) = @_;
378              
379 0           while (my ($topic, $callback) = splice @kv, 0, 2) {
380 0           $self->{sub}->{ $topic } = 1;
381 0           push @{ $self->{callbacks} }, {
  0            
382             topic => $topic,
383             regex => filter_as_regex($topic),
384             callback => $callback,
385             };
386             }
387              
388 0 0         $self->_send_subscribe() if $self->{socket};
389             }
390              
391             sub unsubscribe {
392 0     0 1   my ($self, @topics) = @_;
393              
394 0           $self->_send_unsubscribe(@topics);
395              
396 0           my $cb = $self->{callbacks};
397 0           for my $topic ( @topics ) {
398 0           @$cb = grep {$_->{topic} ne $topic} @$cb;
  0            
399             }
400              
401 0           delete @{ $self->{sub} }{ @topics };
  0            
402             }
403              
404             sub tick {
405 0     0 1   my ($self, $timeout) = @_;
406              
407 0           $self->_connect;
408              
409 0 0         my $socket = $self->{socket} or return;
410 0           my $bufref = \$self->{buffer};
411              
412 0           my $r = '';
413 0           vec($r, fileno($socket), 1) = 1;
414              
415 0 0 0       if (select($r, undef, undef, $timeout // 0) > 0) {
416             sysread $socket, $$bufref, $READ_BYTES, length $$bufref
417 0 0         or delete $self->{socket};
418              
419 0           while (length $$bufref) {
420 0 0         my $packet = $self->_parse() or last;
421 0 0         $self->_incoming_publish($packet) if $packet->{type} == 3;
422 0 0         delete $self->{ping} if $packet->{type} == 13;
423             }
424             }
425              
426 0 0         if (time() >= $self->{last_send} + $KEEPALIVE_INTERVAL) {
427 0           $self->_send("\xc0\0"); # PINGREQ
428 0           $self->{ping} = time;
429             }
430 0 0 0       if ($self->{ping} and time() >= $self->{ping} + $PING_TIMEOUT) {
431 0           $self->_drop_connection;
432             }
433              
434 0           return !! $self->{socket};
435             }
436              
437             sub disconnect {
438 0     0 1   my ($self) = @_;
439              
440             $self->_send(pack "C x", 0xe0)
441 0 0 0       if $self->{socket} and $self->{socket}->connected;
442              
443 0           $self->_drop_connection;
444             }
445              
446             sub _drop_connection {
447 0     0     my ($self) = @_;
448              
449 0           delete $self->{socket};
450 0           $self->{last_connect} = 0;
451             }
452              
453             1;
454              
455             __END__