File Coverage

blib/lib/Net/MQTT/Simple.pm
Criterion Covered Total %
statement 27 256 10.5
branch 6 140 4.2
condition 0 59 0.0
subroutine 7 37 18.9
pod 13 13 100.0
total 53 505 10.5


line stmt bran cond sub pod time code
1             package Net::MQTT::Simple;
2              
3 2     2   364127 use strict;
  2         4  
  2         89  
4 2     2   21 use warnings;
  2         4  
  2         140  
5              
6 2     2   1520 use IO::Socket::IP;
  2         103073  
  2         12  
7 2     2   1383 use Socket ();
  2         7  
  2         1325  
8              
9             our $VERSION = '1.33';
10              
11             # Please note that these are not documented and are subject to change:
12             our $KEEPALIVE_INTERVAL = 60;
13             our $PING_TIMEOUT = 10;
14             our $RECONNECT_INTERVAL = 5;
15             our $MAX_LENGTH = 2097152; # 2 MB
16             our $READ_BYTES = 16 * 1024; # 16 kB per IO::Socket::SSL recommendation
17             our $WRITE_BYTES = 16 * 1024; # 16 kB per IO::Socket::SSL maximum
18             our $PROTOCOL_LEVEL = 0x04; # 0x03 in v3.1, 0x04 in v3.1.1
19             our $PROTOCOL_NAME = "MQTT"; # MQIsdp in v3.1, MQTT in v3.1.1
20              
21             my $global;
22              
23 0     0   0 sub _default_port { 1883 }
24 0     0   0 sub _socket_class { 'IO::Socket::IP' }
25 0     0   0 sub _socket_error { "$@" }
26 0     0   0 sub _secure { 0 }
27              
28 0     0   0 sub _client_identifier { my ($class) = @_; return "Net::MQTT::Simple[" . $class->{random_id} . "]"; }
  0         0  
