| line | stmt | bran | cond | sub | pod | time | code | 
| 1 | 15 |  |  | 15 |  | 202600 | use strict; | 
|  | 15 |  |  |  |  | 32 |  | 
|  | 15 |  |  |  |  | 485 |  | 
| 2 | 15 |  |  | 15 |  | 74 | use warnings; | 
|  | 15 |  |  |  |  | 23 |  | 
|  | 15 |  |  |  |  | 811 |  | 
| 3 |  |  |  |  |  |  | package AnyEvent::MQTT; | 
| 4 |  |  |  |  |  |  | $AnyEvent::MQTT::VERSION = '1.212810'; | 
| 5 |  |  |  |  |  |  | # ABSTRACT: AnyEvent module for an MQTT client | 
| 6 |  |  |  |  |  |  |  | 
| 7 |  |  |  |  |  |  |  | 
| 8 | 15 |  |  | 15 |  | 76 | use constant DEBUG => $ENV{ANYEVENT_MQTT_DEBUG}; | 
|  | 15 |  |  |  |  | 27 |  | 
|  | 15 |  |  |  |  | 1067 |  | 
| 9 | 15 |  |  | 15 |  | 84 | use AnyEvent; | 
|  | 15 |  |  |  |  | 27 |  | 
|  | 15 |  |  |  |  | 322 |  | 
| 10 | 15 |  |  | 15 |  | 68 | use AnyEvent::Handle; | 
|  | 15 |  |  |  |  | 25 |  | 
|  | 15 |  |  |  |  | 323 |  | 
| 11 | 15 |  |  | 15 |  | 74 | use Net::MQTT::Constants; | 
|  | 15 |  |  |  |  | 36 |  | 
|  | 15 |  |  |  |  | 125 |  | 
| 12 | 15 |  |  | 15 |  | 10360 | use Net::MQTT::Message; | 
|  | 15 |  |  |  |  | 484112 |  | 
|  | 15 |  |  |  |  | 645 |  | 
| 13 | 15 |  |  | 15 |  | 6487 | use Net::MQTT::TopicStore; | 
|  | 15 |  |  |  |  | 7134 |  | 
|  | 15 |  |  |  |  | 501 |  | 
| 14 | 15 |  |  | 15 |  | 91 | use Carp qw/croak carp/; | 
|  | 15 |  |  |  |  | 32 |  | 
|  | 15 |  |  |  |  | 1012 |  | 
| 15 | 15 |  |  | 15 |  | 86 | use Sub::Name; | 
|  | 15 |  |  |  |  | 30 |  | 
|  | 15 |  |  |  |  | 607 |  | 
| 16 | 15 |  |  | 15 |  | 80 | use Scalar::Util qw/weaken/; | 
|  | 15 |  |  |  |  | 26 |  | 
|  | 15 |  |  |  |  | 73561 |  | 
| 17 |  |  |  |  |  |  |  | 
| 18 |  |  |  |  |  |  |  | 
| 19 |  |  |  |  |  |  | sub new { | 
| 20 | 15 |  |  | 15 | 1 | 9542 | my ($pkg, %p) = @_; | 
| 21 | 15 |  |  |  |  | 152 | my $self = | 
| 22 |  |  |  |  |  |  | bless { | 
| 23 |  |  |  |  |  |  | socket => undef, | 
| 24 |  |  |  |  |  |  | host => '127.0.0.1', | 
| 25 |  |  |  |  |  |  | port => '1883', | 
| 26 |  |  |  |  |  |  | timeout => 30, | 
| 27 |  |  |  |  |  |  | wait => 'nothing', | 
| 28 |  |  |  |  |  |  | keep_alive_timer => 120, | 
| 29 |  |  |  |  |  |  | qos => MQTT_QOS_AT_MOST_ONCE, | 
| 30 |  |  |  |  |  |  | message_id => 1, | 
| 31 |  |  |  |  |  |  | user_name => undef, | 
| 32 |  |  |  |  |  |  | password => undef, | 
| 33 |  |  |  |  |  |  | tls => undef, | 
| 34 |  |  |  |  |  |  | will_topic => undef, | 
| 35 |  |  |  |  |  |  | will_qos => MQTT_QOS_AT_MOST_ONCE, | 
| 36 |  |  |  |  |  |  | will_retain => 0, | 
| 37 |  |  |  |  |  |  | will_message => '', | 
| 38 |  |  |  |  |  |  | client_id => undef, | 
| 39 |  |  |  |  |  |  | clean_session => 1, | 
| 40 |  |  |  |  |  |  | handle_args => [], | 
| 41 |  |  |  |  |  |  | write_queue => [], | 
| 42 |  |  |  |  |  |  | inflight => {}, | 
| 43 |  |  |  |  |  |  | _sub_topics => Net::MQTT::TopicStore->new(), | 
| 44 |  |  |  |  |  |  | %p, | 
| 45 |  |  |  |  |  |  | }, $pkg; | 
| 46 |  |  |  |  |  |  | } | 
| 47 |  |  |  |  |  |  |  | 
| 48 |  |  |  |  |  |  | sub DESTROY { | 
| 49 | 15 |  |  | 15 |  | 120243 | $_[0]->cleanup; | 
| 50 |  |  |  |  |  |  | } | 
| 51 |  |  |  |  |  |  |  | 
| 52 |  |  |  |  |  |  |  | 
| 53 |  |  |  |  |  |  | sub cleanup { | 
| 54 | 20 |  |  | 20 | 1 | 78 | my $self = shift; | 
| 55 | 20 |  |  |  |  | 36 | print STDERR "cleanup\n" if DEBUG; | 
| 56 | 20 | 100 |  |  |  | 139 | if ($self->{handle}) { | 
| 57 | 18 |  |  |  |  | 572 | my $cv = AnyEvent->condvar; | 
| 58 | 18 |  |  |  |  | 141 | my $handle = $self->{handle}; | 
| 59 | 18 |  |  |  |  | 107 | weaken $handle; | 
| 60 | 18 |  |  | 11 |  | 198 | $cv->cb(sub { $handle->destroy }); | 
|  | 11 |  |  |  |  | 137 |  | 
| 61 | 18 |  |  |  |  | 206 | $self->_send(message_type => MQTT_DISCONNECT, cv => $cv); | 
| 62 |  |  |  |  |  |  | } | 
| 63 | 20 |  |  |  |  | 102 | delete $self->{handle}; | 
| 64 | 20 |  |  |  |  | 328 | delete $self->{connected}; | 
| 65 | 20 |  |  |  |  | 44 | delete $self->{wait}; | 
| 66 | 20 |  |  |  |  | 78 | delete $self->{_keep_alive_handle}; | 
| 67 | 20 |  |  |  |  | 40 | delete $self->{_keep_alive_waiting}; | 
| 68 | 20 |  |  |  |  | 706 | $self->{write_queue} = []; | 
| 69 |  |  |  |  |  |  | } | 
| 70 |  |  |  |  |  |  |  | 
| 71 |  |  |  |  |  |  | sub _error { | 
| 72 | 5 |  |  | 5 |  | 114 | my ($self, $fatal, $message, $reconnect) = @_; | 
| 73 | 5 |  |  |  |  | 25 | $self->cleanup($message); | 
| 74 | 5 | 50 |  |  |  | 42 | $self->{on_error}->($fatal, $message) if ($self->{on_error}); | 
| 75 | 5 | 100 |  |  |  | 116 | $self->_reconnect() if ($reconnect); | 
| 76 |  |  |  |  |  |  | } | 
| 77 |  |  |  |  |  |  |  | 
| 78 |  |  |  |  |  |  |  | 
| 79 |  |  |  |  |  |  | sub publish { | 
| 80 | 8 |  |  | 8 | 1 | 8485 | my ($self, %p) = @_; | 
| 81 |  |  |  |  |  |  | my $topic = exists $p{topic} ? $p{topic} : | 
| 82 | 8 | 100 |  |  |  | 103 | croak ref $self, '->publish requires "topic" parameter'; | 
| 83 | 7 | 100 |  |  |  | 26 | my $qos = exists $p{qos} ? $p{qos} : MQTT_QOS_AT_MOST_ONCE; | 
| 84 | 7 | 100 |  |  |  | 137 | my $cv = exists $p{cv} ? delete $p{cv} : AnyEvent->condvar; | 
| 85 | 7 |  |  |  |  | 35 | my $expect; | 
| 86 | 7 | 100 |  |  |  | 23 | if ($qos) { | 
| 87 | 3 | 100 |  |  |  | 10 | $expect = ($qos == MQTT_QOS_AT_LEAST_ONCE ? MQTT_PUBACK : MQTT_PUBREC); | 
| 88 |  |  |  |  |  |  | } | 
| 89 | 7 |  |  |  |  | 14 | my $message = $p{message}; | 
| 90 | 7 | 100 |  |  |  | 21 | if (defined $message) { | 
| 91 | 4 |  |  |  |  | 6 | print STDERR "publish: message[$message] => $topic\n" if DEBUG; | 
| 92 | 4 |  |  |  |  | 26 | $self->_send_with_ack({ | 
| 93 |  |  |  |  |  |  | message_type => MQTT_PUBLISH, | 
| 94 |  |  |  |  |  |  | %p, | 
| 95 |  |  |  |  |  |  | }, $cv, $expect); | 
| 96 | 4 |  |  |  |  | 20 | return $cv; | 
| 97 |  |  |  |  |  |  | } | 
| 98 |  |  |  |  |  |  | my $handle = exists $p{handle} ? $p{handle} : | 
| 99 | 3 | 100 |  |  |  | 75 | croak ref $self, '->publish requires "message" or "handle" parameter'; | 
| 100 | 2 | 100 |  |  |  | 23 | unless ($handle->isa('AnyEvent::Handle')) { | 
| 101 | 1 | 50 |  |  |  | 17 | my @args = @{$p{handle_args}||[]}; | 
|  | 1 |  |  |  |  | 6 |  | 
| 102 | 1 |  |  |  |  | 2 | print STDERR "publish: IO[$handle] => $topic @args\n" if DEBUG; | 
| 103 | 1 |  |  |  |  | 7 | $handle = AnyEvent::Handle->new(fh => $handle, @args); | 
| 104 |  |  |  |  |  |  | } | 
| 105 | 2 |  |  |  |  | 112 | my $error_sub = $handle->{on_error}; # Hack: There is no accessor api | 
| 106 |  |  |  |  |  |  | $handle->on_error(subname 'on_error_for_read_publish_'.$topic => | 
| 107 |  |  |  |  |  |  | sub { | 
| 108 | 2 |  |  | 2 |  | 345 | my ($hdl, $fatal, $msg) = @_; | 
| 109 | 2 | 50 |  |  |  | 23 | $error_sub->(@_) if ($error_sub); | 
| 110 | 2 |  |  |  |  | 129 | $hdl->destroy; | 
| 111 | 2 |  |  |  |  | 6 | undef $hdl; | 
| 112 | 2 |  |  |  |  | 6 | $cv->send(1); | 
| 113 | 2 |  |  |  |  | 32 | }); | 
| 114 | 2 |  |  |  |  | 9 | my $weak_self = $self; | 
| 115 | 2 |  |  |  |  | 7 | weaken $weak_self; | 
| 116 | 2 | 100 |  |  |  | 3 | my @push_read_args = @{$p{push_read_args}||['line']}; | 
|  | 2 |  |  |  |  | 13 |  | 
| 117 | 2 |  |  |  |  | 4 | my $sub; $sub = subname 'push_read_cb_for_'.$topic => sub { | 
| 118 | 2 |  |  | 2 |  | 899 | my ($hdl, $chunk, @args) = @_; | 
| 119 | 2 |  |  |  |  | 4 | print STDERR "publish: $chunk => $topic\n" if DEBUG; | 
| 120 | 2 |  |  |  |  | 51 | my $send_cv = AnyEvent->condvar; | 
| 121 | 2 |  |  |  |  | 11 | print STDERR "publish: message[$chunk] => $topic\n" if DEBUG; | 
| 122 |  |  |  |  |  |  | $weak_self->_send_with_ack({ | 
| 123 |  |  |  |  |  |  | message_type => MQTT_PUBLISH, | 
| 124 |  |  |  |  |  |  | qos => $qos, | 
| 125 |  |  |  |  |  |  | retain => $p{retain}, | 
| 126 | 2 |  |  |  |  | 13 | topic => $topic, | 
| 127 |  |  |  |  |  |  | message => $chunk, | 
| 128 |  |  |  |  |  |  | }, $send_cv, $expect); | 
| 129 |  |  |  |  |  |  | $send_cv->cb(subname 'publish_ack_'.$topic => | 
| 130 | 2 |  |  |  |  | 32 | sub { $handle->push_read(@push_read_args => $sub ) }); | 
|  | 2 |  |  |  |  | 19 |  | 
| 131 | 2 |  |  |  |  | 67 | return; | 
| 132 | 2 |  |  |  |  | 15 | }; | 
| 133 | 2 |  |  |  |  | 9 | $handle->push_read(@push_read_args => $sub); | 
| 134 | 2 |  |  |  |  | 244 | return $cv; | 
| 135 |  |  |  |  |  |  | } | 
| 136 |  |  |  |  |  |  |  | 
| 137 |  |  |  |  |  |  |  | 
| 138 |  |  |  |  |  |  | sub next_message_id { | 
| 139 | 70018 |  |  | 70018 | 1 | 258016 | my $self = shift; | 
| 140 | 70018 |  |  |  |  | 89040 | my $res = $self->{message_id}; | 
| 141 | 70018 |  |  |  |  | 83465 | $self->{message_id}++; | 
| 142 | 70018 | 100 |  |  |  | 117857 | $self->{message_id} = 1 if $self->{message_id} >= 65536; | 
| 143 | 70018 |  |  |  |  | 99796 | $res; | 
| 144 |  |  |  |  |  |  | } | 
| 145 |  |  |  |  |  |  |  | 
| 146 |  |  |  |  |  |  | sub _send_with_ack { | 
| 147 | 8 |  |  | 8 |  | 24 | my ($self, $args, $cv, $expect, $dup) = @_; | 
| 148 | 8 | 100 |  |  |  | 23 | if ($args->{qos}) { | 
| 149 | 5 | 100 |  |  |  | 17 | unless (exists $args->{message_id}) { | 
| 150 | 3 |  |  |  |  | 10 | $args->{message_id} = $self->next_message_id(); | 
| 151 |  |  |  |  |  |  | } | 
| 152 | 5 |  |  |  |  | 12 | my $mid = $args->{message_id}; | 
| 153 | 5 |  |  |  |  | 118 | my $send_cv = AnyEvent->condvar; | 
| 154 |  |  |  |  |  |  | $send_cv->cb(subname 'ack_cb_for_'.$mid => sub { | 
| 155 |  |  |  |  |  |  | $self->{inflight}->{$mid} = | 
| 156 |  |  |  |  |  |  | { | 
| 157 |  |  |  |  |  |  | expect => $expect, | 
| 158 |  |  |  |  |  |  | message => $args, | 
| 159 |  |  |  |  |  |  | cv => $cv, | 
| 160 |  |  |  |  |  |  | timeout => | 
| 161 |  |  |  |  |  |  | AnyEvent->timer(after => $self->{keep_alive_timer}, | 
| 162 |  |  |  |  |  |  | cb => subname 'ack_timeout_for_'.$mid => | 
| 163 |  |  |  |  |  |  | sub { | 
| 164 | 1 |  |  |  |  | 29 | print ref $self, " timeout waiting for ", | 
| 165 |  |  |  |  |  |  | message_type_string($expect), "\n" if DEBUG; | 
| 166 | 1 |  |  |  |  | 5 | delete $self->{inflight}->{$mid}; | 
| 167 | 1 |  |  |  |  | 6 | $self->_send_with_ack($args, $cv, $expect, 1); | 
| 168 | 5 |  |  | 5 |  | 99 | }), | 
| 169 |  |  |  |  |  |  | }; | 
| 170 | 5 |  |  |  |  | 108 | }); | 
| 171 | 5 |  |  |  |  | 50 | $args->{cv} = $send_cv; | 
| 172 |  |  |  |  |  |  | } else { | 
| 173 | 3 |  |  |  |  | 6 | $args->{cv} = $cv; | 
| 174 |  |  |  |  |  |  | } | 
| 175 | 8 | 100 |  |  |  | 23 | $args->{dup} = 1 if ($dup); | 
| 176 | 8 |  |  |  |  | 38 | return $self->_send(%$args); | 
| 177 |  |  |  |  |  |  | } | 
| 178 |  |  |  |  |  |  |  | 
| 179 |  |  |  |  |  |  |  | 
| 180 |  |  |  |  |  |  | sub subscribe { | 
| 181 | 19 |  |  | 19 | 1 | 10172 | my ($self, %p) = @_; | 
| 182 |  |  |  |  |  |  | my $topic = exists $p{topic} ? $p{topic} : | 
| 183 | 19 | 100 |  |  |  | 239 | croak ref $self, '->subscribe requires "topic" parameter'; | 
| 184 |  |  |  |  |  |  | my $sub = exists $p{callback} ? $p{callback} : | 
| 185 | 18 | 100 |  |  |  | 116 | croak ref $self, '->subscribe requires "callback" parameter'; | 
| 186 | 17 | 100 |  |  |  | 43 | my $qos = exists $p{qos} ? $p{qos} : MQTT_QOS_AT_MOST_ONCE; | 
| 187 | 17 | 100 |  |  |  | 289 | my $cv = exists $p{cv} ? delete $p{cv} : AnyEvent->condvar; | 
| 188 | 17 |  |  |  |  | 98 | my $mid = $self->_add_subscription($topic, $cv, $sub); | 
| 189 | 17 | 100 |  |  |  | 43 | if (defined $mid) { # not already subscribed/subscribing | 
| 190 | 12 |  |  |  |  | 39 | $self->_send(message_type => MQTT_SUBSCRIBE, | 
| 191 |  |  |  |  |  |  | message_id => $mid, | 
| 192 |  |  |  |  |  |  | topics => [[$topic, $qos]]); | 
| 193 |  |  |  |  |  |  | } | 
| 194 |  |  |  |  |  |  | $cv | 
| 195 | 17 |  |  |  |  | 53 | } | 
| 196 |  |  |  |  |  |  |  | 
| 197 |  |  |  |  |  |  |  | 
| 198 |  |  |  |  |  |  | sub unsubscribe { | 
| 199 | 7 |  |  | 7 | 1 | 6416 | my ($self, %p) = @_; | 
| 200 |  |  |  |  |  |  | my $topic = exists $p{topic} ? $p{topic} : | 
| 201 | 7 | 100 |  |  |  | 96 | croak ref $self, '->unsubscribe requires "topic" parameter'; | 
| 202 | 6 | 100 |  |  |  | 110 | my $cv = exists $p{cv} ? delete $p{cv} : AnyEvent->condvar; | 
| 203 | 6 |  |  |  |  | 39 | my $mid = $self->_remove_subscription($topic, $cv, $p{callback}); | 
| 204 | 6 | 100 |  |  |  | 16 | if (defined $mid) { # not already subscribed/subscribing | 
| 205 | 2 |  |  |  |  | 10 | $self->_send(message_type => MQTT_UNSUBSCRIBE, | 
| 206 |  |  |  |  |  |  | message_id => $mid, | 
| 207 |  |  |  |  |  |  | topics => [$topic]); | 
| 208 |  |  |  |  |  |  | } | 
| 209 |  |  |  |  |  |  | $cv | 
| 210 | 6 |  |  |  |  | 20 | } | 
| 211 |  |  |  |  |  |  |  | 
| 212 |  |  |  |  |  |  | sub _add_subscription { | 
| 213 | 17 |  |  | 17 |  | 39 | my ($self, $topic, $cv, $sub) = @_; | 
| 214 | 17 |  |  |  |  | 34 | my $rec = $self->{_sub}->{$topic}; | 
| 215 | 17 | 100 |  |  |  | 49 | if ($rec) { | 
| 216 | 2 |  |  |  |  | 3 | print STDERR "Add $sub to existing $topic subscription\n" if DEBUG; | 
| 217 | 2 |  |  |  |  | 7 | $rec->{cb}->{$sub} = $sub; | 
| 218 | 2 |  |  |  |  | 9 | $cv->send($rec->{qos}); | 
| 219 | 2 |  |  |  |  | 20 | foreach my $msg (values %{$rec->{retained}}) { | 
|  | 2 |  |  |  |  | 8 |  | 
| 220 | 0 |  |  |  |  | 0 | $sub->($msg->topic, $msg->message, $msg); | 
| 221 |  |  |  |  |  |  | } | 
| 222 | 2 |  |  |  |  | 6 | return; | 
| 223 |  |  |  |  |  |  | } | 
| 224 | 15 |  |  |  |  | 27 | $rec = $self->{_sub_pending}->{$topic}; | 
| 225 | 15 | 100 |  |  |  | 33 | if ($rec) { | 
| 226 | 3 |  |  |  |  | 5 | print STDERR "Add $sub to existing pending $topic subscription\n" if DEBUG; | 
| 227 | 3 |  |  |  |  | 17 | $rec->{cb}->{$sub} = $sub; | 
| 228 | 3 |  |  |  |  | 6 | push @{$rec->{cv}}, $cv; | 
|  | 3 |  |  |  |  | 15 |  | 
| 229 | 3 |  |  |  |  | 11 | return; | 
| 230 |  |  |  |  |  |  | } | 
| 231 | 12 |  |  |  |  | 34 | my $mid = $self->next_message_id(); | 
| 232 | 12 |  |  |  |  | 13 | print STDERR "Add $sub as pending $topic subscription (mid=$mid)\n" if DEBUG; | 
| 233 | 12 |  |  |  |  | 43 | $self->{_sub_pending_by_message_id}->{$mid} = $topic; | 
| 234 | 12 |  |  |  |  | 62 | $self->{_sub_pending}->{$topic} = | 
| 235 |  |  |  |  |  |  | { cb => { $sub => $sub }, cv => [ $cv ], retained => {} }; | 
| 236 | 12 |  |  |  |  | 34 | $mid; | 
| 237 |  |  |  |  |  |  | } | 
| 238 |  |  |  |  |  |  |  | 
| 239 |  |  |  |  |  |  | sub _remove_subscription { | 
| 240 | 6 |  |  | 6 |  | 17 | my ($self, $topic, $cv, $sub) = @_; | 
| 241 | 6 |  |  |  |  | 11 | my $rec = $self->{_unsub_pending}->{$topic}; | 
| 242 | 6 | 100 |  |  |  | 16 | if ($rec) { | 
| 243 | 1 |  |  |  |  | 2 | print STDERR "Remove of $topic with pending unsubscribe\n" if DEBUG; | 
| 244 | 1 |  |  |  |  | 2 | push @{$rec->{cv}}, $cv; | 
|  | 1 |  |  |  |  | 3 |  | 
| 245 | 1 |  |  |  |  | 3 | return; | 
| 246 |  |  |  |  |  |  | } | 
| 247 | 5 |  |  |  |  | 10 | $rec = $self->{_sub}->{$topic}; | 
| 248 | 5 | 100 |  |  |  | 15 | unless ($rec) { | 
| 249 | 1 |  |  |  |  | 1 | print STDERR "Remove of $topic with no subscription\n" if DEBUG; | 
| 250 | 1 |  |  |  |  | 5 | $cv->send(0); | 
| 251 | 1 |  |  |  |  | 9 | return; | 
| 252 |  |  |  |  |  |  | } | 
| 253 |  |  |  |  |  |  |  | 
| 254 | 4 | 100 |  |  |  | 7 | if (defined $sub) { | 
| 255 | 3 | 50 |  |  |  | 9 | unless (exists $rec->{cb}->{$sub}) { | 
| 256 | 0 |  |  |  |  | 0 | print STDERR "Remove of $topic for $sub with no subscription\n" | 
| 257 |  |  |  |  |  |  | if DEBUG; | 
| 258 | 0 |  |  |  |  | 0 | $cv->send(0); | 
| 259 | 0 |  |  |  |  | 0 | return; | 
| 260 |  |  |  |  |  |  | } | 
| 261 | 3 |  |  |  |  | 5 | delete $rec->{cb}->{$sub}; | 
| 262 | 3 | 100 |  |  |  | 5 | if (keys %{$rec->{cb}}) { | 
|  | 3 |  |  |  |  | 10 |  | 
| 263 | 2 |  |  |  |  | 3 | print STDERR "Remove of $topic for $sub\n" if DEBUG; | 
| 264 | 2 |  |  |  |  | 5 | $cv->send(1); | 
| 265 | 2 |  |  |  |  | 19 | return; | 
| 266 |  |  |  |  |  |  | } | 
| 267 |  |  |  |  |  |  | } | 
| 268 | 2 |  |  |  |  | 4 | print STDERR "Remove of $topic\n" if DEBUG; | 
| 269 | 2 |  |  |  |  | 9 | my $mid = $self->next_message_id(); | 
| 270 | 2 |  |  |  |  | 5 | delete $self->{_sub}->{$topic}; | 
| 271 | 2 |  |  |  |  | 11 | $self->{_sub_topics}->delete($topic); | 
| 272 | 2 |  |  |  |  | 17 | $self->{_unsub_pending_by_message_id}->{$mid} = $topic; | 
| 273 | 2 |  |  |  |  | 9 | $self->{_unsub_pending}->{$topic} = { cv => [ $cv ] }; | 
| 274 | 2 |  |  |  |  | 14 | return $mid; | 
| 275 |  |  |  |  |  |  | } | 
| 276 |  |  |  |  |  |  |  | 
| 277 |  |  |  |  |  |  | sub _confirm_subscription { | 
| 278 | 13 |  |  | 13 |  | 93 | my ($self, $mid, $qos) = @_; | 
| 279 | 13 |  |  |  |  | 38 | my $topic = delete $self->{_sub_pending_by_message_id}->{$mid}; | 
| 280 | 13 | 100 |  |  |  | 38 | unless (defined $topic) { | 
| 281 | 1 |  |  |  |  | 151 | carp 'SubAck with no pending subscription for message id: ', $mid, "\n"; | 
| 282 | 1 |  |  |  |  | 33 | return; | 
| 283 |  |  |  |  |  |  | } | 
| 284 | 12 |  |  |  |  | 40 | my $rec = $self->{_sub}->{$topic} = delete $self->{_sub_pending}->{$topic}; | 
| 285 | 12 |  |  |  |  | 61 | $self->{_sub_topics}->add($topic); | 
| 286 | 12 |  |  |  |  | 462 | $rec->{qos} = $qos; | 
| 287 |  |  |  |  |  |  |  | 
| 288 | 12 |  |  |  |  | 24 | foreach my $cv (@{$rec->{cv}}) { | 
|  | 12 |  |  |  |  | 34 |  | 
| 289 | 15 |  |  |  |  | 65 | $cv->send($qos); | 
| 290 |  |  |  |  |  |  | } | 
| 291 |  |  |  |  |  |  |  | 
| 292 |  |  |  |  |  |  | # publish any matching queued QoS messages | 
| 293 | 12 | 0 | 33 |  |  | 135 | if (!$self->{clean_session} && $qos && $self->{_qos_msg_cache}) { | 
|  |  |  | 33 |  |  |  |  | 
| 294 | 0 |  |  |  |  | 0 | my $cache = $self->{_qos_msg_cache}; | 
| 295 | 0 |  |  |  |  | 0 | my $ts = Net::MQTT::TopicStore->new($topic); | 
| 296 | 0 |  |  |  |  | 0 | for my $i (grep { $ts->values($cache->[$_]->topic) } reverse(0..$#$cache)) { | 
|  | 0 |  |  |  |  | 0 |  | 
| 297 | 0 |  |  |  |  | 0 | my $msg = delete $cache->[$i]; | 
| 298 | 0 |  |  |  |  | 0 | print STDERR "Processing cached message for topic '", $msg->topic, "' with subscription to topic '$topic'\n" if DEBUG; | 
| 299 | 0 |  |  |  |  | 0 | $self->_process_publish($self->{handle}, $msg); | 
| 300 |  |  |  |  |  |  | } | 
| 301 | 0 | 0 |  |  |  | 0 | delete $self->{_qos_msg_cache} unless @$cache; | 
| 302 |  |  |  |  |  |  | } | 
| 303 | 12 |  |  |  |  | 30 | delete $rec->{cv}; | 
| 304 |  |  |  |  |  |  | } | 
| 305 |  |  |  |  |  |  |  | 
| 306 |  |  |  |  |  |  | sub _confirm_unsubscribe { | 
| 307 | 3 |  |  | 3 |  | 18 | my ($self, $mid) = @_; | 
| 308 | 3 |  |  |  |  | 8 | my $topic = delete $self->{_unsub_pending_by_message_id}->{$mid}; | 
| 309 | 3 | 100 |  |  |  | 8 | unless (defined $topic) { | 
| 310 | 1 |  |  |  |  | 159 | carp 'UnSubAck with no pending unsubscribe for message id: ', $mid, "\n"; | 
| 311 | 1 |  |  |  |  | 33 | return; | 
| 312 |  |  |  |  |  |  | } | 
| 313 | 2 |  |  |  |  | 6 | my $rec = delete $self->{_unsub_pending}->{$topic}; | 
| 314 | 2 |  |  |  |  | 5 | foreach my $cv (@{$rec->{cv}}) { | 
|  | 2 |  |  |  |  | 7 |  | 
| 315 | 3 |  |  |  |  | 16 | $cv->send(1); | 
| 316 |  |  |  |  |  |  | } | 
| 317 |  |  |  |  |  |  | } | 
| 318 |  |  |  |  |  |  |  | 
| 319 |  |  |  |  |  |  | sub _send { | 
| 320 | 56 |  |  | 56 |  | 210187 | my $self = shift; | 
| 321 | 56 |  |  |  |  | 209 | my %p = @_; | 
| 322 | 56 |  |  |  |  | 139 | my $cv = delete $p{cv}; | 
| 323 | 56 |  |  |  |  | 333 | my $msg = Net::MQTT::Message->new(%p); | 
| 324 |  |  |  |  |  |  | $self->{connected} ? | 
| 325 | 56 | 100 |  |  |  | 1057 | $self->_queue_write($msg, $cv) : $self->connect($msg, $cv); | 
| 326 |  |  |  |  |  |  | } | 
| 327 |  |  |  |  |  |  |  | 
| 328 |  |  |  |  |  |  | sub _queue_write { | 
| 329 | 56 |  |  | 56 |  | 128 | my ($self, $msg, $cv) = @_; | 
| 330 | 56 |  |  |  |  | 236 | my $queue = $self->{write_queue}; | 
| 331 | 56 |  |  |  |  | 91 | print STDERR 'Queuing: ', ($cv||'no cv'), ' ', $msg->string, "\n" if DEBUG; | 
| 332 | 56 |  |  |  |  | 101 | push @{$queue}, [$msg, $cv]; | 
|  | 56 |  |  |  |  | 152 |  | 
| 333 | 56 | 100 |  |  |  | 240 | $self->_write_now unless (defined $self->{_waiting}); | 
| 334 | 56 |  |  |  |  | 277 | $cv; | 
| 335 |  |  |  |  |  |  | } | 
| 336 |  |  |  |  |  |  |  | 
| 337 |  |  |  |  |  |  | sub _write_now { | 
| 338 | 114 |  |  | 114 |  | 192 | my $self = shift; | 
| 339 | 114 |  |  |  |  | 175 | my ($msg, $cv); | 
| 340 | 114 |  |  |  |  | 351 | undef $self->{_waiting}; | 
| 341 | 114 | 100 |  |  |  | 266 | if (@_) { | 
| 342 | 16 |  |  |  |  | 42 | ($msg, $cv) = @_; | 
| 343 |  |  |  |  |  |  | } else { | 
| 344 | 98 | 100 |  |  |  | 132 | my $args = shift @{$self->{write_queue}} or return; | 
|  | 98 |  |  |  |  | 356 |  | 
| 345 | 50 |  |  |  |  | 136 | ($msg, $cv) = @$args; | 
| 346 |  |  |  |  |  |  | } | 
| 347 | 66 |  |  |  |  | 220 | $self->_reset_keep_alive_timer(); | 
| 348 | 66 |  |  |  |  | 1844 | print STDERR "Sending: ", $msg->string, "\n" if DEBUG; | 
| 349 | 66 | 100 |  |  |  | 180 | $self->{message_log_callback}->('>', $msg) if ($self->{message_log_callback}); | 
| 350 | 66 |  |  |  |  | 710 | $self->{_waiting} = [$msg, $cv]; | 
| 351 | 66 |  |  |  |  | 89 | print '  ', (unpack 'H*', $msg->bytes), "\n" if DEBUG; | 
| 352 | 66 |  |  |  |  | 465 | $self->{handle}->push_write($msg->bytes); | 
| 353 | 66 |  |  |  |  | 6381 | $cv; | 
| 354 |  |  |  |  |  |  | } | 
| 355 |  |  |  |  |  |  |  | 
| 356 |  |  |  |  |  |  | sub _reset_keep_alive_timer { | 
| 357 | 71 |  |  | 71 |  | 689 | my ($self, $wait) = @_; | 
| 358 | 71 |  |  |  |  | 404 | undef $self->{_keep_alive_handle}; | 
| 359 | 71 | 100 |  |  |  | 205 | my $method = $wait ? '_keep_alive_timeout' : '_send_keep_alive'; | 
| 360 | 71 |  |  |  |  | 118 | $self->{_keep_alive_waiting} = $wait; | 
| 361 | 71 |  |  |  |  | 111 | my $weak_self = $self; | 
| 362 | 71 |  |  |  |  | 210 | weaken $weak_self; | 
| 363 |  |  |  |  |  |  | $self->{_keep_alive_handle} = | 
| 364 |  |  |  |  |  |  | AnyEvent->timer(after => $self->{keep_alive_timer}, | 
| 365 |  |  |  |  |  |  | cb => subname((substr $method, 1).'_cb' => | 
| 366 | 71 |  |  | 4 |  | 948 | sub { $weak_self->$method(@_) })); | 
|  | 4 |  |  |  |  | 701041 |  | 
| 367 |  |  |  |  |  |  | } | 
| 368 |  |  |  |  |  |  |  | 
| 369 |  |  |  |  |  |  | sub _send_keep_alive { | 
| 370 | 3 |  |  | 3 |  | 8 | my $self = shift; | 
| 371 | 3 |  |  |  |  | 7 | print STDERR "Sending: keep alive\n" if DEBUG; | 
| 372 | 3 |  |  |  |  | 17 | $self->_send(message_type => MQTT_PINGREQ); | 
| 373 | 3 |  |  |  |  | 11 | $self->_reset_keep_alive_timer(1); | 
| 374 |  |  |  |  |  |  | } | 
| 375 |  |  |  |  |  |  |  | 
| 376 |  |  |  |  |  |  | sub _keep_alive_timeout { | 
| 377 | 1 |  |  | 1 |  | 5 | my $self = shift; | 
| 378 | 1 |  |  |  |  | 3 | print STDERR "keep alive timeout\n" if DEBUG; | 
| 379 | 1 |  |  |  |  | 4 | undef $self->{_keep_alive_waiting}; | 
| 380 | 1 |  |  |  |  | 10 | $self->{handle}->destroy; | 
| 381 | 1 |  |  |  |  | 184 | $self->_error(0, 'keep alive timeout', 1); | 
| 382 |  |  |  |  |  |  | } | 
| 383 |  |  |  |  |  |  |  | 
| 384 |  |  |  |  |  |  | sub _keep_alive_received { | 
| 385 | 3 |  |  | 3 |  | 6 | my $self = shift; | 
| 386 | 3 |  |  |  |  | 5 | print STDERR "keep alive received\n" if DEBUG; | 
| 387 | 3 | 100 |  |  |  | 11 | return unless (defined $self->{_keep_alive_waiting}); | 
| 388 | 1 |  |  |  |  | 4 | $self->_reset_keep_alive_timer(); | 
| 389 |  |  |  |  |  |  | } | 
| 390 |  |  |  |  |  |  |  | 
| 391 |  |  |  |  |  |  |  | 
| 392 |  |  |  |  |  |  | sub connect { | 
| 393 | 30 |  |  | 30 | 1 | 1826 | my ($self, $msg, $cv) = @_; | 
| 394 | 30 |  |  |  |  | 47 | print STDERR "connect\n" if DEBUG; | 
| 395 | 30 |  |  |  |  | 58 | $self->{_waiting} = 'connect'; | 
| 396 | 30 | 100 |  |  |  | 212 | if ($msg) { | 
| 397 | 21 | 100 |  |  |  | 269 | $cv = AnyEvent->condvar unless ($cv); | 
| 398 | 21 |  |  |  |  | 106 | $self->_queue_write($msg, $cv); | 
| 399 |  |  |  |  |  |  | } else { | 
| 400 | 9 | 100 |  |  |  | 287 | $self->{connect_cv} = AnyEvent->condvar unless (exists $self->{connect_cv}); | 
| 401 | 9 |  |  |  |  | 68 | $cv = $self->{connect_cv}; | 
| 402 |  |  |  |  |  |  | } | 
| 403 | 30 | 100 |  |  |  | 115 | return $cv if ($self->{handle}); | 
| 404 |  |  |  |  |  |  |  | 
| 405 | 18 |  |  |  |  | 46 | my $weak_self = $self; | 
| 406 | 18 |  |  |  |  | 64 | weaken $weak_self; | 
| 407 |  |  |  |  |  |  |  | 
| 408 | 18 |  |  |  |  | 31 | my $hd; | 
| 409 |  |  |  |  |  |  | $hd = $self->{handle} = | 
| 410 |  |  |  |  |  |  | AnyEvent::Handle->new(connect => [$self->{host}, $self->{port}], | 
| 411 |  |  |  |  |  |  | ($self->{tls} ? (tls => "connect") : ()), | 
| 412 |  |  |  |  |  |  | on_error => subname('on_error_cb' => sub { | 
| 413 | 1 |  |  | 1 |  | 23 | my ($handle, $fatal, $message) = @_; | 
| 414 | 1 |  |  |  |  | 2 | print STDERR "handle error $_[1]\n" if DEBUG; | 
| 415 | 1 |  |  |  |  | 5 | $handle->destroy; | 
| 416 | 1 |  |  |  |  | 79 | $weak_self->_error($fatal, 'Error: '.$message, 0); | 
| 417 |  |  |  |  |  |  | }), | 
| 418 |  |  |  |  |  |  | on_eof => subname('on_eof_cb' => sub { | 
| 419 | 1 |  |  | 1 |  | 679 | my ($handle) = @_; | 
| 420 | 1 |  |  |  |  | 2 | print STDERR "handle eof\n" if DEBUG; | 
| 421 | 1 |  |  |  |  | 6 | $handle->destroy; | 
| 422 | 1 |  |  |  |  | 57 | $weak_self->_error(1, 'EOF', 1); | 
| 423 |  |  |  |  |  |  | }), | 
| 424 |  |  |  |  |  |  | on_timeout => subname('on_timeout_cb' => sub { | 
| 425 | 1 |  |  | 1 |  | 400572 | $weak_self->_error(0, $weak_self->{wait}.' timeout', 1); | 
| 426 | 1 |  |  |  |  | 3 | $weak_self->{wait} = 'nothing'; | 
| 427 |  |  |  |  |  |  | }), | 
| 428 |  |  |  |  |  |  | on_connect => subname('on_connect_cb' => sub { | 
| 429 | 16 |  |  | 16 |  | 6018 | my ($handle, $host, $port, $retry) = @_; | 
| 430 | 16 |  |  |  |  | 29 | print STDERR "TCP handshake complete\n" if DEBUG; | 
| 431 |  |  |  |  |  |  | # call user-defined on_connect function. | 
| 432 | 16 | 50 |  |  |  | 63 | $weak_self->{on_connect}->($handle, $retry) if $weak_self->{on_connect}; | 
| 433 |  |  |  |  |  |  | my $msg = | 
| 434 |  |  |  |  |  |  | Net::MQTT::Message->new( | 
| 435 |  |  |  |  |  |  | message_type => MQTT_CONNECT, | 
| 436 |  |  |  |  |  |  | keep_alive_timer => $weak_self->{keep_alive_timer}, | 
| 437 |  |  |  |  |  |  | client_id => $weak_self->{client_id}, | 
| 438 |  |  |  |  |  |  | clean_session => $weak_self->{clean_session}, | 
| 439 |  |  |  |  |  |  | will_topic => $weak_self->{will_topic}, | 
| 440 |  |  |  |  |  |  | will_qos => $weak_self->{will_qos}, | 
| 441 |  |  |  |  |  |  | will_retain => $weak_self->{will_retain}, | 
| 442 |  |  |  |  |  |  | will_message => $weak_self->{will_message}, | 
| 443 |  |  |  |  |  |  | user_name => $weak_self->{user_name}, | 
| 444 |  |  |  |  |  |  | password => $weak_self->{password}, | 
| 445 | 16 |  |  |  |  | 163 | ); | 
| 446 | 16 |  |  |  |  | 415 | $weak_self->_write_now($msg); | 
| 447 | 16 |  |  |  |  | 81 | $handle->timeout($weak_self->{timeout}); | 
| 448 | 16 |  |  |  |  | 803 | $weak_self->{wait} = 'connack'; | 
| 449 |  |  |  |  |  |  | $handle->on_read(subname 'on_read_cb' => sub { | 
| 450 | 14 |  |  |  |  | 23755 | my ($hdl) = @_; | 
| 451 |  |  |  |  |  |  | $hdl->push_read(ref $weak_self => | 
| 452 |  |  |  |  |  |  | subname 'reader_cb' => sub { | 
| 453 | 58 |  |  |  |  | 191 | $weak_self->_handle_message(@_); | 
| 454 | 58 |  |  |  |  | 339 | 1; | 
| 455 | 14 |  |  |  |  | 178 | }); | 
| 456 | 16 |  |  |  |  | 206 | }); | 
| 457 |  |  |  |  |  |  | }), | 
| 458 | 18 | 50 |  |  |  | 552 | @{$self->{handle_args}}, | 
|  | 18 |  |  |  |  | 202 |  | 
| 459 |  |  |  |  |  |  | ); | 
| 460 | 18 |  |  |  |  | 10590 | return $cv | 
| 461 |  |  |  |  |  |  | } | 
| 462 |  |  |  |  |  |  |  | 
| 463 |  |  |  |  |  |  | sub _reconnect { | 
| 464 | 3 |  |  | 3 |  | 6 | my $self = shift; | 
| 465 | 3 |  |  |  |  | 5 | print STDERR "reconnecting:\n" if DEBUG; | 
| 466 |  |  |  |  |  |  |  | 
| 467 |  |  |  |  |  |  | # must resubscribe everything | 
| 468 | 3 | 50 |  |  |  | 11 | if ($self->{clean_session}) { | 
| 469 | 3 |  |  |  |  | 27 | $self->{_sub_topics} = Net::MQTT::TopicStore->new(); | 
| 470 | 3 |  | 50 |  |  | 68 | $self->{_sub_reconnect} = delete $self->{_sub} || {}; | 
| 471 |  |  |  |  |  |  | } | 
| 472 |  |  |  |  |  |  |  | 
| 473 | 3 |  |  |  |  | 12 | $self->connect(@_); | 
| 474 |  |  |  |  |  |  | } | 
| 475 |  |  |  |  |  |  |  | 
| 476 |  |  |  |  |  |  | sub _handle_message { | 
| 477 | 58 |  |  | 58 |  | 92 | my $self = shift; | 
| 478 | 58 |  |  |  |  | 130 | my ($handle, $msg, $error) = @_; | 
| 479 | 58 | 50 |  |  |  | 124 | return $self->_error(0, $error, 1) if ($error); | 
| 480 | 58 | 100 |  |  |  | 143 | $self->{message_log_callback}->('<', $msg) if ($self->{message_log_callback}); | 
| 481 | 58 | 50 |  |  |  | 216 | $self->_call_callback('before_msg_callback' => $msg) or return; | 
| 482 | 58 |  |  |  |  | 172 | my $msg_type = lc ref $msg; | 
| 483 | 58 |  |  |  |  | 332 | $msg_type =~ s/^.*:://; | 
| 484 | 58 | 50 |  |  |  | 175 | $self->_call_callback('before_'.$msg_type.'_callback' => $msg) or return; | 
| 485 | 58 |  |  |  |  | 139 | my $method = '_process_'.$msg_type; | 
| 486 | 58 | 100 |  |  |  | 324 | unless ($self->can($method)) { | 
| 487 | 1 |  |  |  |  | 12 | carp 'Unsupported message ', $msg->string(), "\n"; | 
| 488 | 1 |  |  |  |  | 288 | return; | 
| 489 |  |  |  |  |  |  | } | 
| 490 | 57 |  |  |  |  | 194 | my $res = $self->$method(@_); | 
| 491 | 57 |  |  |  |  | 221 | $self->_call_callback('after_'.$msg_type.'_callback' => $msg, $res); | 
| 492 | 57 |  |  |  |  | 97 | $res; | 
| 493 |  |  |  |  |  |  | } | 
| 494 |  |  |  |  |  |  |  | 
| 495 |  |  |  |  |  |  | sub _call_callback { | 
| 496 | 173 |  |  | 173 |  | 224 | my $self = shift; | 
| 497 | 173 |  |  |  |  | 207 | my $cb_name = shift; | 
| 498 | 173 | 50 |  |  |  | 520 | return 1 unless (exists $self->{$cb_name}); | 
| 499 | 0 |  |  |  |  | 0 | $self->{$cb_name}->(@_); | 
| 500 |  |  |  |  |  |  | } | 
| 501 |  |  |  |  |  |  |  | 
| 502 |  |  |  |  |  |  | sub _process_connack { | 
| 503 | 13 |  |  | 13 |  | 45 | my ($self, $handle, $msg, $error) = @_; | 
| 504 | 13 |  |  |  |  | 67 | $handle->timeout(undef); | 
| 505 | 13 | 100 |  |  |  | 283 | unless ($msg->return_code == MQTT_CONNECT_ACCEPTED) { | 
| 506 | 1 |  |  |  |  | 10 | return $self->_error(1, 'Connection refused: '.$msg->string, 0); | 
| 507 |  |  |  |  |  |  | } | 
| 508 | 12 |  |  |  |  | 80 | print STDERR "Connection ready:\n", $msg->string('  '), "\n" if DEBUG; | 
| 509 | 12 |  |  |  |  | 49 | $self->_write_now(); | 
| 510 | 12 |  |  |  |  | 34 | $self->{connected} = 1; | 
| 511 | 12 | 100 |  |  |  | 61 | $self->{connect_cv}->send(1) if ($self->{connect_cv}); | 
| 512 | 12 |  |  |  |  | 53 | delete $self->{connect_cv}; | 
| 513 |  |  |  |  |  |  |  | 
| 514 | 12 |  |  |  |  | 21 | my $weak_self = $self; | 
| 515 | 12 |  |  |  |  | 52 | weaken $weak_self; | 
| 516 |  |  |  |  |  |  |  | 
| 517 |  |  |  |  |  |  | $handle->on_drain(subname 'on_drain_cb' => sub { | 
| 518 | 51 |  |  | 51 |  | 6090 | print STDERR "drained\n" if DEBUG; | 
| 519 | 51 |  |  |  |  | 107 | my $w = $weak_self->{_waiting}; | 
| 520 | 51 | 100 | 100 |  |  | 419 | $w->[1]->send(1) if (ref $w && defined $w->[1]); | 
| 521 | 51 |  |  |  |  | 1752 | $weak_self->_write_now; | 
| 522 | 51 |  |  |  |  | 249 | 1; | 
| 523 | 12 |  |  |  |  | 148 | }); | 
| 524 |  |  |  |  |  |  |  | 
| 525 |  |  |  |  |  |  | # handle reconnect | 
| 526 | 12 |  |  |  |  | 31 | while (my ($topic, $rec) = each %{$self->{_sub_reconnect}}) { | 
|  | 12 |  |  |  |  | 137 |  | 
| 527 | 0 |  |  |  |  | 0 | print STDERR "Resubscribing to '$topic':\n" if DEBUG; | 
| 528 | 0 |  |  |  |  | 0 | for my $cb (values %{$rec->{cb}}) { | 
|  | 0 |  |  |  |  | 0 |  | 
| 529 | 0 |  |  |  |  | 0 | $self->subscribe(topic => $topic, callback => $cb, qos => $rec->{qos}); | 
| 530 |  |  |  |  |  |  | } | 
| 531 |  |  |  |  |  |  | } | 
| 532 | 12 |  |  |  |  | 72 | delete $self->{_sub_reconnect}; | 
| 533 |  |  |  |  |  |  | return | 
| 534 | 12 |  |  |  |  | 100 | } | 
| 535 |  |  |  |  |  |  |  | 
| 536 |  |  |  |  |  |  | sub _process_pingresp { | 
| 537 | 3 |  |  | 3 |  | 11 | shift->_keep_alive_received(); | 
| 538 |  |  |  |  |  |  | } | 
| 539 |  |  |  |  |  |  |  | 
| 540 |  |  |  |  |  |  | sub _process_suback { | 
| 541 | 13 |  |  | 13 |  | 36 | my ($self, $handle, $msg, $error) = @_; | 
| 542 | 13 |  |  |  |  | 19 | print STDERR "Confirmed subscription:\n", $msg->string('  '), "\n" if DEBUG; | 
| 543 | 13 |  |  |  |  | 46 | $self->_confirm_subscription($msg->message_id, $msg->qos_levels->[0]); | 
| 544 |  |  |  |  |  |  | return | 
| 545 | 13 |  |  |  |  | 31 | } | 
| 546 |  |  |  |  |  |  |  | 
| 547 |  |  |  |  |  |  | sub _process_unsuback { | 
| 548 | 3 |  |  | 3 |  | 9 | my ($self, $handle, $msg, $error) = @_; | 
| 549 | 3 |  |  |  |  | 4 | print STDERR "Confirmed unsubscribe:\n", $msg->string('  '), "\n" if DEBUG; | 
| 550 | 3 |  |  |  |  | 17 | $self->_confirm_unsubscribe($msg->message_id); | 
| 551 |  |  |  |  |  |  | return | 
| 552 | 3 |  |  |  |  | 21 | } | 
| 553 |  |  |  |  |  |  |  | 
| 554 |  |  |  |  |  |  | sub _publish_locally { | 
| 555 | 16 |  |  | 16 |  | 37 | my ($self, $msg) = @_; | 
| 556 | 16 |  |  |  |  | 54 | my $msg_topic = $msg->topic; | 
| 557 | 16 |  |  |  |  | 94 | my $msg_data = $msg->message; | 
| 558 | 16 |  |  |  |  | 100 | my $matches = $self->{_sub_topics}->values($msg_topic); | 
| 559 | 16 | 100 |  |  |  | 379 | unless (scalar @$matches) { | 
| 560 | 1 |  |  |  |  | 7 | carp "Unexpected publish:\n", $msg->string('  '), "\n"; | 
| 561 | 1 |  |  |  |  | 322 | return; | 
| 562 |  |  |  |  |  |  | } | 
| 563 | 15 |  |  |  |  | 26 | my %matched; | 
| 564 | 15 |  |  |  |  | 60 | my $msg_retain = $msg->retain; | 
| 565 | 15 |  |  |  |  | 64 | foreach my $topic (@$matches) { | 
| 566 | 19 |  |  |  |  | 85 | my $rec = $self->{_sub}->{$topic}; | 
| 567 | 19 | 50 |  |  |  | 44 | if ($msg_retain) { | 
| 568 | 0 | 0 |  |  |  | 0 | if ($msg_data eq '') { | 
| 569 | 0 |  |  |  |  | 0 | delete $rec->{retained}->{$msg_topic}; | 
| 570 | 0 |  |  |  |  | 0 | print STDERR "  retained cleared\n" if DEBUG; | 
| 571 |  |  |  |  |  |  | } else { | 
| 572 | 0 |  |  |  |  | 0 | $rec->{retained}->{$msg_topic} = $msg; | 
| 573 | 0 |  |  |  |  | 0 | print STDERR "  retained '", $msg_data, "'\n" if DEBUG; | 
| 574 |  |  |  |  |  |  | } | 
| 575 |  |  |  |  |  |  | } | 
| 576 | 19 |  |  |  |  | 26 | foreach my $cb (values %{$rec->{cb}}) { | 
|  | 19 |  |  |  |  | 61 |  | 
| 577 | 27 | 100 |  |  |  | 222 | next if ($matched{$cb}++); | 
| 578 | 25 |  |  |  |  | 85 | $cb->($msg_topic, $msg_data, $msg); | 
| 579 |  |  |  |  |  |  | } | 
| 580 |  |  |  |  |  |  | } | 
| 581 | 15 |  |  |  |  | 227 | 1; | 
| 582 |  |  |  |  |  |  | } | 
| 583 |  |  |  |  |  |  |  | 
| 584 |  |  |  |  |  |  | sub _process_publish { | 
| 585 | 16 |  |  | 16 |  | 45 | my ($self, $handle, $msg, $error) = @_; | 
| 586 | 16 |  |  |  |  | 43 | my $qos = $msg->qos; | 
| 587 |  |  |  |  |  |  |  | 
| 588 |  |  |  |  |  |  | # assuming this was intended for a subscription not yet restored | 
| 589 | 16 | 50 | 66 |  |  | 119 | if ($qos && !$self->{clean_session} && !@{$self->{_sub_topics}->values($msg->topic)}) { | 
|  | 0 |  | 33 |  |  | 0 |  | 
| 590 | 0 |  |  |  |  | 0 | print STDERR "Caching message for '", $msg->topic, "'\n" if DEBUG; | 
| 591 | 0 |  |  |  |  | 0 | push(@{$self->{_qos_msg_cache}}, $msg); | 
|  | 0 |  |  |  |  | 0 |  | 
| 592 | 0 |  |  |  |  | 0 | return; | 
| 593 |  |  |  |  |  |  | } | 
| 594 |  |  |  |  |  |  |  | 
| 595 | 16 | 100 |  |  |  | 47 | if ($qos == MQTT_QOS_EXACTLY_ONCE) { | 
| 596 | 1 |  |  |  |  | 20 | my $mid = $msg->message_id; | 
| 597 | 1 |  |  |  |  | 7 | $self->{messages}->{$mid} = $msg; | 
| 598 | 1 |  |  |  |  | 5 | $self->_send(message_type => MQTT_PUBREC, message_id => $mid); | 
| 599 | 1 |  |  |  |  | 3 | return; | 
| 600 |  |  |  |  |  |  | } | 
| 601 | 15 |  |  |  |  | 49 | $self->_publish_locally($msg); | 
| 602 | 15 | 100 |  |  |  | 40 | $self->_send(message_type => MQTT_PUBACK, message_id => $msg->message_id) | 
| 603 |  |  |  |  |  |  | if ($qos == MQTT_QOS_AT_LEAST_ONCE); | 
| 604 |  |  |  |  |  |  | return | 
| 605 | 15 |  |  |  |  | 63 | } | 
| 606 |  |  |  |  |  |  |  | 
| 607 |  |  |  |  |  |  | sub _inflight_record { | 
| 608 | 7 |  |  | 7 |  | 16 | my ($self, $msg) = @_; | 
| 609 | 7 |  |  |  |  | 39 | my $mid = $msg->message_id; | 
| 610 | 7 | 100 |  |  |  | 39 | unless (exists $self->{inflight}->{$mid}) { | 
| 611 | 2 |  |  |  |  | 11 | carp "Unexpected message for message id $mid\n  ".$msg->string; | 
| 612 | 2 |  |  |  |  | 430 | return; | 
| 613 |  |  |  |  |  |  | } | 
| 614 | 5 |  |  |  |  | 11 | my $exp_type = $self->{inflight}->{$mid}->{expect}; | 
| 615 | 5 |  |  |  |  | 22 | my $got_type = $msg->message_type; | 
| 616 | 5 | 100 |  |  |  | 21 | unless ($got_type == $exp_type) { | 
| 617 | 1 |  |  |  |  | 5 | carp 'Received ', message_type_string($got_type), ' but expected ', | 
| 618 |  |  |  |  |  |  | message_type_string($exp_type), " for message id $mid\n"; | 
| 619 | 1 |  |  |  |  | 224 | return; | 
| 620 |  |  |  |  |  |  | } | 
| 621 | 4 |  |  |  |  | 15 | return delete $self->{inflight}->{$mid}; | 
| 622 |  |  |  |  |  |  | } | 
| 623 |  |  |  |  |  |  |  | 
| 624 |  |  |  |  |  |  | sub _process_puback { | 
| 625 | 3 |  |  | 3 |  | 9 | my ($self, $handle, $msg, $error) = @_; | 
| 626 | 3 | 100 |  |  |  | 8 | my $rec = $self->_inflight_record($msg) or return; | 
| 627 | 2 |  |  |  |  | 5 | my $mid = $msg->message_id; | 
| 628 | 2 |  |  |  |  | 6 | print STDERR 'PubAck: ', $mid, ' ', $rec->{cv}, "\n" if DEBUG; | 
| 629 | 2 |  |  |  |  | 7 | $rec->{cv}->send(1); | 
| 630 | 2 |  |  |  |  | 36 | return 1; | 
| 631 |  |  |  |  |  |  | } | 
| 632 |  |  |  |  |  |  |  | 
| 633 |  |  |  |  |  |  | sub _process_pubrec { | 
| 634 | 2 |  |  | 2 |  | 6 | my ($self, $handle, $msg, $error) = @_; | 
| 635 | 2 | 100 |  |  |  | 9 | my $rec = $self->_inflight_record($msg) or return; | 
| 636 | 1 |  |  |  |  | 3 | my $mid = $msg->message_id; | 
| 637 | 1 |  |  |  |  | 4 | print STDERR 'PubRec: ', $mid, ' ', $rec->{cv}, "\n" if DEBUG; | 
| 638 |  |  |  |  |  |  | $self->_send_with_ack({ | 
| 639 |  |  |  |  |  |  | message_type => MQTT_PUBREL, | 
| 640 |  |  |  |  |  |  | qos => MQTT_QOS_AT_LEAST_ONCE, | 
| 641 |  |  |  |  |  |  | message_id => $mid, | 
| 642 | 1 |  |  |  |  | 5 | }, $rec->{cv}, MQTT_PUBCOMP); | 
| 643 |  |  |  |  |  |  | } | 
| 644 |  |  |  |  |  |  |  | 
| 645 |  |  |  |  |  |  | sub _process_pubrel { | 
| 646 | 2 |  |  | 2 |  | 8 | my ($self, $handle, $msg, $error) = @_; | 
| 647 | 2 |  |  |  |  | 12 | my $mid = $msg->message_id; | 
| 648 | 2 |  |  |  |  | 7 | print STDERR 'PubRel: ', $mid, "\n" if DEBUG; | 
| 649 | 2 |  |  |  |  | 8 | my $pubmsg = delete $self->{messages}->{$mid}; | 
| 650 | 2 | 100 |  |  |  | 12 | unless ($pubmsg) { | 
| 651 | 1 |  |  |  |  | 6 | carp "Unexpected message for message id $mid\n  ".$msg->string; | 
| 652 | 1 |  |  |  |  | 239 | return; | 
| 653 |  |  |  |  |  |  | } | 
| 654 | 1 |  |  |  |  | 4 | $self->_publish_locally($pubmsg); | 
| 655 | 1 |  |  |  |  | 4 | $self->_send(message_type => MQTT_PUBCOMP, message_id => $mid); | 
| 656 |  |  |  |  |  |  | } | 
| 657 |  |  |  |  |  |  |  | 
| 658 |  |  |  |  |  |  | sub _process_pubcomp { | 
| 659 | 2 |  |  | 2 |  | 7 | my ($self, $handle, $msg, $error) = @_; | 
| 660 | 2 | 100 |  |  |  | 9 | my $rec = $self->_inflight_record($msg) or return; | 
| 661 | 1 |  |  |  |  | 3 | my $mid = $msg->message_id; | 
| 662 | 1 |  |  |  |  | 3 | print STDERR 'PubComp: ', $mid, ' ', $rec->{cv}, "\n" if DEBUG; | 
| 663 | 1 |  |  |  |  | 3 | $rec->{cv}->send(1); | 
| 664 | 1 |  |  |  |  | 19 | return 1; | 
| 665 |  |  |  |  |  |  | } | 
| 666 |  |  |  |  |  |  |  | 
| 667 |  |  |  |  |  |  |  | 
| 668 |  |  |  |  |  |  | sub anyevent_read_type { | 
| 669 | 14 |  |  | 14 | 1 | 274 | my ($handle, $cb) = @_; | 
| 670 |  |  |  |  |  |  | subname 'anyevent_read_type_reader' => sub { | 
| 671 | 41 |  |  | 41 |  | 75287 | my ($handle) = @_; | 
| 672 | 41 |  |  |  |  | 97 | my $rbuf = \$handle->{rbuf}; | 
| 673 | 41 |  |  |  |  | 166 | weaken $rbuf; | 
| 674 | 41 | 50 |  |  |  | 127 | return unless (defined $$rbuf); | 
| 675 | 41 |  |  |  |  | 65 | while (1) { | 
| 676 | 99 |  |  |  |  | 502 | my $msg = Net::MQTT::Message->new_from_bytes($$rbuf, 1); | 
| 677 | 99 | 100 |  |  |  | 6367 | last unless ($msg); | 
| 678 | 58 |  |  |  |  | 159 | $cb->($handle, $msg); | 
| 679 |  |  |  |  |  |  | } | 
| 680 | 41 |  |  |  |  | 94 | return; | 
| 681 | 14 |  |  |  |  | 125 | }; | 
| 682 |  |  |  |  |  |  | } | 
| 683 |  |  |  |  |  |  |  | 
| 684 |  |  |  |  |  |  | 1; | 
| 685 |  |  |  |  |  |  |  | 
| 686 |  |  |  |  |  |  | __END__ |