File Coverage

blib/lib/AnyEvent/MQTT.pm
Criterion Covered Total %
statement 417 444 93.9
branch 130 150 86.6
condition 9 17 52.9
subroutine 58 58 100.0
pod 8 8 100.0
total 622 677 91.8


line stmt bran cond sub pod time code
1 15     15   202600 use strict;
  15         32  
  15         485  
2 15     15   74 use warnings;
  15         23  
  15         811  
3             package AnyEvent::MQTT;
4             $AnyEvent::MQTT::VERSION = '1.212810';
5             # ABSTRACT: AnyEvent module for an MQTT client
6              
7              
8 15     15   76 use constant DEBUG => $ENV{ANYEVENT_MQTT_DEBUG};
  15         27  
  15         1067  
9 15     15   84 use AnyEvent;
  15         27  
  15         322  
10 15     15   68 use AnyEvent::Handle;
  15         25  
  15         323  
11 15     15   74 use Net::MQTT::Constants;
  15         36  
  15         125  
12 15     15   10360 use Net::MQTT::Message;
  15         484112  
  15         645  
13 15     15   6487 use Net::MQTT::TopicStore;
  15         7134  
  15         501  
14 15     15   91 use Carp qw/croak carp/;
  15         32  
  15         1012  
15 15     15   86 use Sub::Name;
  15         30  
  15         607  
16 15     15   80 use Scalar::Util qw/weaken/;
  15         26  
  15         73561  
