File Coverage

blib/lib/AnyEvent/MQTT.pm
Criterion Covered Total %
statement 417 444 93.9
branch 129 148 87.1
condition 9 17 52.9
subroutine 58 58 100.0
pod 8 8 100.0
total 621 675 92.0


line stmt bran cond sub pod time code
1 15     15   229186 use strict;
  15         47  
  15         486  
2 15     15   81 use warnings;
  15         25  
  15         858  
3             package AnyEvent::MQTT;
4             $AnyEvent::MQTT::VERSION = '1.202052';
5             # ABSTRACT: AnyEvent module for an MQTT client
6              
7              
8 15     15   90 use constant DEBUG => $ENV{ANYEVENT_MQTT_DEBUG};
  15         30  
  15         1435  
9 15     15   92 use AnyEvent;
  15         44  
  15         346  
10 15     15   82 use AnyEvent::Handle;
  15         27  
  15         342  
11 15     15   74 use Net::MQTT::Constants;
  15         27  
  15         126  
12 15     15   12406 use Net::MQTT::Message;
  15         564489  
  15         592  
13 15     15   6774 use Net::MQTT::TopicStore;
  15         8062  
  15         571  
14 15     15   108 use Carp qw/croak carp/;
  15         32  
  15         1011  
15 15     15   109 use Sub::Name;
  15         33  
  15         742  
16 15     15   92 use Scalar::Util qw/weaken/;
  15         44  
  15         86321  