29              
30             # Carp might not be available either.
31             sub _croak {
32 0     0   0 die sprintf "%s at %s line %d.\n", "@_", (caller 1)[1, 2];
33             }
34              
35             sub filter_as_regex {
36 555     555 1 595285 my ($filter) = @_;
37              
38 555 100       1775 return "^(?!\\\$)" if $filter eq '#'; # Match everything except /^\$/
39 535 100       1092 return "^/" if $filter eq '/#'; # Parent (empty topic) is invalid
40              
41 518         987 $filter = quotemeta $filter;
42              
43 518         2636 $filter =~ s{ \z (?<! \\ \/ \\ \# ) } {\\z}x; # Anchor unless /#$/
44 518         1261 $filter =~ s{ \\ \/ \\ \# } {}x;
45 518         1213 $filter =~ s{ \\ \+ } {[^/]*+}xg;
46 518         1246 $filter =~ s{ ^ (?= \[ \^ / \] \* ) } {(?!\\\$)}x; # No /^\$/ if /^\+/
47              
48 518         1420 return "^$filter";
49             }
50              
51             sub import {
52 2     2   31 my ($class, $server) = @_;
53 2 50       11 @_ <= 2 or _croak "Too many arguments for use " . __PACKAGE__;
54              
55 2 50       80 $server or return;
56              
57 0           $global = $class->new($server);
58              
59 2     2   21 no strict 'refs';
  2         4  
  2         9014  
60 0           *{ (caller)[0] . "::publish" } = \&publish;
  0            
61 0           *{ (caller)[0] . "::retain" } = \&retain;
  0            
62 0           *{ (caller)[0] . "::mqtt_get" } = \&get;
  0            
63             }
64              
65             sub new {
66 0     0 1   my ($class, $server, $sockopts) = @_;
67 0 0 0       @_ == 2 or @_ == 3 or _croak "Wrong number of arguments for $class->new";
68              
69 0           my $port = $class->_default_port;
70              
71             # Add port for bare IPv6 address
72 0 0 0       $server = "[$server]:$port" if $server =~ /:.*:/ and not $server =~ /\[/;
73              
74             # Add port for bare IPv4 address or bracketed IPv6 address
75 0 0 0       $server .= ":$port" if $server !~ /:/ or $server =~ /^\[.*\]$/;
76              
77             # Create a random ID for the instance of the object
78 0           my $random_id = join "", map chr 65 + int rand 26, 1 .. 10;
79              
80 0   0       return bless {
81             server => $server,
82             last_connect => 0,
83             sockopts => $sockopts // {},
84             random_id => $random_id
85             }, $class;
86             }
87              
88             sub last_will {
89 0     0 1   my ($self, $topic, $message, $retain) = @_;
90              
91 0           my %old;
92 0 0         %old = %{ $self->{will} } if $self->{will};
  0            
93              
94 0 0         _croak "Wrong number of arguments for last_will" if @_ > 4;
95              
96 0 0         if (@_ >= 2) {
97 0 0 0       if (not defined $topic and not defined $message) {
98 0           delete $self->{will};
99 0           delete $self->{encoded_will};
100              
101 0           return;
102             } else {
103             $self->{will} = {
104             topic => $topic // $old{topic} // '',
105             message => $message // $old{message} // '',
106 0   0       retain => !!$retain // $old{retain} // 0,
      0        
      0        
      0        
      0        
      0        
107             };
108 0 0         _croak("Topic is empty") if not length $self->{will}->{topic};
109              
110 0           my $e = $self->{encoded_will} = { %{ $self->{will} } };
  0            
111 0           utf8::encode($e->{topic});
112 0 0         utf8::downgrade($e->{message}, 1) or do {
113 0           my ($file, $line, $method) = (caller 1)[1, 2, 3];
114 0           warn "Wide character in $method at $file line $line.\n";
115 0           utf8::encode($e->{message});
116             };
117             }
118             }
119              
120 0           return @{ $self->{will} }{qw/topic message retain/};
  0            
121             }
122              
123             sub login {
124 0     0 1   my ($self, $username, $password) = @_;
125              
126              
127 0 0         if (@_ > 1) {
128             _croak "Password login is disabled for insecure connections"
129             if defined $password
130 0 0 0       and not $self->_secure || $ENV{MQTT_SIMPLE_ALLOW_INSECURE_LOGIN};
      0        
131              
132 0           utf8::encode($username);
133 0           $self->{username} = $username;
134 0           $self->{password} = $password;
135             }
136              
137 0           return $username;
138             }
139              
140             sub _connect {
141 0     0     my ($self) = @_;
142              
143 0 0 0       return if $self->{socket} and $self->{socket}->connected;
144              
145 0 0         if ($self->{last_connect} > time() - $RECONNECT_INTERVAL) {
146 0           select undef, undef, undef, .01;
147 0           return;
148             }
149              
150             # Reset state
151 0           $self->{last_connect} = time;
152 0           $self->{buffer} = "";
153 0           $self->{actually_subscribed} = {};
154 0           delete $self->{ping};
155              
156             # Connect
157 0           my $socket_class = $self->_socket_class;
158             my %socket_options = (
159             PeerAddr => $self->{server},
160 0           %{ $self->{sockopts} }
  0            
161             );
162 0           $self->{socket} = $socket_class->new( %socket_options );
163              
164 0 0         if (not $self->{socket}) {
165 0           warn "$0: connect: " . $self->_socket_error . "\n";
166 0           return;
167             }
168              
169             # Say hello
170 0           local $self->{skip_connect} = 1; # avoid infinite recursion :-)
171 0           $self->_send_connect;
172 0           $self->_send_subscribe;
173             }
174              
175             sub _prepend_variable_length {
176             # Copied from Net::MQTT::Constants
177 0     0     my ($data) = @_;
178 0           my $v = length $data;
179 0           my $o = "";
180 0           my $d;
181 0           do {
182 0           $d = $v % 128;
183 0           $v = int($v/128);
184 0 0         $d |= 0x80 if $v;
185 0           $o .= pack "C", $d;
186             } while $d & 0x80;
187 0           return "$o$data";
188             }
189              
190             sub _send {
191 0     0     my ($self, $data) = @_;
192              
193 0 0         $self->_connect unless exists $self->{skip_connect};
194              
195 0 0         my $socket = $self->{socket} or return;
196              
197 0           while (my $chunk = substr $data, 0, $WRITE_BYTES, "") {
198 0 0         syswrite $socket, $chunk
199             or $self->_drop_connection; # reconnect on next message
200             }
201              
202 0           $self->{last_send} = time;
203             }
204              
205             sub _send_connect {
206 0     0     my ($self) = @_;
207              
208 0           my $will = $self->{encoded_will};
209 0           my $flags = 0x02;
210 0 0         $flags |= 0x04 if $will;
211 0 0 0       $flags |= 0x20 if $will and $will->{retain};
212              
213 0 0         $flags |= 0x80 if defined $self->{username};
214 0 0 0       $flags |= 0x40 if defined $self->{username} and defined $self->{password};
215              
216             $self->_send("\x10" . _prepend_variable_length(pack(
217             "x C/a* C C n n/a*"
218             . ($flags & 0x04 ? "n/a* n/a*" : "")
219             . ($flags & 0x80 ? "n/a*" : "")
220             . ($flags & 0x40 ? "n/a*" : ""),
221             $PROTOCOL_NAME,
222             $PROTOCOL_LEVEL,
223             $flags,
224             $KEEPALIVE_INTERVAL,
225             $self->_client_identifier,
226             ($flags & 0x04 ? ($will->{topic}, $will->{message}) : ()),
227             ($flags & 0x80 ? $self->{username} : ()),
228 0 0         ($flags & 0x40 ? $self->{password} : ()),
    0          
    0          
    0          
    0          
    0          
229             )));
230             }
231              
232             sub _send_subscribe {
233 0     0     my ($self) = @_;
234              
235             my @topics = grep {
236 0           not exists $self->{actually_subscribed}->{$_}
237 0           } keys %{ $self->{subscriptions} };
  0            
238              
239 0 0         @topics or return;
240              
241 0           utf8::encode($_) for @topics;
242              
243             # Hardcoded "packet identifier" \0\x01 for now (was \0\0 but the spec
244             # disallows it for subscribe packets and mosquitto started enforcing that.)
245 0           $self->_send("\x82" . _prepend_variable_length("\0\x01" .
246             pack("(n/a* x)*", @topics) # x = QoS 0
247             ));
248              
249 0           @{ $self->{actually_subscribed} }{ @topics } = ();
  0            
250             }
251              
252             sub _send_unsubscribe {
253 0     0     my ($self, @topics) = @_;
254              
255 0 0         return if not @topics;
256              
257 0           utf8::encode($_) for @topics;
258              
259             # Hardcoded "packet identifier" \0\x01 for now; see above.
260 0           $self->_send("\xa2" . _prepend_variable_length("\0\x01" .
261             pack("(n/a*)*", @topics)
262             ));
263              
264 0           delete @{ $self->{actually_subscribed} }{ @topics };
  0            
265             }
266              
267             sub _parse {
268 0     0     my ($self) = @_;
269              
270 0           my $bufref = \$self->{buffer};
271              
272 0 0         return if length $$bufref < 2;
273              
274 0           my $offset = 1;
275              
276 0           my $length = do {
277 0           my $multiplier = 1;
278 0           my $v = 0;
279 0           my $d;
280 0           do {
281 0 0         return if $offset >= length $$bufref; # not enough data yet
282 0           $d = unpack "C", substr $$bufref, $offset++, 1;
283 0           $v += ($d & 0x7f) * $multiplier;
284 0           $multiplier *= 128;
285             } while ($d & 0x80);
286 0           $v;
287             };
288              
289 0 0         if ($length > $MAX_LENGTH) {
290             # On receiving an enormous packet, just disconnect to avoid exhausting
291             # RAM on tiny systems.
292             # TODO: just slurp and drop the data
293 0           $self->_drop_connection;
294 0           return;
295             }
296              
297 0 0         return if length($$bufref) < $offset + $length; # not enough data yet
298              
299 0           my $first_byte = unpack "C", substr $$bufref, 0, 1;
300              
301 0           my $packet = {
302             type => ($first_byte & 0xF0) >> 4,
303             dup => ($first_byte & 0x08) >> 3,
304             qos => ($first_byte & 0x06) >> 1,
305             retain => ($first_byte & 0x01),
306             data => substr($$bufref, $offset, $length),
307             };
308              
309 0           substr $$bufref, 0, $offset + $length, ""; # remove the parsed bits.
310              
311 0           return $packet;
312             }
313              
314             sub _incoming_publish {
315 0     0     my ($self, $packet) = @_;
316              
317             # Because QoS is not supported, no packet ID in the data. It would
318             # have been 16 bits between $topic and $message.
319 0           my ($topic, $message) = unpack "n/a a*", $packet->{data};
320              
321 0           utf8::decode($topic);
322              
323 0           for my $cb (@{ $self->{callbacks} }) {
  0            
324 0 0         if ($topic =~ /$cb->{regex}/) {
325 0           $cb->{callback}->($topic, $message, $packet->{retain});
326 0           return;
327             }
328             }
329             }
330              
331             sub _publish {
332 0     0     my ($self, $retain, $topic, $message) = @_;
333              
334 0 0 0       $message //= "" if $retain;
335              
336 0           utf8::encode($topic);
337 0 0         utf8::downgrade($message, 1) or do {
338 0           my ($file, $line, $method) = (caller 1)[1, 2, 3];
339 0           warn "Wide character in $method at $file line $line.\n";
340 0           utf8::encode($message);
341             };
342              
343 0 0         $self->_send(
344             ($retain ? "\x31" : "\x30")
345             . _prepend_variable_length(
346             pack("n/a*", $topic) . $message
347             )
348             );
349             }
350              
351             sub publish {
352 0     0 1   my $method = UNIVERSAL::isa($_[0], __PACKAGE__);
353 0 0         @_ == ($method ? 3 : 2) or _croak "Wrong number of arguments for publish";
    0          
354              
355 0 0         ($method ? shift : $global)->_publish(0, @_);
356             }
357              
358             sub retain {
359 0     0 1   my $method = UNIVERSAL::isa($_[0], __PACKAGE__);
360 0 0         @_ == ($method ? 3 : 2) or _croak "Wrong number of arguments for retain";
    0          
361              
362 0 0         ($method ? shift : $global)->_publish(1, @_);
363             }
364              
365             sub run {
366 0     0 1   my ($self, @subscribe_args) = @_;
367              
368 0 0         $self->subscribe(@subscribe_args) if @subscribe_args;
369              
370 0           until ($self->{stop_loop}) {
371 0           my @timeouts;
372             push @timeouts, $KEEPALIVE_INTERVAL - (time() - $self->{last_send})
373 0 0         if exists $self->{last_send};
374             push @timeouts, $PING_TIMEOUT - (time() - $self->{ping})
375 0 0         if exists $self->{ping};
376              
377             my $timeout = @timeouts
378 0 0         ? (sort { $a <=> $b } @timeouts)[0]
  0            
379             : 1; # default to 1
380              
381 0           $self->tick($timeout);
382             }
383              
384 0           delete $self->{stop_loop};
385             }
386              
387             sub subscribe {
388 0     0 1   my ($self, @kv) = @_;
389              
390 0           while (my ($topic, $callback) = splice @kv, 0, 2) {
391 0           $self->{subscriptions}->{ $topic } = undef;
392 0           push @{ $self->{callbacks} }, {
  0            
393             topic => $topic,
394             regex => filter_as_regex($topic),
395             callback => $callback,
396             };
397             }
398              
399 0 0         $self->_send_subscribe() if $self->{socket};
400             }
401              
402             sub unsubscribe {
403 0     0 1   my ($self, @topics) = @_;
404              
405 0           $self->_send_unsubscribe(@topics);
406              
407 0           my $cb = $self->{callbacks};
408 0           for my $topic ( @topics ) {
409 0           @$cb = grep {$_->{topic} ne $topic} @$cb;
  0            
410             }
411              
412 0           delete @{ $self->{subscriptions} }{ @topics };
  0            
413             }
414              
415             sub tick {
416 0     0 1   my ($self, $timeout) = @_;
417              
418 0           $self->_connect;
419              
420 0 0         my $socket = $self->{socket} or return;
421 0           my $bufref = \$self->{buffer};
422              
423 0           my $r = '';
424 0           vec($r, fileno($socket), 1) = 1;
425              
426 0 0 0       if (select($r, undef, undef, $timeout // 0) > 0) {
427             sysread $socket, $$bufref, $READ_BYTES, length $$bufref
428 0 0         or delete $self->{socket};
429              
430 0           while (length $$bufref) {
431 0 0         my $packet = $self->_parse() or last;
432 0 0         $self->_incoming_publish($packet) if $packet->{type} == 3;
433 0 0         delete $self->{ping} if $packet->{type} == 13;
434             }
435             }
436              
437 0 0         if (time() >= $self->{last_send} + $KEEPALIVE_INTERVAL) {
438 0           $self->_send("\xc0\0"); # PINGREQ
439 0           $self->{ping} = time;
440             }
441 0 0 0       if ($self->{ping} and time() >= $self->{ping} + $PING_TIMEOUT) {
442 0           $self->_drop_connection;
443             }
444              
445 0           return !! $self->{socket};
446             }
447              
448             sub once {
449 0     0 1   my ($self, $subscribe_topic, $callback) = @_;
450              
451             $self->subscribe($subscribe_topic, sub {
452 0     0     my ($received_topic, $message, $retain) = @_;
453              
454 0           $callback->($received_topic, $message, $retain);
455 0           $self->unsubscribe($subscribe_topic);
456 0           });
457             }
458              
459             sub get {
460 0     0 1   my $method = UNIVERSAL::isa($_[0], __PACKAGE__);
461 0 0         @_ >= ($method ? 2 : 1)
    0          
    0          
462             or _croak "Wrong number of arguments for " . ($method ? "get" : "mqtt_get");
463              
464 0 0         my $self = ($method ? shift : $global);
465 0           my ($topics, $timeout) = @_;
466              
467 0   0       $timeout ||= 1;
468              
469 0           my $got_ref = ref $topics;
470 0 0         $topics = [ $topics ] if not $got_ref;
471              
472 0           my @messages;
473             my %waiting;
474 0           @waiting{ @$topics } = ();
475              
476 0           for my $i (0 .. $#$topics) {
477             # Close over $topic in case the callback gets a different one
478             # (e.g. due to wildcards)
479 0           my $topic = $topics->[$i];
480              
481             $self->once($topic, sub {
482 0     0     my (undef, $message, undef) = @_;
483              
484 0           $messages[$i] = $message;
485 0           delete $waiting{$topic};
486 0           });
487             }
488              
489 0           my $stop = time() + $timeout;
490              
491 0           while (time() <= $stop) {
492 0           $self->tick(.05);
493 0 0         last if not %waiting;
494             }
495              
496 0 0 0       return @messages if $got_ref and wantarray;
497 0 0         return \@messages if $got_ref;
498 0           return $messages[0];
499             }
500              
501             sub disconnect {
502 0     0 1   my ($self) = @_;
503              
504 0 0 0       if ($self->{socket} and $self->{socket}->connected) {
505 0           $self->_send(pack "C x", 0xe0);
506 0           $self->{socket}->shutdown(Socket::SHUT_WR);
507 0           $self->{socket}->close;
508             }
509              
510 0           $self->_drop_connection;
511             }
512              
513             sub _drop_connection {
514 0     0     my ($self) = @_;
515              
516 0           delete $self->{socket};
517 0           $self->{last_connect} = 0;
518             }
519              
520             1;
521              
522             __END__
523              
524             =head1 NAME
525              
526             Net::MQTT::Simple - Minimal MQTT version 3 interface
527              
528             =head1 SYNOPSIS
529              
530             # One-liner that publishes sensor values from STDIN
531              
532             perl -MNet::MQTT::Simple=mosquitto.example.org \
533             -nle'retain "topic/here" => $_'
534              
535              
536             # Functional (single server only)
537              
538             use Net::MQTT::Simple "mosquitto.example.org";
539              
540             publish "topic/here" => "Message here";
541             retain "topic/here" => "Retained message here";
542              
543             my $value = mqtt_get "topic/here";
544             my @values = mqtt_get [ "topic/one", "topic/two" ];
545              
546              
547             # Object oriented (supports subscribing to topics)
548              
549             use Net::MQTT::Simple;
550              
551             my $mqtt = Net::MQTT::Simple->new("mosquitto.example.org");
552              
553             my $value = $mqtt->get("topic/here");
554             my @values = $mqtt->get([ "topic/one", "topic/two" ]);
555              
556             $mqtt->publish("topic/here" => "Message here");
557             $mqtt->retain( "topic/here" => "Message here");
558              
559             $mqtt->subscribe("topic/here" => sub {
560             my ($topic, $message) = @_;
561             ...
562             });
563              
564             $mqtt->run(
565             "sensors/+/temperature" => sub {
566             my ($topic, $message) = @_;
567             die "The building's on fire" if $message > 150;
568             },
569             "#" => sub {
570             my ($topic, $message) = @_;
571             print "[$topic] $message\n";
572             },
573             );
574              
575             =head1 DESCRIPTION
576              
577             This module consists of only one file and has no dependencies except core Perl
578             modules, making it suitable for embedded installations where CPAN installers
579             are unavailable and resources are limited.
580              
581             Only basic MQTT functionality is provided; if you need more, you'll have to
582             use the full-featured L<Net::MQTT> instead.
583              
584             Connections are set up on demand, automatically reconnecting to the server if a
585             previous connection had been lost.
586              
587             Because sensor scripts often run unattended, connection failures will result in
588             warnings (on STDERR if you didn't override that) without throwing an exception.
589              
590             Please refer to L<Net::MQTT::Simple::SSL> for more information about encrypted
591             and authenticated connections.
592              
593             =head2 Functional interface
594              
595             This will suffice for most simple sensor scripts. A socket is kept open for
596             reuse until the script has finished. The functional interface cannot be used
597             for ongoing subscriptions, only for publishing and for fetching messages with C<mqtt_get>.
598              
599             Instead of requesting symbols to be imported, provide the MQTT server on the
600             C<use Net::MQTT::Simple> line. A non-standard port can be specified with a
601             colon. The functions C<publish>, C<retain>, and C<mqtt_get> will be exported;
602             they call the methods C<publish>, C<retain>, and C<get>, respectively.
603              
604             =head2 Object oriented interface
605              
606             =head3 new(server[, sockopts])
607              
608             Specify the server (possibly with a colon and port number) to the constructor,
609             C<< Net::MQTT::Simple->new >>. The socket is disconnected when the object goes
610             out of scope.
611              
612             Optionally, a reference to a hash of socket options can be passed. Options
613             specified in this hash are passed on to the socket constructor.
614              
615             =head3 last_will([$topic, $message[, $retain]])
616              
617             Set a "Last Will and Testament", to be used on subsequent connections. Note
618             that the last will cannot be updated for a connection that is already
619             established.
620              
621             A last will is a message that is published by the broker on behalf of the
622             client, if the connection is dropped without an explicit call to C<disconnect>.
623              
624             Without arguments, returns the current values without changing the
625             active configuration.
626              
627             When the given topic and message are both undef, the last will is deconfigured.
628             In other cases, only arguments which are C<defined> are updated with the given
629             value. For the first setting, the topic is mandatory, the message defaults to
630             an empty string, and the retain flag defaults to false.
631              
632             Returns a list of the three values in the same order as the arguments.
633              
634             =head3 login($username[, $password])
635              
636             Sets authentication credentials, to be used on subsequent connections. Note
637             that the credentials cannot be updated for a connection that is already
638             established.
639              
640             The username is text, the password is binary.
641              
642             See L<Net::MQTT::Simple::SSL> for information about secure connections. To
643             enable insecure password authenticated connections, set the environment
644             variable MQTT_SIMPLE_ALLOW_INSECURE_LOGIN to a true value.
645              
646             Returns the username.
647              
648             =head1 DISCONNECTING GRACEFULLY
649              
650             =head2 disconnect
651              
652             Performs a graceful disconnect, which ensures that the server does NOT send
653             the registered "Last Will" message.
654              
655             Subsequent calls that require a connection will cause a new connection to be
656             set up.
657              
658             =head1 PUBLISHING MESSAGES
659              
660             The two methods for publishing messages are the same, except for the state of
661             the C<retain> flag.
662              
663             =head2 retain(topic, message)
664              
665             Publish the message with the C<retain> flag on. Use this for sensor values or
666             anything else where the message indicates the current status of something.
667              
668             To discard a retained topic, provide an empty or undefined message.
669              
670             =head2 publish(topic, message)
671              
672             Publishes the message with the C<retain> flag off. Use this for ephemeral
673             messages about events that occur (like that a button was pressed).
674              
675             =head1 SUBSCRIPTIONS
676              
677             Note that any one topic should be used with C<subscribe>, C<once>, or C<get>,
678             but not multiple methods simultaneously.
679              
680             =head2 subscribe(topic, handler[, topic, handler, ...])
681              
682             Subscribes to the given topic(s) and registers the callbacks. Note that only
683             the first matching handler will be called for every message, even if filter
684             patterns overlap.
685              
686             =head2 unsubscribe(topic[, topic, ...])
687              
688             Unsubscribes from the given topic(s) and unregisters the corresponding
689             callbacks. The given topics must exactly match topics that were previously
690             used with the C<subscribe> method.
691              
692             =head2 once(topic, handler)
693              
694             Subscribes to a topic, and unsubscribes from that topic after the first message
695             that matches the topic has been received and the handler has been called.
696              
697             =head2 get(topics[, timeout])
698              
699             Returns the first message received for one or more topics. Specifically useful
700             when combined with retained messages.
701              
702             When given a single topic (string), returns the single message.
703              
704             When given multiple topics (reference to an array of strings), returns a
705             reference to an array of messages in scalar context, or the messages themselves
706             in list context.
707              
708             The timeout is expressed in whole seconds and defaults to 1. The actual time
709             waited may be up to 1 second longer than specified.
710              
711             =head2 run(...)
712              
713             Enters an infinite loop, which calls C<tick> repeatedly. If any arguments
714             are given, they will be passed to C<subscribe> first.
715              
716             =head2 tick(timeout)
717              
718             Test the socket to see if there's any incoming message, waiting at most
719             I<timeout> seconds (can be fractional). Use a timeout of C<0> to avoid
720             blocking, but note that blocking automatic reconnection may take place, which
721             may take much longer.
722              
723             If C<tick> returns false, this means that the socket was no longer connected
724             and that the next call will cause a reconnection attempt. However, a true value
725             does not necessarily mean that the socket is still functional. The only way to
726             reliably determine that a TCP stream is still connected is to actually
727             communicate with the server, e.g. with a ping, which is only done periodically.
728              
729             =head1 UTILITY FUNCTIONS
730              
731             =head2 Net::MQTT::Simple::filter_as_regex(topic_filter)
732              
733             Given a valid MQTT topic filter, returns the corresponding regular expression.
734              
735             =head1 IPv6 PREREQUISITE
736              
737             For IPv6 support, the module L<IO::Socket::IP> needs to be installed. It comes
738             with Perl 5.20 and is available from CPAN for older Perls. If this module is
739             not available, the older L<IO::Socket::INET> will be used, which only supports
740             Legacy IP (IPv4).
741              
742             =head1 MANUAL INSTALLATION
743              
744             If you can't use the CPAN installer, you can actually install this module by
745             creating a directory C<Net/MQTT> and putting C<Simple.pm> in it. Please note
746             that this method does not work for every Perl module and should be used only
747             as a last resort on systems where proper installers are not available.
748              
749             To view the list of C<@INC> paths where Perl searches for modules, run C<perl
750             -V>. This list includes the current working directory (C<.>). Additional
751             include paths can be specified in the C<PERL5LIB> environment variable; see
752             L<perlenv>.
753              
754             =head1 NOT SUPPORTED
755              
756             =over 4
757              
758             =item QoS (Quality of Service)
759              
760             Every message is published at QoS level 0, that is, "at most once", also known
761             as "fire and forget".
762              
763             =item DUP (Duplicate message)
764              
765             Since QoS is not supported, no retransmissions are done, and no message will
766             indicate that it has already been sent before.
767              
768             =item Large data
769              
770             Because everything is handled in memory and there's no way to indicate to the
771             server that large messages are not desired, the connection is dropped as soon
772             as the server announces a packet larger than 2 megabytes.
773              
774             =item Validation of server-to-client communication
775              
776             The MQTT spec prescribes mandatory validation of all incoming data, and
777             disconnecting if anything (really, anything) is wrong with it. However, this
778             minimal implementation silently ignores anything it doesn't specifically
779             handle, which may result in weird behaviour if the server sends out bad data.
780              
781             Most clients do not adhere to this part of the specifications.
782              
783             =back
784              
785             =head1 CAVEATS
786              
787             =head2 Automatic reconnection
788              
789             Connection and reconnection are handled automatically, but without retries. If
790             anything goes wrong, this will cause a single reconnection attempt before the
791             following action. For example, if sending a message fails because of a
792             disconnected socket, the message will not be resent, but the next message might
793             succeed. Only one new connection attempt is done per approximately 5 seconds.
794             This behaviour is intended.
795              
796             =head2 Unicode
797              
798             This module uses the proper Perl Unicode abstractions for parts that according
799             to the MQTT specification are UTF-8 encoded. This includes I<topic>s, but not
800             I<message>s. Published messages are binary data, which you may have to encode
801             and decode yourself.
802              
803             This means that if you have UTF-8 encoded string literals in your code, you
804             should C<use utf8;> and that any of those strings which is a I<message> will
805             need to be encoded by you, for example with C<utf8::encode($message);>.
806              
807             It also means that a I<message> should never contain any character with an
808             ordinal value of greater than 255, because those cannot be used in binary
809             communication. If you're passing non-ASCII text strings, encode them before
810             publishing and decode them after receiving. A character with an ordinal
811             value greater than 255 results in a warning
812              
813             Wide character in publish at yourfile.pl line 42.
814              
815             while the UTF-8 encoded data is passed through. To get rid of the warning, use
816             C<utf8::encode($message);>.
817              
818             =head1 LICENSE
819              
820             This software may be redistributed under the terms of the GPL, LGPL, modified
821             BSD, or Artistic license, or any of the other OSI approved licenses listed at
822             http://www.opensource.org/licenses/alphabetical. Distribution is allowed under
823             all of these licenses, or any smaller subset of multiple or just one of these
824             licenses.
825              
826             When using a packaged version, please refer to the package metadata to see
827             under which license terms it was distributed. Alternatively, a distributor may
828             choose to replace the LICENSE section of the documentation and/or include a
829             LICENSE file to reflect the license(s) they chose to redistribute under.
830              
831             =head1 AUTHOR
832              
833             Juerd Waalboer <juerd@tnx.nl>
834              
835             =head1 SEE ALSO
836              
837             L<Net::MQTT>, L<Net::MQTT::Simple::SSL>