17              
18              
19             sub new {
20 15     15 1 9542 my ($pkg, %p) = @_;
21 15         152 my $self =
22             bless {
23             socket => undef,
24             host => '127.0.0.1',
25             port => '1883',
26             timeout => 30,
27             wait => 'nothing',
28             keep_alive_timer => 120,
29             qos => MQTT_QOS_AT_MOST_ONCE,
30             message_id => 1,
31             user_name => undef,
32             password => undef,
33             tls => undef,
34             will_topic => undef,
35             will_qos => MQTT_QOS_AT_MOST_ONCE,
36             will_retain => 0,
37             will_message => '',
38             client_id => undef,
39             clean_session => 1,
40             handle_args => [],
41             write_queue => [],
42             inflight => {},
43             _sub_topics => Net::MQTT::TopicStore->new(),
44             %p,
45             }, $pkg;
46             }
47              
48             sub DESTROY {
49 15     15   120243 $_[0]->cleanup;
50             }
51              
52              
53             sub cleanup {
54 20     20 1 78 my $self = shift;
55 20         36 print STDERR "cleanup\n" if DEBUG;
56 20 100       139 if ($self->{handle}) {
57 18         572 my $cv = AnyEvent->condvar;
58 18         141 my $handle = $self->{handle};
59 18         107 weaken $handle;
60 18     11   198 $cv->cb(sub { $handle->destroy });
  11         137  
61 18         206 $self->_send(message_type => MQTT_DISCONNECT, cv => $cv);
62             }
63 20         102 delete $self->{handle};
64 20         328 delete $self->{connected};
65 20         44 delete $self->{wait};
66 20         78 delete $self->{_keep_alive_handle};
67 20         40 delete $self->{_keep_alive_waiting};
68 20         706 $self->{write_queue} = [];
69             }
70              
71             sub _error {
72 5     5   114 my ($self, $fatal, $message, $reconnect) = @_;
73 5         25 $self->cleanup($message);
74 5 50       42 $self->{on_error}->($fatal, $message) if ($self->{on_error});
75 5 100       116 $self->_reconnect() if ($reconnect);
76             }
77              
78              
79             sub publish {
80 8     8 1 8485 my ($self, %p) = @_;
81             my $topic = exists $p{topic} ? $p{topic} :
82 8 100       103 croak ref $self, '->publish requires "topic" parameter';
83 7 100       26 my $qos = exists $p{qos} ? $p{qos} : MQTT_QOS_AT_MOST_ONCE;
84 7 100       137 my $cv = exists $p{cv} ? delete $p{cv} : AnyEvent->condvar;
85 7         35 my $expect;
86 7 100       23 if ($qos) {
87 3 100       10 $expect = ($qos == MQTT_QOS_AT_LEAST_ONCE ? MQTT_PUBACK : MQTT_PUBREC);
88             }
89 7         14 my $message = $p{message};
90 7 100       21 if (defined $message) {
91 4         6 print STDERR "publish: message[$message] => $topic\n" if DEBUG;
92 4         26 $self->_send_with_ack({
93             message_type => MQTT_PUBLISH,
94             %p,
95             }, $cv, $expect);
96 4         20 return $cv;
97             }
98             my $handle = exists $p{handle} ? $p{handle} :
99 3 100       75 croak ref $self, '->publish requires "message" or "handle" parameter';
100 2 100       23 unless ($handle->isa('AnyEvent::Handle')) {
101 1 50       17 my @args = @{$p{handle_args}||[]};
  1         6  
102 1         2 print STDERR "publish: IO[$handle] => $topic @args\n" if DEBUG;
103 1         7 $handle = AnyEvent::Handle->new(fh => $handle, @args);
104             }
105 2         112 my $error_sub = $handle->{on_error}; # Hack: There is no accessor api
106             $handle->on_error(subname 'on_error_for_read_publish_'.$topic =>
107             sub {
108 2     2   345 my ($hdl, $fatal, $msg) = @_;
109 2 50       23 $error_sub->(@_) if ($error_sub);
110 2         129 $hdl->destroy;
111 2         6 undef $hdl;
112 2         6 $cv->send(1);
113 2         32 });
114 2         9 my $weak_self = $self;
115 2         7 weaken $weak_self;
116 2 100       3 my @push_read_args = @{$p{push_read_args}||['line']};
  2         13  
117 2         4 my $sub; $sub = subname 'push_read_cb_for_'.$topic => sub {
118 2     2   899 my ($hdl, $chunk, @args) = @_;
119 2         4 print STDERR "publish: $chunk => $topic\n" if DEBUG;
120 2         51 my $send_cv = AnyEvent->condvar;
121 2         11 print STDERR "publish: message[$chunk] => $topic\n" if DEBUG;
122             $weak_self->_send_with_ack({
123             message_type => MQTT_PUBLISH,
124             qos => $qos,
125             retain => $p{retain},
126 2         13 topic => $topic,
127             message => $chunk,
128             }, $send_cv, $expect);
129             $send_cv->cb(subname 'publish_ack_'.$topic =>
130 2         32 sub { $handle->push_read(@push_read_args => $sub ) });
  2         19  
131 2         67 return;
132 2         15 };
133 2         9 $handle->push_read(@push_read_args => $sub);
134 2         244 return $cv;
135             }
136              
137              
138             sub next_message_id {
139 70018     70018 1 258016 my $self = shift;
140 70018         89040 my $res = $self->{message_id};
141 70018         83465 $self->{message_id}++;
142 70018 100       117857 $self->{message_id} = 1 if $self->{message_id} >= 65536;
143 70018         99796 $res;
144             }
145              
146             sub _send_with_ack {
147 8     8   24 my ($self, $args, $cv, $expect, $dup) = @_;
148 8 100       23 if ($args->{qos}) {
149 5 100       17 unless (exists $args->{message_id}) {
150 3         10 $args->{message_id} = $self->next_message_id();
151             }
152 5         12 my $mid = $args->{message_id};
153 5         118 my $send_cv = AnyEvent->condvar;
154             $send_cv->cb(subname 'ack_cb_for_'.$mid => sub {
155             $self->{inflight}->{$mid} =
156             {
157             expect => $expect,
158             message => $args,
159             cv => $cv,
160             timeout =>
161             AnyEvent->timer(after => $self->{keep_alive_timer},
162             cb => subname 'ack_timeout_for_'.$mid =>
163             sub {
164 1         29 print ref $self, " timeout waiting for ",
165             message_type_string($expect), "\n" if DEBUG;
166 1         5 delete $self->{inflight}->{$mid};
167 1         6 $self->_send_with_ack($args, $cv, $expect, 1);
168 5     5   99 }),
169             };
170 5         108 });
171 5         50 $args->{cv} = $send_cv;
172             } else {
173 3         6 $args->{cv} = $cv;
174             }
175 8 100       23 $args->{dup} = 1 if ($dup);
176 8         38 return $self->_send(%$args);
177             }
178              
179              
180             sub subscribe {
181 19     19 1 10172 my ($self, %p) = @_;
182             my $topic = exists $p{topic} ? $p{topic} :
183 19 100       239 croak ref $self, '->subscribe requires "topic" parameter';
184             my $sub = exists $p{callback} ? $p{callback} :
185 18 100       116 croak ref $self, '->subscribe requires "callback" parameter';
186 17 100       43 my $qos = exists $p{qos} ? $p{qos} : MQTT_QOS_AT_MOST_ONCE;
187 17 100       289 my $cv = exists $p{cv} ? delete $p{cv} : AnyEvent->condvar;
188 17         98 my $mid = $self->_add_subscription($topic, $cv, $sub);
189 17 100       43 if (defined $mid) { # not already subscribed/subscribing
190 12         39 $self->_send(message_type => MQTT_SUBSCRIBE,
191             message_id => $mid,
192             topics => [[$topic, $qos]]);
193             }
194             $cv
195 17         53 }
196              
197              
198             sub unsubscribe {
199 7     7 1 6416 my ($self, %p) = @_;
200             my $topic = exists $p{topic} ? $p{topic} :
201 7 100       96 croak ref $self, '->unsubscribe requires "topic" parameter';
202 6 100       110 my $cv = exists $p{cv} ? delete $p{cv} : AnyEvent->condvar;
203 6         39 my $mid = $self->_remove_subscription($topic, $cv, $p{callback});
204 6 100       16 if (defined $mid) { # not already subscribed/subscribing
205 2         10 $self->_send(message_type => MQTT_UNSUBSCRIBE,
206             message_id => $mid,
207             topics => [$topic]);
208             }
209             $cv
210 6         20 }
211              
212             sub _add_subscription {
213 17     17   39 my ($self, $topic, $cv, $sub) = @_;
214 17         34 my $rec = $self->{_sub}->{$topic};
215 17 100       49 if ($rec) {
216 2         3 print STDERR "Add $sub to existing $topic subscription\n" if DEBUG;
217 2         7 $rec->{cb}->{$sub} = $sub;
218 2         9 $cv->send($rec->{qos});
219 2         20 foreach my $msg (values %{$rec->{retained}}) {
  2         8  
220 0         0 $sub->($msg->topic, $msg->message, $msg);
221             }
222 2         6 return;
223             }
224 15         27 $rec = $self->{_sub_pending}->{$topic};
225 15 100       33 if ($rec) {
226 3         5 print STDERR "Add $sub to existing pending $topic subscription\n" if DEBUG;
227 3         17 $rec->{cb}->{$sub} = $sub;
228 3         6 push @{$rec->{cv}}, $cv;
  3         15  
229 3         11 return;
230             }
231 12         34 my $mid = $self->next_message_id();
232 12         13 print STDERR "Add $sub as pending $topic subscription (mid=$mid)\n" if DEBUG;
233 12         43 $self->{_sub_pending_by_message_id}->{$mid} = $topic;
234 12         62 $self->{_sub_pending}->{$topic} =
235             { cb => { $sub => $sub }, cv => [ $cv ], retained => {} };
236 12         34 $mid;
237             }
238              
239             sub _remove_subscription {
240 6     6   17 my ($self, $topic, $cv, $sub) = @_;
241 6         11 my $rec = $self->{_unsub_pending}->{$topic};
242 6 100       16 if ($rec) {
243 1         2 print STDERR "Remove of $topic with pending unsubscribe\n" if DEBUG;
244 1         2 push @{$rec->{cv}}, $cv;
  1         3  
245 1         3 return;
246             }
247 5         10 $rec = $self->{_sub}->{$topic};
248 5 100       15 unless ($rec) {
249 1         1 print STDERR "Remove of $topic with no subscription\n" if DEBUG;
250 1         5 $cv->send(0);
251 1         9 return;
252             }
253              
254 4 100       7 if (defined $sub) {
255 3 50       9 unless (exists $rec->{cb}->{$sub}) {
256 0         0 print STDERR "Remove of $topic for $sub with no subscription\n"
257             if DEBUG;
258 0         0 $cv->send(0);
259 0         0 return;
260             }
261 3         5 delete $rec->{cb}->{$sub};
262 3 100       5 if (keys %{$rec->{cb}}) {
  3         10  
263 2         3 print STDERR "Remove of $topic for $sub\n" if DEBUG;
264 2         5 $cv->send(1);
265 2         19 return;
266             }
267             }
268 2         4 print STDERR "Remove of $topic\n" if DEBUG;
269 2         9 my $mid = $self->next_message_id();
270 2         5 delete $self->{_sub}->{$topic};
271 2         11 $self->{_sub_topics}->delete($topic);
272 2         17 $self->{_unsub_pending_by_message_id}->{$mid} = $topic;
273 2         9 $self->{_unsub_pending}->{$topic} = { cv => [ $cv ] };
274 2         14 return $mid;
275             }
276              
277             sub _confirm_subscription {
278 13     13   93 my ($self, $mid, $qos) = @_;
279 13         38 my $topic = delete $self->{_sub_pending_by_message_id}->{$mid};
280 13 100       38 unless (defined $topic) {
281 1         151 carp 'SubAck with no pending subscription for message id: ', $mid, "\n";
282 1         33 return;
283             }
284 12         40 my $rec = $self->{_sub}->{$topic} = delete $self->{_sub_pending}->{$topic};
285 12         61 $self->{_sub_topics}->add($topic);
286 12         462 $rec->{qos} = $qos;
287              
288 12         24 foreach my $cv (@{$rec->{cv}}) {
  12         34  
289 15         65 $cv->send($qos);
290             }
291              
292             # publish any matching queued QoS messages
293 12 0 33     135 if (!$self->{clean_session} && $qos && $self->{_qos_msg_cache}) {
      33        
294 0         0 my $cache = $self->{_qos_msg_cache};
295 0         0 my $ts = Net::MQTT::TopicStore->new($topic);
296 0         0 for my $i (grep { $ts->values($cache->[$_]->topic) } reverse(0..$#$cache)) {
  0         0  
297 0         0 my $msg = delete $cache->[$i];
298 0         0 print STDERR "Processing cached message for topic '", $msg->topic, "' with subscription to topic '$topic'\n" if DEBUG;
299 0         0 $self->_process_publish($self->{handle}, $msg);
300             }
301 0 0       0 delete $self->{_qos_msg_cache} unless @$cache;
302             }
303 12         30 delete $rec->{cv};
304             }
305              
306             sub _confirm_unsubscribe {
307 3     3   18 my ($self, $mid) = @_;
308 3         8 my $topic = delete $self->{_unsub_pending_by_message_id}->{$mid};
309 3 100       8 unless (defined $topic) {
310 1         159 carp 'UnSubAck with no pending unsubscribe for message id: ', $mid, "\n";
311 1         33 return;
312             }
313 2         6 my $rec = delete $self->{_unsub_pending}->{$topic};
314 2         5 foreach my $cv (@{$rec->{cv}}) {
  2         7  
315 3         16 $cv->send(1);
316             }
317             }
318              
319             sub _send {
320 56     56   210187 my $self = shift;
321 56         209 my %p = @_;
322 56         139 my $cv = delete $p{cv};
323 56         333 my $msg = Net::MQTT::Message->new(%p);
324             $self->{connected} ?
325 56 100       1057 $self->_queue_write($msg, $cv) : $self->connect($msg, $cv);
326             }
327              
328             sub _queue_write {
329 56     56   128 my ($self, $msg, $cv) = @_;
330 56         236 my $queue = $self->{write_queue};
331 56         91 print STDERR 'Queuing: ', ($cv||'no cv'), ' ', $msg->string, "\n" if DEBUG;
332 56         101 push @{$queue}, [$msg, $cv];
  56         152  
333 56 100       240 $self->_write_now unless (defined $self->{_waiting});
334 56         277 $cv;
335             }
336              
337             sub _write_now {
338 114     114   192 my $self = shift;
339 114         175 my ($msg, $cv);
340 114         351 undef $self->{_waiting};
341 114 100       266 if (@_) {
342 16         42 ($msg, $cv) = @_;
343             } else {
344 98 100       132 my $args = shift @{$self->{write_queue}} or return;
  98         356  
345 50         136 ($msg, $cv) = @$args;
346             }
347 66         220 $self->_reset_keep_alive_timer();
348 66         1844 print STDERR "Sending: ", $msg->string, "\n" if DEBUG;
349 66 100       180 $self->{message_log_callback}->('>', $msg) if ($self->{message_log_callback});
350 66         710 $self->{_waiting} = [$msg, $cv];
351 66         89 print ' ', (unpack 'H*', $msg->bytes), "\n" if DEBUG;
352 66         465 $self->{handle}->push_write($msg->bytes);
353 66         6381 $cv;
354             }
355              
356             sub _reset_keep_alive_timer {
357 71     71   689 my ($self, $wait) = @_;
358 71         404 undef $self->{_keep_alive_handle};
359 71 100       205 my $method = $wait ? '_keep_alive_timeout' : '_send_keep_alive';
360 71         118 $self->{_keep_alive_waiting} = $wait;
361 71         111 my $weak_self = $self;
362 71         210 weaken $weak_self;
363             $self->{_keep_alive_handle} =
364             AnyEvent->timer(after => $self->{keep_alive_timer},
365             cb => subname((substr $method, 1).'_cb' =>
366 71     4   948 sub { $weak_self->$method(@_) }));
  4         701041  
367             }
368              
369             sub _send_keep_alive {
370 3     3   8 my $self = shift;
371 3         7 print STDERR "Sending: keep alive\n" if DEBUG;
372 3         17 $self->_send(message_type => MQTT_PINGREQ);
373 3         11 $self->_reset_keep_alive_timer(1);
374             }
375              
376             sub _keep_alive_timeout {
377 1     1   5 my $self = shift;
378 1         3 print STDERR "keep alive timeout\n" if DEBUG;
379 1         4 undef $self->{_keep_alive_waiting};
380 1         10 $self->{handle}->destroy;
381 1         184 $self->_error(0, 'keep alive timeout', 1);
382             }
383              
384             sub _keep_alive_received {
385 3     3   6 my $self = shift;
386 3         5 print STDERR "keep alive received\n" if DEBUG;
387 3 100       11 return unless (defined $self->{_keep_alive_waiting});
388 1         4 $self->_reset_keep_alive_timer();
389             }
390              
391              
392             sub connect {
393 30     30 1 1826 my ($self, $msg, $cv) = @_;
394 30         47 print STDERR "connect\n" if DEBUG;
395 30         58 $self->{_waiting} = 'connect';
396 30 100       212 if ($msg) {
397 21 100       269 $cv = AnyEvent->condvar unless ($cv);
398 21         106 $self->_queue_write($msg, $cv);
399             } else {
400 9 100       287 $self->{connect_cv} = AnyEvent->condvar unless (exists $self->{connect_cv});
401 9         68 $cv = $self->{connect_cv};
402             }
403 30 100       115 return $cv if ($self->{handle});
404              
405 18         46 my $weak_self = $self;
406 18         64 weaken $weak_self;
407              
408 18         31 my $hd;
409             $hd = $self->{handle} =
410             AnyEvent::Handle->new(connect => [$self->{host}, $self->{port}],
411             ($self->{tls} ? (tls => "connect") : ()),
412             on_error => subname('on_error_cb' => sub {
413 1     1   23 my ($handle, $fatal, $message) = @_;
414 1         2 print STDERR "handle error $_[1]\n" if DEBUG;
415 1         5 $handle->destroy;
416 1         79 $weak_self->_error($fatal, 'Error: '.$message, 0);
417             }),
418             on_eof => subname('on_eof_cb' => sub {
419 1     1   679 my ($handle) = @_;
420 1         2 print STDERR "handle eof\n" if DEBUG;
421 1         6 $handle->destroy;
422 1         57 $weak_self->_error(1, 'EOF', 1);
423             }),
424             on_timeout => subname('on_timeout_cb' => sub {
425 1     1   400572 $weak_self->_error(0, $weak_self->{wait}.' timeout', 1);
426 1         3 $weak_self->{wait} = 'nothing';
427             }),
428             on_connect => subname('on_connect_cb' => sub {
429 16     16   6018 my ($handle, $host, $port, $retry) = @_;
430 16         29 print STDERR "TCP handshake complete\n" if DEBUG;
431             # call user-defined on_connect function.
432 16 50       63 $weak_self->{on_connect}->($handle, $retry) if $weak_self->{on_connect};
433             my $msg =
434             Net::MQTT::Message->new(
435             message_type => MQTT_CONNECT,
436             keep_alive_timer => $weak_self->{keep_alive_timer},
437             client_id => $weak_self->{client_id},
438             clean_session => $weak_self->{clean_session},
439             will_topic => $weak_self->{will_topic},
440             will_qos => $weak_self->{will_qos},
441             will_retain => $weak_self->{will_retain},
442             will_message => $weak_self->{will_message},
443             user_name => $weak_self->{user_name},
444             password => $weak_self->{password},
445 16         163 );
446 16         415 $weak_self->_write_now($msg);
447 16         81 $handle->timeout($weak_self->{timeout});
448 16         803 $weak_self->{wait} = 'connack';
449             $handle->on_read(subname 'on_read_cb' => sub {
450 14         23755 my ($hdl) = @_;
451             $hdl->push_read(ref $weak_self =>
452             subname 'reader_cb' => sub {
453 58         191 $weak_self->_handle_message(@_);
454 58         339 1;
455 14         178 });
456 16         206 });
457             }),
458 18 50       552 @{$self->{handle_args}},
  18         202  
459             );
460 18         10590 return $cv
461             }
462              
463             sub _reconnect {
464 3     3   6 my $self = shift;
465 3         5 print STDERR "reconnecting:\n" if DEBUG;
466              
467             # must resubscribe everything
468 3 50       11 if ($self->{clean_session}) {
469 3         27 $self->{_sub_topics} = Net::MQTT::TopicStore->new();
470 3   50     68 $self->{_sub_reconnect} = delete $self->{_sub} || {};
471             }
472              
473 3         12 $self->connect(@_);
474             }
475              
476             sub _handle_message {
477 58     58   92 my $self = shift;
478 58         130 my ($handle, $msg, $error) = @_;
479 58 50       124 return $self->_error(0, $error, 1) if ($error);
480 58 100       143 $self->{message_log_callback}->('<', $msg) if ($self->{message_log_callback});
481 58 50       216 $self->_call_callback('before_msg_callback' => $msg) or return;
482 58         172 my $msg_type = lc ref $msg;
483 58         332 $msg_type =~ s/^.*:://;
484 58 50       175 $self->_call_callback('before_'.$msg_type.'_callback' => $msg) or return;
485 58         139 my $method = '_process_'.$msg_type;
486 58 100       324 unless ($self->can($method)) {
487 1         12 carp 'Unsupported message ', $msg->string(), "\n";
488 1         288 return;
489             }
490 57         194 my $res = $self->$method(@_);
491 57         221 $self->_call_callback('after_'.$msg_type.'_callback' => $msg, $res);
492 57         97 $res;
493             }
494              
495             sub _call_callback {
496 173     173   224 my $self = shift;
497 173         207 my $cb_name = shift;
498 173 50       520 return 1 unless (exists $self->{$cb_name});
499 0         0 $self->{$cb_name}->(@_);
500             }
501              
502             sub _process_connack {
503 13     13   45 my ($self, $handle, $msg, $error) = @_;
504 13         67 $handle->timeout(undef);
505 13 100       283 unless ($msg->return_code == MQTT_CONNECT_ACCEPTED) {
506 1         10 return $self->_error(1, 'Connection refused: '.$msg->string, 0);
507             }
508 12         80 print STDERR "Connection ready:\n", $msg->string(' '), "\n" if DEBUG;
509 12         49 $self->_write_now();
510 12         34 $self->{connected} = 1;
511 12 100       61 $self->{connect_cv}->send(1) if ($self->{connect_cv});
512 12         53 delete $self->{connect_cv};
513              
514 12         21 my $weak_self = $self;
515 12         52 weaken $weak_self;
516              
517             $handle->on_drain(subname 'on_drain_cb' => sub {
518 51     51   6090 print STDERR "drained\n" if DEBUG;
519 51         107 my $w = $weak_self->{_waiting};
520 51 100 100     419 $w->[1]->send(1) if (ref $w && defined $w->[1]);
521 51         1752 $weak_self->_write_now;
522 51         249 1;
523 12         148 });
524              
525             # handle reconnect
526 12         31 while (my ($topic, $rec) = each %{$self->{_sub_reconnect}}) {
  12         137  
527 0         0 print STDERR "Resubscribing to '$topic':\n" if DEBUG;
528 0         0 for my $cb (values %{$rec->{cb}}) {
  0         0  
529 0         0 $self->subscribe(topic => $topic, callback => $cb, qos => $rec->{qos});
530             }
531             }
532 12         72 delete $self->{_sub_reconnect};
533             return
534 12         100 }
535              
536             sub _process_pingresp {
537 3     3   11 shift->_keep_alive_received();
538             }
539              
540             sub _process_suback {
541 13     13   36 my ($self, $handle, $msg, $error) = @_;
542 13         19 print STDERR "Confirmed subscription:\n", $msg->string(' '), "\n" if DEBUG;
543 13         46 $self->_confirm_subscription($msg->message_id, $msg->qos_levels->[0]);
544             return
545 13         31 }
546              
547             sub _process_unsuback {
548 3     3   9 my ($self, $handle, $msg, $error) = @_;
549 3         4 print STDERR "Confirmed unsubscribe:\n", $msg->string(' '), "\n" if DEBUG;
550 3         17 $self->_confirm_unsubscribe($msg->message_id);
551             return
552 3         21 }
553              
554             sub _publish_locally {
555 16     16   37 my ($self, $msg) = @_;
556 16         54 my $msg_topic = $msg->topic;
557 16         94 my $msg_data = $msg->message;
558 16         100 my $matches = $self->{_sub_topics}->values($msg_topic);
559 16 100       379 unless (scalar @$matches) {
560 1         7 carp "Unexpected publish:\n", $msg->string(' '), "\n";
561 1         322 return;
562             }
563 15         26 my %matched;
564 15         60 my $msg_retain = $msg->retain;
565 15         64 foreach my $topic (@$matches) {
566 19         85 my $rec = $self->{_sub}->{$topic};
567 19 50       44 if ($msg_retain) {
568 0 0       0 if ($msg_data eq '') {
569 0         0 delete $rec->{retained}->{$msg_topic};
570 0         0 print STDERR " retained cleared\n" if DEBUG;
571             } else {
572 0         0 $rec->{retained}->{$msg_topic} = $msg;
573 0         0 print STDERR " retained '", $msg_data, "'\n" if DEBUG;
574             }
575             }
576 19         26 foreach my $cb (values %{$rec->{cb}}) {
  19         61  
577 27 100       222 next if ($matched{$cb}++);
578 25         85 $cb->($msg_topic, $msg_data, $msg);
579             }
580             }
581 15         227 1;
582             }
583              
584             sub _process_publish {
585 16     16   45 my ($self, $handle, $msg, $error) = @_;
586 16         43 my $qos = $msg->qos;
587              
588             # assuming this was intended for a subscription not yet restored
589 16 50 66     119 if ($qos && !$self->{clean_session} && !@{$self->{_sub_topics}->values($msg->topic)}) {
  0   33     0  
590 0         0 print STDERR "Caching message for '", $msg->topic, "'\n" if DEBUG;
591 0         0 push(@{$self->{_qos_msg_cache}}, $msg);
  0         0  
592 0         0 return;
593             }
594              
595 16 100       47 if ($qos == MQTT_QOS_EXACTLY_ONCE) {
596 1         20 my $mid = $msg->message_id;
597 1         7 $self->{messages}->{$mid} = $msg;
598 1         5 $self->_send(message_type => MQTT_PUBREC, message_id => $mid);
599 1         3 return;
600             }
601 15         49 $self->_publish_locally($msg);
602 15 100       40 $self->_send(message_type => MQTT_PUBACK, message_id => $msg->message_id)
603             if ($qos == MQTT_QOS_AT_LEAST_ONCE);
604             return
605 15         63 }
606              
607             sub _inflight_record {
608 7     7   16 my ($self, $msg) = @_;
609 7         39 my $mid = $msg->message_id;
610 7 100       39 unless (exists $self->{inflight}->{$mid}) {
611 2         11 carp "Unexpected message for message id $mid\n ".$msg->string;
612 2         430 return;
613             }
614 5         11 my $exp_type = $self->{inflight}->{$mid}->{expect};
615 5         22 my $got_type = $msg->message_type;
616 5 100       21 unless ($got_type == $exp_type) {
617 1         5 carp 'Received ', message_type_string($got_type), ' but expected ',
618             message_type_string($exp_type), " for message id $mid\n";
619 1         224 return;
620             }
621 4         15 return delete $self->{inflight}->{$mid};
622             }
623              
624             sub _process_puback {
625 3     3   9 my ($self, $handle, $msg, $error) = @_;
626 3 100       8 my $rec = $self->_inflight_record($msg) or return;
627 2         5 my $mid = $msg->message_id;
628 2         6 print STDERR 'PubAck: ', $mid, ' ', $rec->{cv}, "\n" if DEBUG;
629 2         7 $rec->{cv}->send(1);
630 2         36 return 1;
631             }
632              
633             sub _process_pubrec {
634 2     2   6 my ($self, $handle, $msg, $error) = @_;
635 2 100       9 my $rec = $self->_inflight_record($msg) or return;
636 1         3 my $mid = $msg->message_id;
637 1         4 print STDERR 'PubRec: ', $mid, ' ', $rec->{cv}, "\n" if DEBUG;
638             $self->_send_with_ack({
639             message_type => MQTT_PUBREL,
640             qos => MQTT_QOS_AT_LEAST_ONCE,
641             message_id => $mid,
642 1         5 }, $rec->{cv}, MQTT_PUBCOMP);
643             }
644              
645             sub _process_pubrel {
646 2     2   8 my ($self, $handle, $msg, $error) = @_;
647 2         12 my $mid = $msg->message_id;
648 2         7 print STDERR 'PubRel: ', $mid, "\n" if DEBUG;
649 2         8 my $pubmsg = delete $self->{messages}->{$mid};
650 2 100       12 unless ($pubmsg) {
651 1         6 carp "Unexpected message for message id $mid\n ".$msg->string;
652 1         239 return;
653             }
654 1         4 $self->_publish_locally($pubmsg);
655 1         4 $self->_send(message_type => MQTT_PUBCOMP, message_id => $mid);
656             }
657              
658             sub _process_pubcomp {
659 2     2   7 my ($self, $handle, $msg, $error) = @_;
660 2 100       9 my $rec = $self->_inflight_record($msg) or return;
661 1         3 my $mid = $msg->message_id;
662 1         3 print STDERR 'PubComp: ', $mid, ' ', $rec->{cv}, "\n" if DEBUG;
663 1         3 $rec->{cv}->send(1);
664 1         19 return 1;
665             }
666              
667              
668             sub anyevent_read_type {
669 14     14 1 274 my ($handle, $cb) = @_;
670             subname 'anyevent_read_type_reader' => sub {
671 41     41   75287 my ($handle) = @_;
672 41         97 my $rbuf = \$handle->{rbuf};
673 41         166 weaken $rbuf;
674 41 50       127 return unless (defined $$rbuf);
675 41         65 while (1) {
676 99         502 my $msg = Net::MQTT::Message->new_from_bytes($$rbuf, 1);
677 99 100       6367 last unless ($msg);
678 58         159 $cb->($handle, $msg);
679             }
680 41         94 return;
681 14         125 };
682             }
683              
684             1;
685              
686             __END__