| line | stmt | bran | cond | sub | pod | time | code | 
| 1 |  |  |  |  |  |  | package Net::MQTT::Simple; | 
| 2 |  |  |  |  |  |  |  | 
| 3 | 2 |  |  | 2 |  | 68657 | use strict; | 
|  | 2 |  |  |  |  | 6 |  | 
|  | 2 |  |  |  |  | 57 |  | 
| 4 | 2 |  |  | 2 |  | 10 | use warnings; | 
|  | 2 |  |  |  |  | 4 |  | 
|  | 2 |  |  |  |  | 52 |  | 
| 5 |  |  |  |  |  |  |  | 
| 6 | 2 |  |  | 2 |  | 1189 | use IO::Socket::IP; | 
|  | 2 |  |  |  |  | 71802 |  | 
|  | 2 |  |  |  |  | 10 |  | 
| 7 |  |  |  |  |  |  |  | 
| 8 |  |  |  |  |  |  | our $VERSION = '1.28'; | 
| 9 |  |  |  |  |  |  |  | 
| 10 |  |  |  |  |  |  | # Please note that these are not documented and are subject to change: | 
| 11 |  |  |  |  |  |  | our $KEEPALIVE_INTERVAL = 60; | 
| 12 |  |  |  |  |  |  | our $PING_TIMEOUT = 10; | 
| 13 |  |  |  |  |  |  | our $RECONNECT_INTERVAL = 5; | 
| 14 |  |  |  |  |  |  | our $MAX_LENGTH = 2097152;    # 2 MB | 
| 15 |  |  |  |  |  |  | our $READ_BYTES = 16 * 1024;  # 16 kB per IO::Socket::SSL recommendation | 
| 16 |  |  |  |  |  |  | our $WRITE_BYTES = 16 * 1024; # 16 kB per IO::Socket::SSL maximum | 
| 17 |  |  |  |  |  |  | our $PROTOCOL_LEVEL = 0x04;   # 0x03 in v3.1, 0x04 in v3.1.1 | 
| 18 |  |  |  |  |  |  | our $PROTOCOL_NAME = "MQTT";  # MQIsdp in v3.1, MQTT in v3.1.1 | 
| 19 |  |  |  |  |  |  |  | 
| 20 |  |  |  |  |  |  | my $global; | 
| 21 |  |  |  |  |  |  |  | 
| 22 | 0 |  |  | 0 |  | 0 | sub _default_port { 1883 } | 
| 23 | 0 |  |  | 0 |  | 0 | sub _socket_class { 'IO::Socket::IP' } | 
| 24 | 0 |  |  | 0 |  | 0 | sub _socket_error { "$@" } | 
| 25 | 0 |  |  | 0 |  | 0 | sub _secure { 0 } | 
| 26 |  |  |  |  |  |  |  | 
| 27 | 0 |  |  | 0 |  | 0 | sub _client_identifier { my ($class) = @_; return "Net::MQTT::Simple[" . $class->{random_id} . "]"; } | 
|  | 0 |  |  |  |  | 0 |  | 
| 28 |  |  |  |  |  |  |  | 
| 29 |  |  |  |  |  |  | # Carp might not be available either. | 
| 30 |  |  |  |  |  |  | sub _croak { | 
| 31 | 0 |  |  | 0 |  | 0 | die sprintf "%s at %s line %d.\n", "@_", (caller 1)[1, 2]; | 
| 32 |  |  |  |  |  |  | } | 
| 33 |  |  |  |  |  |  |  | 
| 34 |  |  |  |  |  |  | sub filter_as_regex { | 
| 35 | 555 |  |  | 555 | 1 | 306081 | my ($filter) = @_; | 
| 36 |  |  |  |  |  |  |  | 
| 37 | 555 | 100 |  |  |  | 1423 | return "^(?!\\\$)" if $filter eq '#';   # Match everything except /^\$/ | 
| 38 | 535 | 100 |  |  |  | 1031 | return "^/"        if $filter eq '/#';  # Parent (empty topic) is invalid | 
| 39 |  |  |  |  |  |  |  | 
| 40 | 518 |  |  |  |  | 989 | $filter = quotemeta $filter; | 
| 41 |  |  |  |  |  |  |  | 
| 42 | 518 |  |  |  |  | 2421 | $filter =~ s{ \z (?<! \\ \/ \\ \# ) } {\\z}x;       # Anchor unless /#$/ | 
| 43 | 518 |  |  |  |  | 1247 | $filter =~ s{ \\ \/ \\ \#           } {}x; | 
| 44 | 518 |  |  |  |  | 1169 | $filter =~ s{ \\ \+                 } {[^/]*+}xg; | 
| 45 | 518 |  |  |  |  | 1108 | $filter =~ s{ ^ (?= \[ \^ / \] \* ) } {(?!\\\$)}x;  # No /^\$/ if /^\+/ | 
| 46 |  |  |  |  |  |  |  | 
| 47 | 518 |  |  |  |  | 1366 | return "^$filter"; | 
| 48 |  |  |  |  |  |  | } | 
| 49 |  |  |  |  |  |  |  | 
| 50 |  |  |  |  |  |  | sub import { | 
| 51 | 2 |  |  | 2 |  | 27 | my ($class, $server) = @_; | 
| 52 | 2 | 50 |  |  |  | 11 | @_ <= 2 or _croak "Too many arguments for use " . __PACKAGE__; | 
| 53 |  |  |  |  |  |  |  | 
| 54 | 2 | 50 |  |  |  | 57 | $server or return; | 
| 55 |  |  |  |  |  |  |  | 
| 56 | 0 |  |  |  |  |  | $global = $class->new($server); | 
| 57 |  |  |  |  |  |  |  | 
| 58 | 2 |  |  | 2 |  | 2317 | no strict 'refs'; | 
|  | 2 |  |  |  |  | 4 |  | 
|  | 2 |  |  |  |  | 5824 |  | 
| 59 | 0 |  |  |  |  |  | *{ (caller)[0] . "::publish" } = \&publish; | 
|  | 0 |  |  |  |  |  |  | 
| 60 | 0 |  |  |  |  |  | *{ (caller)[0] . "::retain"  } = \&retain; | 
|  | 0 |  |  |  |  |  |  | 
| 61 |  |  |  |  |  |  | } | 
| 62 |  |  |  |  |  |  |  | 
| 63 |  |  |  |  |  |  | sub new { | 
| 64 | 0 |  |  | 0 | 1 |  | my ($class, $server, $sockopts) = @_; | 
| 65 | 0 | 0 | 0 |  |  |  | @_ == 2 or @_ == 3 or _croak "Wrong number of arguments for $class->new"; | 
| 66 |  |  |  |  |  |  |  | 
| 67 | 0 |  |  |  |  |  | my $port = $class->_default_port; | 
| 68 |  |  |  |  |  |  |  | 
| 69 |  |  |  |  |  |  | # Add port for bare IPv6 address | 
| 70 | 0 | 0 | 0 |  |  |  | $server = "[$server]:$port" if $server =~ /:.*:/ and not $server =~ /\[/; | 
| 71 |  |  |  |  |  |  |  | 
| 72 |  |  |  |  |  |  | # Add port for bare IPv4 address or bracketed IPv6 address | 
| 73 | 0 | 0 | 0 |  |  |  | $server .= ":$port" if $server !~ /:/ or $server =~ /^\[.*\]$/; | 
| 74 |  |  |  |  |  |  |  | 
| 75 |  |  |  |  |  |  | # Create a random ID for the instance of the object | 
| 76 | 0 |  |  |  |  |  | my $random_id = join "", map chr 65 + int rand 26, 1 .. 10; | 
| 77 |  |  |  |  |  |  |  | 
| 78 | 0 |  | 0 |  |  |  | return bless { | 
| 79 |  |  |  |  |  |  | server       => $server, | 
| 80 |  |  |  |  |  |  | last_connect => 0, | 
| 81 |  |  |  |  |  |  | sockopts     => $sockopts // {}, | 
| 82 |  |  |  |  |  |  | random_id    => $random_id | 
| 83 |  |  |  |  |  |  | }, $class; | 
| 84 |  |  |  |  |  |  | } | 
| 85 |  |  |  |  |  |  |  | 
| 86 |  |  |  |  |  |  | sub last_will { | 
| 87 | 0 |  |  | 0 | 1 |  | my ($self, $topic, $message, $retain) = @_; | 
| 88 |  |  |  |  |  |  |  | 
| 89 | 0 |  |  |  |  |  | my %old; | 
| 90 | 0 | 0 |  |  |  |  | %old = %{ $self->{will} } if $self->{will}; | 
|  | 0 |  |  |  |  |  |  | 
| 91 |  |  |  |  |  |  |  | 
| 92 | 0 | 0 |  |  |  |  | _croak "Wrong number of arguments for last_will" if @_ > 4; | 
| 93 |  |  |  |  |  |  |  | 
| 94 | 0 | 0 |  |  |  |  | if (@_ >= 2) { | 
| 95 | 0 | 0 | 0 |  |  |  | if (not defined $topic and not defined $message) { | 
| 96 | 0 |  |  |  |  |  | delete $self->{will}; | 
| 97 | 0 |  |  |  |  |  | delete $self->{encoded_will}; | 
| 98 |  |  |  |  |  |  |  | 
| 99 | 0 |  |  |  |  |  | return; | 
| 100 |  |  |  |  |  |  | } else { | 
| 101 |  |  |  |  |  |  | $self->{will} = { | 
| 102 |  |  |  |  |  |  | topic   => $topic    // $old{topic}   // '', | 
| 103 |  |  |  |  |  |  | message => $message  // $old{message} // '', | 
| 104 | 0 |  | 0 |  |  |  | retain  => !!$retain // $old{retain}  // 0, | 
|  |  |  | 0 |  |  |  |  | 
|  |  |  | 0 |  |  |  |  | 
|  |  |  | 0 |  |  |  |  | 
|  |  |  | 0 |  |  |  |  | 
|  |  |  | 0 |  |  |  |  | 
| 105 |  |  |  |  |  |  | }; | 
| 106 | 0 | 0 |  |  |  |  | _croak("Topic is empty") if not length $self->{will}->{topic}; | 
| 107 |  |  |  |  |  |  |  | 
| 108 | 0 |  |  |  |  |  | my $e = $self->{encoded_will} = { %{ $self->{will} } }; | 
|  | 0 |  |  |  |  |  |  | 
| 109 | 0 |  |  |  |  |  | utf8::encode($e->{topic}); | 
| 110 | 0 | 0 |  |  |  |  | utf8::downgrade($e->{message}, 1) or do { | 
| 111 | 0 |  |  |  |  |  | my ($file, $line, $method) = (caller 1)[1, 2, 3]; | 
| 112 | 0 |  |  |  |  |  | warn "Wide character in $method at $file line $line.\n"; | 
| 113 | 0 |  |  |  |  |  | utf8::encode($e->{message}); | 
| 114 |  |  |  |  |  |  | }; | 
| 115 |  |  |  |  |  |  | } | 
| 116 |  |  |  |  |  |  | } | 
| 117 |  |  |  |  |  |  |  | 
| 118 | 0 |  |  |  |  |  | return @{ $self->{will} }{qw/topic message retain/}; | 
|  | 0 |  |  |  |  |  |  | 
| 119 |  |  |  |  |  |  | } | 
| 120 |  |  |  |  |  |  |  | 
| 121 |  |  |  |  |  |  | sub login { | 
| 122 | 0 |  |  | 0 | 1 |  | my ($self, $username, $password) = @_; | 
| 123 |  |  |  |  |  |  |  | 
| 124 |  |  |  |  |  |  |  | 
| 125 | 0 | 0 |  |  |  |  | if (@_ > 1) { | 
| 126 |  |  |  |  |  |  | _croak "Password login is disabled for insecure connections" | 
| 127 |  |  |  |  |  |  | if defined $password | 
| 128 | 0 | 0 | 0 |  |  |  | and not $self->_secure || $ENV{MQTT_SIMPLE_ALLOW_INSECURE_LOGIN}; | 
|  |  |  | 0 |  |  |  |  | 
| 129 |  |  |  |  |  |  |  | 
| 130 | 0 |  |  |  |  |  | utf8::encode($username); | 
| 131 | 0 |  |  |  |  |  | $self->{username} = $username; | 
| 132 | 0 |  |  |  |  |  | $self->{password} = $password; | 
| 133 |  |  |  |  |  |  | } | 
| 134 |  |  |  |  |  |  |  | 
| 135 | 0 |  |  |  |  |  | return $username; | 
| 136 |  |  |  |  |  |  | } | 
| 137 |  |  |  |  |  |  |  | 
| 138 |  |  |  |  |  |  | sub _connect { | 
| 139 | 0 |  |  | 0 |  |  | my ($self) = @_; | 
| 140 |  |  |  |  |  |  |  | 
| 141 | 0 | 0 | 0 |  |  |  | return if $self->{socket} and $self->{socket}->connected; | 
| 142 |  |  |  |  |  |  |  | 
| 143 | 0 | 0 |  |  |  |  | if ($self->{last_connect} > time() - $RECONNECT_INTERVAL) { | 
| 144 | 0 |  |  |  |  |  | select undef, undef, undef, .01; | 
| 145 | 0 |  |  |  |  |  | return; | 
| 146 |  |  |  |  |  |  | } | 
| 147 |  |  |  |  |  |  |  | 
| 148 |  |  |  |  |  |  | # Reset state | 
| 149 | 0 |  |  |  |  |  | $self->{last_connect} = time; | 
| 150 | 0 |  |  |  |  |  | $self->{buffer} = ""; | 
| 151 | 0 |  |  |  |  |  | delete $self->{ping}; | 
| 152 |  |  |  |  |  |  |  | 
| 153 |  |  |  |  |  |  | # Connect | 
| 154 | 0 |  |  |  |  |  | my $socket_class = $self->_socket_class; | 
| 155 |  |  |  |  |  |  | my %socket_options = ( | 
| 156 |  |  |  |  |  |  | PeerAddr => $self->{server}, | 
| 157 | 0 |  |  |  |  |  | %{ $self->{sockopts} } | 
|  | 0 |  |  |  |  |  |  | 
| 158 |  |  |  |  |  |  | ); | 
| 159 | 0 | 0 |  |  |  |  | $self->{socket} = $socket_class->new( %socket_options ) | 
| 160 |  |  |  |  |  |  | or warn "$0: connect: " . $self->_socket_error . "\n"; | 
| 161 |  |  |  |  |  |  |  | 
| 162 |  |  |  |  |  |  | # Say hello | 
| 163 | 0 |  |  |  |  |  | local $self->{skip_connect} = 1;  # avoid infinite recursion :-) | 
| 164 | 0 |  |  |  |  |  | $self->_send_connect; | 
| 165 | 0 |  |  |  |  |  | $self->_send_subscribe; | 
| 166 |  |  |  |  |  |  | } | 
| 167 |  |  |  |  |  |  |  | 
| 168 |  |  |  |  |  |  | sub _prepend_variable_length { | 
| 169 |  |  |  |  |  |  | # Copied from Net::MQTT::Constants | 
| 170 | 0 |  |  | 0 |  |  | my ($data) = @_; | 
| 171 | 0 |  |  |  |  |  | my $v = length $data; | 
| 172 | 0 |  |  |  |  |  | my $o = ""; | 
| 173 | 0 |  |  |  |  |  | my $d; | 
| 174 | 0 |  |  |  |  |  | do { | 
| 175 | 0 |  |  |  |  |  | $d = $v % 128; | 
| 176 | 0 |  |  |  |  |  | $v = int($v/128); | 
| 177 | 0 | 0 |  |  |  |  | $d |= 0x80 if $v; | 
| 178 | 0 |  |  |  |  |  | $o .= pack "C", $d; | 
| 179 |  |  |  |  |  |  | } while $d & 0x80; | 
| 180 | 0 |  |  |  |  |  | return "$o$data"; | 
| 181 |  |  |  |  |  |  | } | 
| 182 |  |  |  |  |  |  |  | 
| 183 |  |  |  |  |  |  | sub _send { | 
| 184 | 0 |  |  | 0 |  |  | my ($self, $data) = @_; | 
| 185 |  |  |  |  |  |  |  | 
| 186 | 0 | 0 |  |  |  |  | $self->_connect unless exists $self->{skip_connect}; | 
| 187 | 0 |  |  |  |  |  | delete $self->{skip_connect}; | 
| 188 |  |  |  |  |  |  |  | 
| 189 | 0 | 0 |  |  |  |  | my $socket = $self->{socket} or return; | 
| 190 |  |  |  |  |  |  |  | 
| 191 | 0 |  |  |  |  |  | while (my $chunk = substr $data, 0, $WRITE_BYTES, "") { | 
| 192 | 0 | 0 |  |  |  |  | syswrite $socket, $chunk | 
| 193 |  |  |  |  |  |  | or $self->_drop_connection;  # reconnect on next message | 
| 194 |  |  |  |  |  |  | } | 
| 195 |  |  |  |  |  |  |  | 
| 196 | 0 |  |  |  |  |  | $self->{last_send} = time; | 
| 197 |  |  |  |  |  |  | } | 
| 198 |  |  |  |  |  |  |  | 
| 199 |  |  |  |  |  |  | sub _send_connect { | 
| 200 | 0 |  |  | 0 |  |  | my ($self) = @_; | 
| 201 |  |  |  |  |  |  |  | 
| 202 | 0 |  |  |  |  |  | my $will = $self->{encoded_will}; | 
| 203 | 0 |  |  |  |  |  | my $flags = 0x02; | 
| 204 | 0 | 0 |  |  |  |  | $flags |= 0x04 if $will; | 
| 205 | 0 | 0 | 0 |  |  |  | $flags |= 0x20 if $will and $will->{retain}; | 
| 206 |  |  |  |  |  |  |  | 
| 207 | 0 | 0 |  |  |  |  | $flags |= 0x80 if defined $self->{username}; | 
| 208 | 0 | 0 | 0 |  |  |  | $flags |= 0x40 if defined $self->{username} and defined $self->{password}; | 
| 209 |  |  |  |  |  |  |  | 
| 210 |  |  |  |  |  |  | $self->_send("\x10" . _prepend_variable_length(pack( | 
| 211 |  |  |  |  |  |  | "x C/a* C C n n/a*" | 
| 212 |  |  |  |  |  |  | . ($flags & 0x04 ? "n/a* n/a*" : "") | 
| 213 |  |  |  |  |  |  | . ($flags & 0x80 ? "n/a*" : "") | 
| 214 |  |  |  |  |  |  | . ($flags & 0x40 ? "n/a*" : ""), | 
| 215 |  |  |  |  |  |  | $PROTOCOL_NAME, | 
| 216 |  |  |  |  |  |  | $PROTOCOL_LEVEL, | 
| 217 |  |  |  |  |  |  | $flags, | 
| 218 |  |  |  |  |  |  | $KEEPALIVE_INTERVAL, | 
| 219 |  |  |  |  |  |  | $self->_client_identifier, | 
| 220 |  |  |  |  |  |  | ($flags & 0x04 ? ($will->{topic}, $will->{message}) : ()), | 
| 221 |  |  |  |  |  |  | ($flags & 0x80 ? $self->{username} : ()), | 
| 222 | 0 | 0 |  |  |  |  | ($flags & 0x40 ? $self->{password} : ()), | 
|  |  | 0 |  |  |  |  |  | 
|  |  | 0 |  |  |  |  |  | 
|  |  | 0 |  |  |  |  |  | 
|  |  | 0 |  |  |  |  |  | 
|  |  | 0 |  |  |  |  |  | 
| 223 |  |  |  |  |  |  | ))); | 
| 224 |  |  |  |  |  |  | } | 
| 225 |  |  |  |  |  |  |  | 
| 226 |  |  |  |  |  |  | sub _send_subscribe { | 
| 227 | 0 |  |  | 0 |  |  | my ($self, @topics) = @_; | 
| 228 |  |  |  |  |  |  |  | 
| 229 | 0 | 0 |  |  |  |  | if (not @topics) { | 
| 230 | 0 | 0 |  |  |  |  | @topics = keys %{ $self->{sub} } or return; | 
|  | 0 |  |  |  |  |  |  | 
| 231 |  |  |  |  |  |  | } | 
| 232 | 0 | 0 |  |  |  |  | return if not @topics; | 
| 233 |  |  |  |  |  |  |  | 
| 234 | 0 |  |  |  |  |  | utf8::encode($_) for @topics; | 
| 235 |  |  |  |  |  |  |  | 
| 236 |  |  |  |  |  |  | # Hardcoded "packet identifier" \0\x01 for now (was \0\0 but the spec | 
| 237 |  |  |  |  |  |  | # disallows it for subscribe packets and mosquitto started enforcing that.) | 
| 238 | 0 |  |  |  |  |  | $self->_send("\x82" . _prepend_variable_length("\0\x01" . | 
| 239 |  |  |  |  |  |  | pack("(n/a* x)*", @topics)  # x = QoS 0 | 
| 240 |  |  |  |  |  |  | )); | 
| 241 |  |  |  |  |  |  | } | 
| 242 |  |  |  |  |  |  |  | 
| 243 |  |  |  |  |  |  | sub _send_unsubscribe { | 
| 244 | 0 |  |  | 0 |  |  | my ($self, @topics) = @_; | 
| 245 |  |  |  |  |  |  |  | 
| 246 | 0 | 0 |  |  |  |  | return if not @topics; | 
| 247 |  |  |  |  |  |  |  | 
| 248 | 0 |  |  |  |  |  | utf8::encode($_) for @topics; | 
| 249 |  |  |  |  |  |  |  | 
| 250 |  |  |  |  |  |  | # Hardcoded "packet identifier" \0\0x01 for now; see above. | 
| 251 | 0 |  |  |  |  |  | $self->_send("\xa2" . _prepend_variable_length("\0\x01" . | 
| 252 |  |  |  |  |  |  | pack("(n/a*)*", @topics) | 
| 253 |  |  |  |  |  |  | )); | 
| 254 |  |  |  |  |  |  | } | 
| 255 |  |  |  |  |  |  |  | 
| 256 |  |  |  |  |  |  | sub _parse { | 
| 257 | 0 |  |  | 0 |  |  | my ($self) = @_; | 
| 258 |  |  |  |  |  |  |  | 
| 259 | 0 |  |  |  |  |  | my $bufref = \$self->{buffer}; | 
| 260 |  |  |  |  |  |  |  | 
| 261 | 0 | 0 |  |  |  |  | return if length $$bufref < 2; | 
| 262 |  |  |  |  |  |  |  | 
| 263 | 0 |  |  |  |  |  | my $offset = 1; | 
| 264 |  |  |  |  |  |  |  | 
| 265 | 0 |  |  |  |  |  | my $length = do { | 
| 266 | 0 |  |  |  |  |  | my $multiplier = 1; | 
| 267 | 0 |  |  |  |  |  | my $v = 0; | 
| 268 | 0 |  |  |  |  |  | my $d; | 
| 269 | 0 |  |  |  |  |  | do { | 
| 270 | 0 | 0 |  |  |  |  | return if $offset >= length $$bufref;  # not enough data yet | 
| 271 | 0 |  |  |  |  |  | $d = unpack "C", substr $$bufref, $offset++, 1; | 
| 272 | 0 |  |  |  |  |  | $v += ($d & 0x7f) * $multiplier; | 
| 273 | 0 |  |  |  |  |  | $multiplier *= 128; | 
| 274 |  |  |  |  |  |  | } while ($d & 0x80); | 
| 275 | 0 |  |  |  |  |  | $v; | 
| 276 |  |  |  |  |  |  | }; | 
| 277 |  |  |  |  |  |  |  | 
| 278 | 0 | 0 |  |  |  |  | if ($length > $MAX_LENGTH) { | 
| 279 |  |  |  |  |  |  | # On receiving an enormous packet, just disconnect to avoid exhausting | 
| 280 |  |  |  |  |  |  | # RAM on tiny systems. | 
| 281 |  |  |  |  |  |  | # TODO: just slurp and drop the data | 
| 282 | 0 |  |  |  |  |  | $self->_drop_connection; | 
| 283 | 0 |  |  |  |  |  | return; | 
| 284 |  |  |  |  |  |  | } | 
| 285 |  |  |  |  |  |  |  | 
| 286 | 0 | 0 |  |  |  |  | return if length($$bufref) < $offset + $length;  # not enough data yet | 
| 287 |  |  |  |  |  |  |  | 
| 288 | 0 |  |  |  |  |  | my $first_byte = unpack "C", substr $$bufref, 0, 1; | 
| 289 |  |  |  |  |  |  |  | 
| 290 | 0 |  |  |  |  |  | my $packet = { | 
| 291 |  |  |  |  |  |  | type   => ($first_byte & 0xF0) >> 4, | 
| 292 |  |  |  |  |  |  | dup    => ($first_byte & 0x08) >> 3, | 
| 293 |  |  |  |  |  |  | qos    => ($first_byte & 0x06) >> 1, | 
| 294 |  |  |  |  |  |  | retain => ($first_byte & 0x01), | 
| 295 |  |  |  |  |  |  | data   => substr($$bufref, $offset, $length), | 
| 296 |  |  |  |  |  |  | }; | 
| 297 |  |  |  |  |  |  |  | 
| 298 | 0 |  |  |  |  |  | substr $$bufref, 0, $offset + $length, "";  # remove the parsed bits. | 
| 299 |  |  |  |  |  |  |  | 
| 300 | 0 |  |  |  |  |  | return $packet; | 
| 301 |  |  |  |  |  |  | } | 
| 302 |  |  |  |  |  |  |  | 
| 303 |  |  |  |  |  |  | sub _incoming_publish { | 
| 304 | 0 |  |  | 0 |  |  | my ($self, $packet) = @_; | 
| 305 |  |  |  |  |  |  |  | 
| 306 |  |  |  |  |  |  | # Because QoS is not supported, no packed ID in the data. It would | 
| 307 |  |  |  |  |  |  | # have been 16 bits between $topic and $message. | 
| 308 | 0 |  |  |  |  |  | my ($topic, $message) = unpack "n/a a*", $packet->{data}; | 
| 309 |  |  |  |  |  |  |  | 
| 310 | 0 |  |  |  |  |  | utf8::decode($topic); | 
| 311 |  |  |  |  |  |  |  | 
| 312 | 0 |  |  |  |  |  | for my $cb (@{ $self->{callbacks} }) { | 
|  | 0 |  |  |  |  |  |  | 
| 313 | 0 | 0 |  |  |  |  | if ($topic =~ /$cb->{regex}/) { | 
| 314 | 0 |  |  |  |  |  | $cb->{callback}->($topic, $message, $packet->{retain}); | 
| 315 | 0 |  |  |  |  |  | return; | 
| 316 |  |  |  |  |  |  | } | 
| 317 |  |  |  |  |  |  | } | 
| 318 |  |  |  |  |  |  | } | 
| 319 |  |  |  |  |  |  |  | 
| 320 |  |  |  |  |  |  | sub _publish { | 
| 321 | 0 |  |  | 0 |  |  | my ($self, $retain, $topic, $message) = @_; | 
| 322 |  |  |  |  |  |  |  | 
| 323 | 0 | 0 | 0 |  |  |  | $message //= "" if $retain; | 
| 324 |  |  |  |  |  |  |  | 
| 325 | 0 |  |  |  |  |  | utf8::encode($topic); | 
| 326 | 0 | 0 |  |  |  |  | utf8::downgrade($message, 1) or do { | 
| 327 | 0 |  |  |  |  |  | my ($file, $line, $method) = (caller 1)[1, 2, 3]; | 
| 328 | 0 |  |  |  |  |  | warn "Wide character in $method at $file line $line.\n"; | 
| 329 | 0 |  |  |  |  |  | utf8::encode($message); | 
| 330 |  |  |  |  |  |  | }; | 
| 331 |  |  |  |  |  |  |  | 
| 332 | 0 | 0 |  |  |  |  | $self->_send( | 
| 333 |  |  |  |  |  |  | ($retain ? "\x31" : "\x30") | 
| 334 |  |  |  |  |  |  | . _prepend_variable_length( | 
| 335 |  |  |  |  |  |  | pack("n/a*", $topic) . $message | 
| 336 |  |  |  |  |  |  | ) | 
| 337 |  |  |  |  |  |  | ); | 
| 338 |  |  |  |  |  |  | } | 
| 339 |  |  |  |  |  |  |  | 
| 340 |  |  |  |  |  |  | sub publish { | 
| 341 | 0 |  |  | 0 | 1 |  | my $method = UNIVERSAL::isa($_[0], __PACKAGE__); | 
| 342 | 0 | 0 |  |  |  |  | @_ == ($method ? 3 : 2) or _croak "Wrong number of arguments for publish"; | 
|  |  | 0 |  |  |  |  |  | 
| 343 |  |  |  |  |  |  |  | 
| 344 | 0 | 0 |  |  |  |  | ($method ? shift : $global)->_publish(0, @_); | 
| 345 |  |  |  |  |  |  | } | 
| 346 |  |  |  |  |  |  |  | 
| 347 |  |  |  |  |  |  | sub retain { | 
| 348 | 0 |  |  | 0 | 1 |  | my $method = UNIVERSAL::isa($_[0], __PACKAGE__); | 
| 349 | 0 | 0 |  |  |  |  | @_ == ($method ? 3 : 2) or _croak "Wrong number of arguments for retain"; | 
|  |  | 0 |  |  |  |  |  | 
| 350 |  |  |  |  |  |  |  | 
| 351 | 0 | 0 |  |  |  |  | ($method ? shift : $global)->_publish(1, @_); | 
| 352 |  |  |  |  |  |  | } | 
| 353 |  |  |  |  |  |  |  | 
| 354 |  |  |  |  |  |  | sub run { | 
| 355 | 0 |  |  | 0 | 1 |  | my ($self, @subscribe_args) = @_; | 
| 356 |  |  |  |  |  |  |  | 
| 357 | 0 | 0 |  |  |  |  | $self->subscribe(@subscribe_args) if @subscribe_args; | 
| 358 |  |  |  |  |  |  |  | 
| 359 | 0 |  |  |  |  |  | until ($self->{stop_loop}) { | 
| 360 | 0 |  |  |  |  |  | my @timeouts; | 
| 361 |  |  |  |  |  |  | push @timeouts, $KEEPALIVE_INTERVAL - (time() - $self->{last_send}) | 
| 362 | 0 | 0 |  |  |  |  | if exists $self->{last_send}; | 
| 363 |  |  |  |  |  |  | push @timeouts, $PING_TIMEOUT - (time() - $self->{ping}) | 
| 364 | 0 | 0 |  |  |  |  | if exists $self->{ping}; | 
| 365 |  |  |  |  |  |  |  | 
| 366 |  |  |  |  |  |  | my $timeout = @timeouts | 
| 367 | 0 | 0 |  |  |  |  | ? (sort { $a <=> $b } @timeouts)[0] | 
|  | 0 |  |  |  |  |  |  | 
| 368 |  |  |  |  |  |  | : 1;  # default to 1 | 
| 369 |  |  |  |  |  |  |  | 
| 370 | 0 |  |  |  |  |  | $self->tick($timeout); | 
| 371 |  |  |  |  |  |  | } | 
| 372 |  |  |  |  |  |  |  | 
| 373 | 0 |  |  |  |  |  | delete $self->{stop_loop}; | 
| 374 |  |  |  |  |  |  | } | 
| 375 |  |  |  |  |  |  |  | 
| 376 |  |  |  |  |  |  | sub subscribe { | 
| 377 | 0 |  |  | 0 | 1 |  | my ($self, @kv) = @_; | 
| 378 |  |  |  |  |  |  |  | 
| 379 | 0 |  |  |  |  |  | while (my ($topic, $callback) = splice @kv, 0, 2) { | 
| 380 | 0 |  |  |  |  |  | $self->{sub}->{ $topic } = 1; | 
| 381 | 0 |  |  |  |  |  | push @{ $self->{callbacks} }, { | 
|  | 0 |  |  |  |  |  |  | 
| 382 |  |  |  |  |  |  | topic => $topic, | 
| 383 |  |  |  |  |  |  | regex => filter_as_regex($topic), | 
| 384 |  |  |  |  |  |  | callback => $callback, | 
| 385 |  |  |  |  |  |  | }; | 
| 386 |  |  |  |  |  |  | } | 
| 387 |  |  |  |  |  |  |  | 
| 388 | 0 | 0 |  |  |  |  | $self->_send_subscribe() if $self->{socket}; | 
| 389 |  |  |  |  |  |  | } | 
| 390 |  |  |  |  |  |  |  | 
| 391 |  |  |  |  |  |  | sub unsubscribe { | 
| 392 | 0 |  |  | 0 | 1 |  | my ($self, @topics) = @_; | 
| 393 |  |  |  |  |  |  |  | 
| 394 | 0 |  |  |  |  |  | $self->_send_unsubscribe(@topics); | 
| 395 |  |  |  |  |  |  |  | 
| 396 | 0 |  |  |  |  |  | my $cb = $self->{callbacks}; | 
| 397 | 0 |  |  |  |  |  | for my $topic ( @topics ) { | 
| 398 | 0 |  |  |  |  |  | @$cb = grep {$_->{topic} ne $topic} @$cb; | 
|  | 0 |  |  |  |  |  |  | 
| 399 |  |  |  |  |  |  | } | 
| 400 |  |  |  |  |  |  |  | 
| 401 | 0 |  |  |  |  |  | delete @{ $self->{sub} }{ @topics }; | 
|  | 0 |  |  |  |  |  |  | 
| 402 |  |  |  |  |  |  | } | 
| 403 |  |  |  |  |  |  |  | 
| 404 |  |  |  |  |  |  | sub tick { | 
| 405 | 0 |  |  | 0 | 1 |  | my ($self, $timeout) = @_; | 
| 406 |  |  |  |  |  |  |  | 
| 407 | 0 |  |  |  |  |  | $self->_connect; | 
| 408 |  |  |  |  |  |  |  | 
| 409 | 0 | 0 |  |  |  |  | my $socket = $self->{socket} or return; | 
| 410 | 0 |  |  |  |  |  | my $bufref = \$self->{buffer}; | 
| 411 |  |  |  |  |  |  |  | 
| 412 | 0 |  |  |  |  |  | my $r = ''; | 
| 413 | 0 |  |  |  |  |  | vec($r, fileno($socket), 1) = 1; | 
| 414 |  |  |  |  |  |  |  | 
| 415 | 0 | 0 | 0 |  |  |  | if (select($r, undef, undef, $timeout // 0) > 0) { | 
| 416 |  |  |  |  |  |  | sysread $socket, $$bufref, $READ_BYTES, length $$bufref | 
| 417 | 0 | 0 |  |  |  |  | or delete $self->{socket}; | 
| 418 |  |  |  |  |  |  |  | 
| 419 | 0 |  |  |  |  |  | while (length $$bufref) { | 
| 420 | 0 | 0 |  |  |  |  | my $packet = $self->_parse() or last; | 
| 421 | 0 | 0 |  |  |  |  | $self->_incoming_publish($packet) if $packet->{type} == 3; | 
| 422 | 0 | 0 |  |  |  |  | delete $self->{ping}              if $packet->{type} == 13; | 
| 423 |  |  |  |  |  |  | } | 
| 424 |  |  |  |  |  |  | } | 
| 425 |  |  |  |  |  |  |  | 
| 426 | 0 | 0 |  |  |  |  | if (time() >= $self->{last_send} + $KEEPALIVE_INTERVAL) { | 
| 427 | 0 |  |  |  |  |  | $self->_send("\xc0\0");  # PINGREQ | 
| 428 | 0 |  |  |  |  |  | $self->{ping} = time; | 
| 429 |  |  |  |  |  |  | } | 
| 430 | 0 | 0 | 0 |  |  |  | if ($self->{ping} and time() >= $self->{ping} + $PING_TIMEOUT) { | 
| 431 | 0 |  |  |  |  |  | $self->_drop_connection; | 
| 432 |  |  |  |  |  |  | } | 
| 433 |  |  |  |  |  |  |  | 
| 434 | 0 |  |  |  |  |  | return !! $self->{socket}; | 
| 435 |  |  |  |  |  |  | } | 
| 436 |  |  |  |  |  |  |  | 
| 437 |  |  |  |  |  |  | sub disconnect { | 
| 438 | 0 |  |  | 0 | 1 |  | my ($self) = @_; | 
| 439 |  |  |  |  |  |  |  | 
| 440 |  |  |  |  |  |  | $self->_send(pack "C x", 0xe0) | 
| 441 | 0 | 0 | 0 |  |  |  | if $self->{socket} and $self->{socket}->connected; | 
| 442 |  |  |  |  |  |  |  | 
| 443 | 0 |  |  |  |  |  | $self->_drop_connection; | 
| 444 |  |  |  |  |  |  | } | 
| 445 |  |  |  |  |  |  |  | 
| 446 |  |  |  |  |  |  | sub _drop_connection { | 
| 447 | 0 |  |  | 0 |  |  | my ($self) = @_; | 
| 448 |  |  |  |  |  |  |  | 
| 449 | 0 |  |  |  |  |  | delete $self->{socket}; | 
| 450 | 0 |  |  |  |  |  | $self->{last_connect} = 0; | 
| 451 |  |  |  |  |  |  | } | 
| 452 |  |  |  |  |  |  |  | 
| 453 |  |  |  |  |  |  | 1; | 
| 454 |  |  |  |  |  |  |  | 
| 455 |  |  |  |  |  |  | __END__ |