17              
18              
19             sub new {
20 15     15 1 8733 my ($pkg, %p) = @_;
21 15         91 my $self =
22             bless {
23             socket => undef,
24             host => '127.0.0.1',
25             port => '1883',
26             timeout => 30,
27             wait => 'nothing',
28             keep_alive_timer => 120,
29             qos => MQTT_QOS_AT_MOST_ONCE,
30             message_id => 1,
31             user_name => undef,
32             password => undef,
33             will_topic => undef,
34             will_qos => MQTT_QOS_AT_MOST_ONCE,
35             will_retain => 0,
36             will_message => '',
37             client_id => undef,
38             clean_session => 1,
39             handle_args => [],
40             write_queue => [],
41             inflight => {},
42             _sub_topics => Net::MQTT::TopicStore->new(),
43             %p,
44             }, $pkg;
45             }
46              
47             sub DESTROY {
48 15     15   126877 $_[0]->cleanup;
49             }
50              
51              
52             sub cleanup {
53 20     20 1 94 my $self = shift;
54 20         49 print STDERR "cleanup\n" if DEBUG;
55 20 100       161 if ($self->{handle}) {
56 18         859 my $cv = AnyEvent->condvar;
57 18         195 my $handle = $self->{handle};
58 18         108 weaken $handle;
59 18     11   246 $cv->cb(sub { $handle->destroy });
  11         239  
60 18         266 $self->_send(message_type => MQTT_DISCONNECT, cv => $cv);
61             }
62 20         129 delete $self->{handle};
63 20         337 delete $self->{connected};
64 20         57 delete $self->{wait};
65 20         114 delete $self->{_keep_alive_handle};
66 20         43 delete $self->{_keep_alive_waiting};
67 20         1077 $self->{write_queue} = [];
68             }
69              
70             sub _error {
71 5     5   118 my ($self, $fatal, $message, $reconnect) = @_;
72 5         29 $self->cleanup($message);
73 5 50       52 $self->{on_error}->($fatal, $message) if ($self->{on_error});
74 5 100       196 $self->_reconnect() if ($reconnect);
75             }
76              
77              
78             sub publish {
79 8     8 1 42242 my ($self, %p) = @_;
80             my $topic = exists $p{topic} ? $p{topic} :
81 8 100       108 croak ref $self, '->publish requires "topic" parameter';
82 7 100       22 my $qos = exists $p{qos} ? $p{qos} : MQTT_QOS_AT_MOST_ONCE;
83 7 100       143 my $cv = exists $p{cv} ? delete $p{cv} : AnyEvent->condvar;
84 7         36 my $expect;
85 7 100       24 if ($qos) {
86 3 100       8 $expect = ($qos == MQTT_QOS_AT_LEAST_ONCE ? MQTT_PUBACK : MQTT_PUBREC);
87             }
88 7         14 my $message = $p{message};
89 7 100       21 if (defined $message) {
90 4         7 print STDERR "publish: message[$message] => $topic\n" if DEBUG;
91 4         23 $self->_send_with_ack({
92             message_type => MQTT_PUBLISH,
93             %p,
94             }, $cv, $expect);
95 4         19 return $cv;
96             }
97             my $handle = exists $p{handle} ? $p{handle} :
98 3 100       83 croak ref $self, '->publish requires "message" or "handle" parameter';
99 2 100       27 unless ($handle->isa('AnyEvent::Handle')) {
100 1 50       26 my @args = @{$p{handle_args}||[]};
  1         7  
101 1         2 print STDERR "publish: IO[$handle] => $topic @args\n" if DEBUG;
102 1         7 $handle = AnyEvent::Handle->new(fh => $handle, @args);
103             }
104 2         106 my $error_sub = $handle->{on_error}; # Hack: There is no accessor api
105             $handle->on_error(subname 'on_error_for_read_publish_'.$topic =>
106             sub {
107 2     2   350 my ($hdl, $fatal, $msg) = @_;
108 2 50       10 $error_sub->(@_) if ($error_sub);
109 2         154 $hdl->destroy;
110 2         9 undef $hdl;
111 2         5 $cv->send(1);
112 2         34 });
113 2         9 my $weak_self = $self;
114 2         6 weaken $weak_self;
115 2 100       4 my @push_read_args = @{$p{push_read_args}||['line']};
  2         11  
116 2         5 my $sub; $sub = subname 'push_read_cb_for_'.$topic => sub {
117 2     2   915 my ($hdl, $chunk, @args) = @_;
118 2         4 print STDERR "publish: $chunk => $topic\n" if DEBUG;
119 2         53 my $send_cv = AnyEvent->condvar;
120 2         13 print STDERR "publish: message[$chunk] => $topic\n" if DEBUG;
121             $weak_self->_send_with_ack({
122             message_type => MQTT_PUBLISH,
123             qos => $qos,
124             retain => $p{retain},
125 2         14 topic => $topic,
126             message => $chunk,
127             }, $send_cv, $expect);
128             $send_cv->cb(subname 'publish_ack_'.$topic =>
129 2         32 sub { $handle->push_read(@push_read_args => $sub ) });
  2         20  
130 2         69 return;
131 2         16 };
132 2         8 $handle->push_read(@push_read_args => $sub);
133 2         231 return $cv;
134             }
135              
136              
137             sub next_message_id {
138 70018     70018 1 259157 my $self = shift;
139 70018         88772 my $res = $self->{message_id};
140 70018         81016 $self->{message_id}++;
141 70018 100       112898 $self->{message_id} = 1 if $self->{message_id} >= 65536;
142 70018         101746 $res;
143             }
144              
145             sub _send_with_ack {
146 8     8   22 my ($self, $args, $cv, $expect, $dup) = @_;
147 8 100       21 if ($args->{qos}) {
148 5 100       14 unless (exists $args->{message_id}) {
149 3         7 $args->{message_id} = $self->next_message_id();
150             }
151 5         11 my $mid = $args->{message_id};
152 5         161 my $send_cv = AnyEvent->condvar;
153             $send_cv->cb(subname 'ack_cb_for_'.$mid => sub {
154             $self->{inflight}->{$mid} =
155             {
156             expect => $expect,
157             message => $args,
158             cv => $cv,
159             timeout =>
160             AnyEvent->timer(after => $self->{keep_alive_timer},
161             cb => subname 'ack_timeout_for_'.$mid =>
162             sub {
163 1         47 print ref $self, " timeout waiting for ",
164             message_type_string($expect), "\n" if DEBUG;
165 1         8 delete $self->{inflight}->{$mid};
166 1         10 $self->_send_with_ack($args, $cv, $expect, 1);
167 5     5   118 }),
168             };
169 5         105 });
170 5         60 $args->{cv} = $send_cv;
171             } else {
172 3         6 $args->{cv} = $cv;
173             }
174 8 100       22 $args->{dup} = 1 if ($dup);
175 8         40 return $self->_send(%$args);
176             }
177              
178              
179             sub subscribe {
180 19     19 1 9547 my ($self, %p) = @_;
181             my $topic = exists $p{topic} ? $p{topic} :
182 19 100       263 croak ref $self, '->subscribe requires "topic" parameter';
183             my $sub = exists $p{callback} ? $p{callback} :
184 18 100       174 croak ref $self, '->subscribe requires "callback" parameter';
185 17 100       42 my $qos = exists $p{qos} ? $p{qos} : MQTT_QOS_AT_MOST_ONCE;
186 17 100       371 my $cv = exists $p{cv} ? delete $p{cv} : AnyEvent->condvar;
187 17         114 my $mid = $self->_add_subscription($topic, $cv, $sub);
188 17 100       55 if (defined $mid) { # not already subscribed/subscribing
189 12         41 $self->_send(message_type => MQTT_SUBSCRIBE,
190             message_id => $mid,
191             topics => [[$topic, $qos]]);
192             }
193             $cv
194 17         66 }
195              
196              
197             sub unsubscribe {
198 7     7 1 8279 my ($self, %p) = @_;
199             my $topic = exists $p{topic} ? $p{topic} :
200 7 100       107 croak ref $self, '->unsubscribe requires "topic" parameter';
201 6 100       142 my $cv = exists $p{cv} ? delete $p{cv} : AnyEvent->condvar;
202 6         46 my $mid = $self->_remove_subscription($topic, $cv, $p{callback});
203 6 100       20 if (defined $mid) { # not already subscribed/subscribing
204 2         9 $self->_send(message_type => MQTT_UNSUBSCRIBE,
205             message_id => $mid,
206             topics => [$topic]);
207             }
208             $cv
209 6         30 }
210              
211             sub _add_subscription {
212 17     17   45 my ($self, $topic, $cv, $sub) = @_;
213 17         40 my $rec = $self->{_sub}->{$topic};
214 17 100       52 if ($rec) {
215 2         6 print STDERR "Add $sub to existing $topic subscription\n" if DEBUG;
216 2         17 $rec->{cb}->{$sub} = $sub;
217 2         13 $cv->send($rec->{qos});
218 2         30 foreach my $msg (values %{$rec->{retained}}) {
  2         10  
219 0         0 $sub->($msg->topic, $msg->message, $msg);
220             }
221 2         8 return;
222             }
223 15         32 $rec = $self->{_sub_pending}->{$topic};
224 15 100       36 if ($rec) {
225 3         8 print STDERR "Add $sub to existing pending $topic subscription\n" if DEBUG;
226 3         8 $rec->{cb}->{$sub} = $sub;
227 3         8 push @{$rec->{cv}}, $cv;
  3         9  
228 3         8 return;
229             }
230 12         36 my $mid = $self->next_message_id();
231 12         18 print STDERR "Add $sub as pending $topic subscription (mid=$mid)\n" if DEBUG;
232 12         36 $self->{_sub_pending_by_message_id}->{$mid} = $topic;
233 12         66 $self->{_sub_pending}->{$topic} =
234             { cb => { $sub => $sub }, cv => [ $cv ], retained => {} };
235 12         36 $mid;
236             }
237              
238             sub _remove_subscription {
239 6     6   20 my ($self, $topic, $cv, $sub) = @_;
240 6         18 my $rec = $self->{_unsub_pending}->{$topic};
241 6 100       20 if ($rec) {
242 1         3 print STDERR "Remove of $topic with pending unsubscribe\n" if DEBUG;
243 1         2 push @{$rec->{cv}}, $cv;
  1         5  
244 1         3 return;
245             }
246 5         13 $rec = $self->{_sub}->{$topic};
247 5 100       16 unless ($rec) {
248 1         2 print STDERR "Remove of $topic with no subscription\n" if DEBUG;
249 1         5 $cv->send(0);
250 1         11 return;
251             }
252              
253 4 100       15 if (defined $sub) {
254 3 50       11 unless (exists $rec->{cb}->{$sub}) {
255 0         0 print STDERR "Remove of $topic for $sub with no subscription\n"
256             if DEBUG;
257 0         0 $cv->send(0);
258 0         0 return;
259             }
260 3         10 delete $rec->{cb}->{$sub};
261 3 100       6 if (keys %{$rec->{cb}}) {
  3         13  
262 2         5 print STDERR "Remove of $topic for $sub\n" if DEBUG;
263 2         11 $cv->send(1);
264 2         32 return;
265             }
266             }
267 2         4 print STDERR "Remove of $topic\n" if DEBUG;
268 2         11 my $mid = $self->next_message_id();
269 2         7 delete $self->{_sub}->{$topic};
270 2         12 $self->{_sub_topics}->delete($topic);
271 2         17 $self->{_unsub_pending_by_message_id}->{$mid} = $topic;
272 2         7 $self->{_unsub_pending}->{$topic} = { cv => [ $cv ] };
273 2         16 return $mid;
274             }
275              
276             sub _confirm_subscription {
277 13     13   104 my ($self, $mid, $qos) = @_;
278 13         45 my $topic = delete $self->{_sub_pending_by_message_id}->{$mid};
279 13 100       42 unless (defined $topic) {
280 1         166 carp 'SubAck with no pending subscription for message id: ', $mid, "\n";
281 1         38 return;
282             }
283 12         43 my $rec = $self->{_sub}->{$topic} = delete $self->{_sub_pending}->{$topic};
284 12         58 $self->{_sub_topics}->add($topic);
285 12         464 $rec->{qos} = $qos;
286              
287 12         23 foreach my $cv (@{$rec->{cv}}) {
  12         34  
288 15         73 $cv->send($qos);
289             }
290              
291             # publish any matching queued QoS messages
292 12 0 33     154 if (!$self->{clean_session} && $qos && $self->{_qos_msg_cache}) {
      33        
293 0         0 my $cache = $self->{_qos_msg_cache};
294 0         0 my $ts = Net::MQTT::TopicStore->new($topic);
295 0         0 for my $i (grep { $ts->values($cache->[$_]->topic) } reverse(0..$#$cache)) {
  0         0  
296 0         0 my $msg = delete $cache->[$i];
297 0         0 print STDERR "Processing cached message for topic '", $msg->topic, "' with subscription to topic '$topic'\n" if DEBUG;
298 0         0 $self->_process_publish($self->{handle}, $msg);
299             }
300 0 0       0 delete $self->{_qos_msg_cache} unless @$cache;
301             }
302 12         35 delete $rec->{cv};
303             }
304              
305             sub _confirm_unsubscribe {
306 3     3   18 my ($self, $mid) = @_;
307 3         11 my $topic = delete $self->{_unsub_pending_by_message_id}->{$mid};
308 3 100       10 unless (defined $topic) {
309 1         135 carp 'UnSubAck with no pending unsubscribe for message id: ', $mid, "\n";
310 1         32 return;
311             }
312 2         7 my $rec = delete $self->{_unsub_pending}->{$topic};
313 2         5 foreach my $cv (@{$rec->{cv}}) {
  2         7  
314 3         18 $cv->send(1);
315             }
316             }
317              
318             sub _send {
319 56     56   211775 my $self = shift;
320 56         270 my %p = @_;
321 56         165 my $cv = delete $p{cv};
322 56         388 my $msg = Net::MQTT::Message->new(%p);
323             $self->{connected} ?
324 56 100       1204 $self->_queue_write($msg, $cv) : $self->connect($msg, $cv);
325             }
326              
327             sub _queue_write {
328 56     56   179 my ($self, $msg, $cv) = @_;
329 56         163 my $queue = $self->{write_queue};
330 56         103 print STDERR 'Queuing: ', ($cv||'no cv'), ' ', $msg->string, "\n" if DEBUG;
331 56         109 push @{$queue}, [$msg, $cv];
  56         193  
332 56 100       289 $self->_write_now unless (defined $self->{_waiting});
333 56         515 $cv;
334             }
335              
336             sub _write_now {
337 114     114   228 my $self = shift;
338 114         209 my ($msg, $cv);
339 114         402 undef $self->{_waiting};
340 114 100       335 if (@_) {
341 16         42 ($msg, $cv) = @_;
342             } else {
343 98 100       166 my $args = shift @{$self->{write_queue}} or return;
  98         369  
344 50         173 ($msg, $cv) = @$args;
345             }
346 66         226 $self->_reset_keep_alive_timer();
347 66         2168 print STDERR "Sending: ", $msg->string, "\n" if DEBUG;
348 66 100       213 $self->{message_log_callback}->('>', $msg) if ($self->{message_log_callback});
349 66         790 $self->{_waiting} = [$msg, $cv];
350 66         117 print ' ', (unpack 'H*', $msg->bytes), "\n" if DEBUG;
351 66         428 $self->{handle}->push_write($msg->bytes);
352 66         6849 $cv;
353             }
354              
355             sub _reset_keep_alive_timer {
356 71     71   800 my ($self, $wait) = @_;
357 71         379 undef $self->{_keep_alive_handle};
358 71 100       238 my $method = $wait ? '_keep_alive_timeout' : '_send_keep_alive';
359 71         158 $self->{_keep_alive_waiting} = $wait;
360 71         130 my $weak_self = $self;
361 71         268 weaken $weak_self;
362             $self->{_keep_alive_handle} =
363             AnyEvent->timer(after => $self->{keep_alive_timer},
364             cb => subname((substr $method, 1).'_cb' =>
365 71     4   1153 sub { $weak_self->$method(@_) }));
  4         701381  
366             }
367              
368             sub _send_keep_alive {
369 3     3   13 my $self = shift;
370 3         9 print STDERR "Sending: keep alive\n" if DEBUG;
371 3         36 $self->_send(message_type => MQTT_PINGREQ);
372 3         13 $self->_reset_keep_alive_timer(1);
373             }
374              
375             sub _keep_alive_timeout {
376 1     1   7 my $self = shift;
377 1         3 print STDERR "keep alive timeout\n" if DEBUG;
378 1         6 undef $self->{_keep_alive_waiting};
379 1         13 $self->{handle}->destroy;
380 1         290 $self->_error(0, 'keep alive timeout', 1);
381             }
382              
383             sub _keep_alive_received {
384 3     3   11 my $self = shift;
385 3         8 print STDERR "keep alive received\n" if DEBUG;
386 3 100       18 return unless (defined $self->{_keep_alive_waiting});
387 1         4 $self->_reset_keep_alive_timer();
388             }
389              
390              
391             sub connect {
392 30     30 1 2052 my ($self, $msg, $cv) = @_;
393 30         55 print STDERR "connect\n" if DEBUG;
394 30         90 $self->{_waiting} = 'connect';
395 30 100       242 if ($msg) {
396 21 100       319 $cv = AnyEvent->condvar unless ($cv);
397 21         111 $self->_queue_write($msg, $cv);
398             } else {
399 9 100       288 $self->{connect_cv} = AnyEvent->condvar unless (exists $self->{connect_cv});
400 9         76 $cv = $self->{connect_cv};
401             }
402 30 100       158 return $cv if ($self->{handle});
403              
404 18         53 my $weak_self = $self;
405 18         76 weaken $weak_self;
406              
407 18         33 my $hd;
408             $hd = $self->{handle} =
409             AnyEvent::Handle->new(connect => [$self->{host}, $self->{port}],
410             on_error => subname('on_error_cb' => sub {
411 1     1   24 my ($handle, $fatal, $message) = @_;
412 1         2 print STDERR "handle error $_[1]\n" if DEBUG;
413 1         5 $handle->destroy;
414 1         93 $weak_self->_error($fatal, 'Error: '.$message, 0);
415             }),
416             on_eof => subname('on_eof_cb' => sub {
417 1     1   799 my ($handle) = @_;
418 1         2 print STDERR "handle eof\n" if DEBUG;
419 1         4 $handle->destroy;
420 1         61 $weak_self->_error(1, 'EOF', 1);
421             }),
422             on_timeout => subname('on_timeout_cb' => sub {
423 1     1   401050 $weak_self->_error(0, $weak_self->{wait}.' timeout', 1);
424 1         6 $weak_self->{wait} = 'nothing';
425             }),
426             on_connect => subname('on_connect_cb' => sub {
427 16     16   6481 my ($handle, $host, $port, $retry) = @_;
428 16         30 print STDERR "TCP handshake complete\n" if DEBUG;
429             # call user-defined on_connect function.
430 16 50       62 $weak_self->{on_connect}->($handle, $retry) if $weak_self->{on_connect};
431             my $msg =
432             Net::MQTT::Message->new(
433             message_type => MQTT_CONNECT,
434             keep_alive_timer => $weak_self->{keep_alive_timer},
435             client_id => $weak_self->{client_id},
436             clean_session => $weak_self->{clean_session},
437             will_topic => $weak_self->{will_topic},
438             will_qos => $weak_self->{will_qos},
439             will_retain => $weak_self->{will_retain},
440             will_message => $weak_self->{will_message},
441             user_name => $weak_self->{user_name},
442             password => $weak_self->{password},
443 16         134 );
444 16         465 $weak_self->_write_now($msg);
445 16         108 $handle->timeout($weak_self->{timeout});
446 16         958 $weak_self->{wait} = 'connack';
447             $handle->on_read(subname 'on_read_cb' => sub {
448 14         26192 my ($hdl) = @_;
449             $hdl->push_read(ref $weak_self =>
450             subname 'reader_cb' => sub {
451 58         266 $weak_self->_handle_message(@_);
452 58         405 1;
453 14         176 });
454 16         301 });
455             }),
456 18         501 @{$self->{handle_args}},
  18         226  
457             );
458 18         12901 return $cv
459             }
460              
461             sub _reconnect {
462 3     3   12 my $self = shift;
463 3         7 print STDERR "reconnecting:\n" if DEBUG;
464              
465             # must resubscribe everything
466 3 50       18 if ($self->{clean_session}) {
467 3         45 $self->{_sub_topics} = Net::MQTT::TopicStore->new();
468 3   50     137 $self->{_sub_reconnect} = delete $self->{_sub} || {};
469             }
470              
471 3         20 $self->connect(@_);
472             }
473              
474             sub _handle_message {
475 58     58   107 my $self = shift;
476 58         143 my ($handle, $msg, $error) = @_;
477 58 50       164 return $self->_error(0, $error, 1) if ($error);
478 58 100       182 $self->{message_log_callback}->('<', $msg) if ($self->{message_log_callback});
479 58 50       230 $self->_call_callback('before_msg_callback' => $msg) or return;
480 58         220 my $msg_type = lc ref $msg;
481 58         460 $msg_type =~ s/^.*:://;
482 58 50       223 $self->_call_callback('before_'.$msg_type.'_callback' => $msg) or return;
483 58         147 my $method = '_process_'.$msg_type;
484 58 100       392 unless ($self->can($method)) {
485 1         13 carp 'Unsupported message ', $msg->string(), "\n";
486 1         378 return;
487             }
488 57         224 my $res = $self->$method(@_);
489 57         330 $self->_call_callback('after_'.$msg_type.'_callback' => $msg, $res);
490 57         115 $res;
491             }
492              
493             sub _call_callback {
494 173     173   293 my $self = shift;
495 173         263 my $cb_name = shift;
496 173 50       627 return 1 unless (exists $self->{$cb_name});
497 0         0 $self->{$cb_name}->(@_);
498             }
499              
500             sub _process_connack {
501 13     13   42 my ($self, $handle, $msg, $error) = @_;
502 13         76 $handle->timeout(undef);
503 13 100       291 unless ($msg->return_code == MQTT_CONNECT_ACCEPTED) {
504 1         11 return $self->_error(1, 'Connection refused: '.$msg->string, 0);
505             }
506 12         75 print STDERR "Connection ready:\n", $msg->string(' '), "\n" if DEBUG;
507 12         38 $self->_write_now();
508 12         32 $self->{connected} = 1;
509 12 100       69 $self->{connect_cv}->send(1) if ($self->{connect_cv});
510 12         50 delete $self->{connect_cv};
511              
512 12         23 my $weak_self = $self;
513 12         45 weaken $weak_self;
514              
515             $handle->on_drain(subname 'on_drain_cb' => sub {
516 51     51   7723 print STDERR "drained\n" if DEBUG;
517 51         136 my $w = $weak_self->{_waiting};
518 51 100 100     506 $w->[1]->send(1) if (ref $w && defined $w->[1]);
519 51         1996 $weak_self->_write_now;
520 51         283 1;
521 12         145 });
522              
523             # handle reconnect
524 12         27 while (my ($topic, $rec) = each %{$self->{_sub_reconnect}}) {
  12         157  
525 0         0 print STDERR "Resubscribing to '$topic':\n" if DEBUG;
526 0         0 for my $cb (values %{$rec->{cb}}) {
  0         0  
527 0         0 $self->subscribe(topic => $topic, callback => $cb, qos => $rec->{qos});
528             }
529             }
530 12         79 delete $self->{_sub_reconnect};
531             return
532 12         102 }
533              
534             sub _process_pingresp {
535 3     3   13 shift->_keep_alive_received();
536             }
537              
538             sub _process_suback {
539 13     13   37 my ($self, $handle, $msg, $error) = @_;
540 13         21 print STDERR "Confirmed subscription:\n", $msg->string(' '), "\n" if DEBUG;
541 13         58 $self->_confirm_subscription($msg->message_id, $msg->qos_levels->[0]);
542             return
543 13         31 }
544              
545             sub _process_unsuback {
546 3     3   14 my ($self, $handle, $msg, $error) = @_;
547 3         5 print STDERR "Confirmed unsubscribe:\n", $msg->string(' '), "\n" if DEBUG;
548 3         19 $self->_confirm_unsubscribe($msg->message_id);
549             return
550 3         28 }
551              
552             sub _publish_locally {
553 16     16   44 my ($self, $msg) = @_;
554 16         64 my $msg_topic = $msg->topic;
555 16         112 my $msg_data = $msg->message;
556 16         126 my $matches = $self->{_sub_topics}->values($msg_topic);
557 16 100       468 unless (scalar @$matches) {
558 1         10 carp "Unexpected publish:\n", $msg->string(' '), "\n";
559 1         396 return;
560             }
561 15         48 my %matched;
562 15         134 my $msg_retain = $msg->retain;
563 15         94 foreach my $topic (@$matches) {
564 19         108 my $rec = $self->{_sub}->{$topic};
565 19 50       55 if ($msg_retain) {
566 0 0       0 if ($msg_data eq '') {
567 0         0 delete $rec->{retained}->{$msg_topic};
568 0         0 print STDERR " retained cleared\n" if DEBUG;
569             } else {
570 0         0 $rec->{retained}->{$msg_topic} = $msg;
571 0         0 print STDERR " retained '", $msg_data, "'\n" if DEBUG;
572             }
573             }
574 19         70 foreach my $cb (values %{$rec->{cb}}) {
  19         75  
575 27 100       372 next if ($matched{$cb}++);
576 25         96 $cb->($msg_topic, $msg_data, $msg);
577             }
578             }
579 15         329 1;
580             }
581              
582             sub _process_publish {
583 16     16   61 my ($self, $handle, $msg, $error) = @_;
584 16         64 my $qos = $msg->qos;
585              
586             # assuming this was intended for a subscription not yet restored
587 16 50 66     202 if ($qos && !$self->{clean_session} && !@{$self->{_sub_topics}->values($msg->topic)}) {
  0   33     0  
588 0         0 print STDERR "Caching message for '", $msg->topic, "'\n" if DEBUG;
589 0         0 push(@{$self->{_qos_msg_cache}}, $msg);
  0         0  
590 0         0 return;
591             }
592              
593 16 100       62 if ($qos == MQTT_QOS_EXACTLY_ONCE) {
594 1         10 my $mid = $msg->message_id;
595 1         11 $self->{messages}->{$mid} = $msg;
596 1         7 $self->_send(message_type => MQTT_PUBREC, message_id => $mid);
597 1         6 return;
598             }
599 15         53 $self->_publish_locally($msg);
600 15 100       84 $self->_send(message_type => MQTT_PUBACK, message_id => $msg->message_id)
601             if ($qos == MQTT_QOS_AT_LEAST_ONCE);
602             return
603 15         37 }
604              
605             sub _inflight_record {
606 7     7   19 my ($self, $msg) = @_;
607 7         50 my $mid = $msg->message_id;
608 7 100       49 unless (exists $self->{inflight}->{$mid}) {
609 2         14 carp "Unexpected message for message id $mid\n ".$msg->string;
610 2         505 return;
611             }
612 5         11 my $exp_type = $self->{inflight}->{$mid}->{expect};
613 5         23 my $got_type = $msg->message_type;
614 5 100       40 unless ($got_type == $exp_type) {
615 1         7 carp 'Received ', message_type_string($got_type), ' but expected ',
616             message_type_string($exp_type), " for message id $mid\n";
617 1         488 return;
618             }
619 4         17 return delete $self->{inflight}->{$mid};
620             }
621              
622             sub _process_puback {
623 3     3   14 my ($self, $handle, $msg, $error) = @_;
624 3 100       10 my $rec = $self->_inflight_record($msg) or return;
625 2         7 my $mid = $msg->message_id;
626 2         8 print STDERR 'PubAck: ', $mid, ' ', $rec->{cv}, "\n" if DEBUG;
627 2         9 $rec->{cv}->send(1);
628 2         45 return 1;
629             }
630              
631             sub _process_pubrec {
632 2     2   19 my ($self, $handle, $msg, $error) = @_;
633 2 100       9 my $rec = $self->_inflight_record($msg) or return;
634 1         3 my $mid = $msg->message_id;
635 1         3 print STDERR 'PubRec: ', $mid, ' ', $rec->{cv}, "\n" if DEBUG;
636             $self->_send_with_ack({
637             message_type => MQTT_PUBREL,
638             qos => MQTT_QOS_AT_LEAST_ONCE,
639             message_id => $mid,
640 1         4 }, $rec->{cv}, MQTT_PUBCOMP);
641             }
642              
643             sub _process_pubrel {
644 2     2   10 my ($self, $handle, $msg, $error) = @_;
645 2         20 my $mid = $msg->message_id;
646 2         13 print STDERR 'PubRel: ', $mid, "\n" if DEBUG;
647 2         10 my $pubmsg = delete $self->{messages}->{$mid};
648 2 100       15 unless ($pubmsg) {
649 1         9 carp "Unexpected message for message id $mid\n ".$msg->string;
650 1         338 return;
651             }
652 1         6 $self->_publish_locally($pubmsg);
653 1         6 $self->_send(message_type => MQTT_PUBCOMP, message_id => $mid);
654             }
655              
656             sub _process_pubcomp {
657 2     2   6 my ($self, $handle, $msg, $error) = @_;
658 2 100       8 my $rec = $self->_inflight_record($msg) or return;
659 1         3 my $mid = $msg->message_id;
660 1         4 print STDERR 'PubComp: ', $mid, ' ', $rec->{cv}, "\n" if DEBUG;
661 1         4 $rec->{cv}->send(1);
662 1         18 return 1;
663             }
664              
665              
666             sub anyevent_read_type {
667 14     14 1 317 my ($handle, $cb) = @_;
668             subname 'anyevent_read_type_reader' => sub {
669 51     51   338066 my ($handle) = @_;
670 51         131 my $rbuf = \$handle->{rbuf};
671 51         246 weaken $rbuf;
672 51 50       176 return unless (defined $$rbuf);
673 51         457 while (1) {
674 109         486 my $msg = Net::MQTT::Message->new_from_bytes($$rbuf, 1);
675 109 100       8084 last unless ($msg);
676 58         176 $cb->($handle, $msg);
677             }
678 51         161 return;
679 14         117 };
680             }
681              
682             1;
683              
684             __END__