File Coverage

blib/lib/AnyEvent/MQTT.pm
Criterion Covered Total %
statement 410 420 97.6
branch 127 140 90.7
condition 3 3 100.0
subroutine 58 58 100.0
pod 8 8 100.0
total 606 629 96.3


line stmt bran cond sub pod time code
1 15     15   229025 use strict;
  15         35  
  15         469  
2 15     15   85 use warnings;
  15         49  
  15         814  
3             package AnyEvent::MQTT;
4             $AnyEvent::MQTT::VERSION = '1.202051';
5             # ABSTRACT: AnyEvent module for an MQTT client
6              
7              
8 15     15   82 use constant DEBUG => $ENV{ANYEVENT_MQTT_DEBUG};
  15         28  
  15         1156  
9 15     15   90 use AnyEvent;
  15         27  
  15         343  
10 15     15   88 use AnyEvent::Handle;
  15         29  
  15         343  
11 15     15   71 use Net::MQTT::Constants;
  15         28  
  15         120  
12 15     15   12627 use Net::MQTT::Message;
  15         565893  
  15         600  
13 15     15   6932 use Net::MQTT::TopicStore;
  15         8291  
  15         547  
14 15     15   101 use Carp qw/croak carp/;
  15         36  
  15         944  
15 15     15   140 use Sub::Name;
  15         64  
  15         704  
16 15     15   95 use Scalar::Util qw/weaken/;
  15         32  
  15         82700  
17              
18              
19             sub new {
20 15     15 1 8227 my ($pkg, %p) = @_;
21 15         88 my $self =
22             bless {
23             socket => undef,
24             host => '127.0.0.1',
25             port => '1883',
26             timeout => 30,
27             wait => 'nothing',
28             keep_alive_timer => 120,
29             qos => MQTT_QOS_AT_MOST_ONCE,
30             message_id => 1,
31             user_name => undef,
32             password => undef,
33             will_topic => undef,
34             will_qos => MQTT_QOS_AT_MOST_ONCE,
35             will_retain => 0,
36             will_message => '',
37             client_id => undef,
38             clean_session => 1,
39             handle_args => [],
40             write_queue => [],
41             inflight => {},
42             _sub_topics => Net::MQTT::TopicStore->new(),
43             %p,
44             }, $pkg;
45             }
46              
47             sub DESTROY {
48 15     15   123814 $_[0]->cleanup;
49             }
50              
51              
52             sub cleanup {
53 20     20 1 61 my $self = shift;
54 20         39 print STDERR "cleanup\n" if DEBUG;
55 20 100       125 if ($self->{handle}) {
56 18         747 my $cv = AnyEvent->condvar;
57 18         180 my $handle = $self->{handle};
58 18         200 weaken $handle;
59 18     11   329 $cv->cb(sub { $handle->destroy });
  11         191  
60 18         252 $self->_send(message_type => MQTT_DISCONNECT, cv => $cv);
61             }
62 20         185 delete $self->{handle};
63 20         281 delete $self->{connected};
64 20         53 delete $self->{wait};
65 20         110 delete $self->{_keep_alive_handle};
66 20         50 delete $self->{_keep_alive_waiting};
67 20         868 $self->{write_queue} = [];
68             }
69              
70             sub _error {
71 5     5   124 my ($self, $fatal, $message, $reconnect) = @_;
72 5         30 $self->cleanup($message);
73 5 50       79 $self->{on_error}->($fatal, $message) if ($self->{on_error});
74 5 100       174 $self->_reconnect() if ($reconnect);
75             }
76              
77              
78             sub publish {
79 8     8 1 41127 my ($self, %p) = @_;
80             my $topic = exists $p{topic} ? $p{topic} :
81 8 100       109 croak ref $self, '->publish requires "topic" parameter';
82 7 100       21 my $qos = exists $p{qos} ? $p{qos} : MQTT_QOS_AT_MOST_ONCE;
83 7 100       173 my $cv = exists $p{cv} ? delete $p{cv} : AnyEvent->condvar;
84 7         40 my $expect;
85 7 100       23 if ($qos) {
86 3 100       7 $expect = ($qos == MQTT_QOS_AT_LEAST_ONCE ? MQTT_PUBACK : MQTT_PUBREC);
87             }
88 7         15 my $message = $p{message};
89 7 100       22 if (defined $message) {
90 4         7 print STDERR "publish: message[$message] => $topic\n" if DEBUG;
91 4         21 $self->_send_with_ack({
92             message_type => MQTT_PUBLISH,
93             %p,
94             }, $cv, $expect);
95 4         23 return $cv;
96             }
97             my $handle = exists $p{handle} ? $p{handle} :
98 3 100       82 croak ref $self, '->publish requires "message" or "handle" parameter';
99 2 100       35 unless ($handle->isa('AnyEvent::Handle')) {
100 1 50       2 my @args = @{$p{handle_args}||[]};
  1         7  
101 1         2 print STDERR "publish: IO[$handle] => $topic @args\n" if DEBUG;
102 1         19 $handle = AnyEvent::Handle->new(fh => $handle, @args);
103             }
104 2         94 my $error_sub = $handle->{on_error}; # Hack: There is no accessor api
105             $handle->on_error(subname 'on_error_for_read_publish_'.$topic =>
106             sub {
107 2     2   519 my ($hdl, $fatal, $msg) = @_;
108 2 50       14 $error_sub->(@_) if ($error_sub);
109 2         189 $hdl->destroy;
110 2         9 undef $hdl;
111 2         7 $cv->send(1);
112 2         38 });
113 2         12 my $weak_self = $self;
114 2         28 weaken $weak_self;
115 2 100       7 my @push_read_args = @{$p{push_read_args}||['line']};
  2         15  
116 2         6 my $sub; $sub = subname 'push_read_cb_for_'.$topic => sub {
117 2     2   1378 my ($hdl, $chunk, @args) = @_;
118 2         5 print STDERR "publish: $chunk => $topic\n" if DEBUG;
119 2         68 my $send_cv = AnyEvent->condvar;
120 2         17 print STDERR "publish: message[$chunk] => $topic\n" if DEBUG;
121             $weak_self->_send_with_ack({
122             message_type => MQTT_PUBLISH,
123             qos => $qos,
124             retain => $p{retain},
125 2         16 topic => $topic,
126             message => $chunk,
127             }, $send_cv, $expect);
128             $send_cv->cb(subname 'publish_ack_'.$topic =>
129 2         41 sub { $handle->push_read(@push_read_args => $sub ) });
  2         39  
130 2         110 return;
131 2         22 };
132 2         13 $handle->push_read(@push_read_args => $sub);
133 2         341 return $cv;
134             }
135              
136              
137             sub next_message_id {
138 70018     70018 1 258190 my $self = shift;
139 70018         91426 my $res = $self->{message_id};
140 70018         83471 $self->{message_id}++;
141 70018 100       113059 $self->{message_id} = 1 if $self->{message_id} >= 65536;
142 70018         102011 $res;
143             }
144              
145             sub _send_with_ack {
146 8     8   26 my ($self, $args, $cv, $expect, $dup) = @_;
147 8 100       24 if ($args->{qos}) {
148 5 100       17 unless (exists $args->{message_id}) {
149 3         7 $args->{message_id} = $self->next_message_id();
150             }
151 5         12 my $mid = $args->{message_id};
152 5         154 my $send_cv = AnyEvent->condvar;
153             $send_cv->cb(subname 'ack_cb_for_'.$mid => sub {
154             $self->{inflight}->{$mid} =
155             {
156             expect => $expect,
157             message => $args,
158             cv => $cv,
159             timeout =>
160             AnyEvent->timer(after => $self->{keep_alive_timer},
161             cb => subname 'ack_timeout_for_'.$mid =>
162             sub {
163 1         53 print ref $self, " timeout waiting for ",
164             message_type_string($expect), "\n" if DEBUG;
165 1         12 delete $self->{inflight}->{$mid};
166 1         6 $self->_send_with_ack($args, $cv, $expect, 1);
167 5     5   116 }),
168             };
169 5         107 });
170 5         60 $args->{cv} = $send_cv;
171             } else {
172 3         8 $args->{cv} = $cv;
173             }
174 8 100       25 $args->{dup} = 1 if ($dup);
175 8         41 return $self->_send(%$args);
176             }
177              
178              
179             sub subscribe {
180 19     19 1 10155 my ($self, %p) = @_;
181             my $topic = exists $p{topic} ? $p{topic} :
182 19 100       254 croak ref $self, '->subscribe requires "topic" parameter';
183             my $sub = exists $p{callback} ? $p{callback} :
184 18 100       124 croak ref $self, '->subscribe requires "callback" parameter';
185 17 100       53 my $qos = exists $p{qos} ? $p{qos} : MQTT_QOS_AT_MOST_ONCE;
186 17 100       401 my $cv = exists $p{cv} ? delete $p{cv} : AnyEvent->condvar;
187 17         119 my $mid = $self->_add_subscription($topic, $cv, $sub);
188 17 100       56 if (defined $mid) { # not already subscribed/subscribing
189 12         42 $self->_send(message_type => MQTT_SUBSCRIBE,
190             message_id => $mid,
191             topics => [[$topic, $qos]]);
192             }
193             $cv
194 17         62 }
195              
196              
197             sub unsubscribe {
198 7     7 1 8143 my ($self, %p) = @_;
199             my $topic = exists $p{topic} ? $p{topic} :
200 7 100       100 croak ref $self, '->unsubscribe requires "topic" parameter';
201 6 100       127 my $cv = exists $p{cv} ? delete $p{cv} : AnyEvent->condvar;
202 6         42 my $mid = $self->_remove_subscription($topic, $cv, $p{callback});
203 6 100       23 if (defined $mid) { # not already subscribed/subscribing
204 2         12 $self->_send(message_type => MQTT_UNSUBSCRIBE,
205             message_id => $mid,
206             topics => [$topic]);
207             }
208             $cv
209 6         24 }
210              
211             sub _add_subscription {
212 17     17   60 my ($self, $topic, $cv, $sub) = @_;
213 17         55 my $rec = $self->{_sub}->{$topic};
214 17 100       68 if ($rec) {
215 2         6 print STDERR "Add $sub to existing $topic subscription\n" if DEBUG;
216 2         8 $rec->{cb}->{$sub} = $sub;
217 2         14 $cv->send($rec->{qos});
218 2         30 foreach my $msg (values %{$rec->{retained}}) {
  2         10  
219 0         0 $sub->($msg->topic, $msg->message, $msg);
220             }
221 2         10 return;
222             }
223 15         26 $rec = $self->{_sub_pending}->{$topic};
224 15 100       42 if ($rec) {
225 3         6 print STDERR "Add $sub to existing pending $topic subscription\n" if DEBUG;
226 3         8 $rec->{cb}->{$sub} = $sub;
227 3         6 push @{$rec->{cv}}, $cv;
  3         8  
228 3         8 return;
229             }
230 12         35 my $mid = $self->next_message_id();
231 12         18 print STDERR "Add $sub as pending $topic subscription (mid=$mid)\n" if DEBUG;
232 12         37 $self->{_sub_pending_by_message_id}->{$mid} = $topic;
233 12         83 $self->{_sub_pending}->{$topic} =
234             { cb => { $sub => $sub }, cv => [ $cv ], retained => {} };
235 12         35 $mid;
236             }
237              
238             sub _remove_subscription {
239 6     6   20 my ($self, $topic, $cv, $sub) = @_;
240 6         13 my $rec = $self->{_unsub_pending}->{$topic};
241 6 100       17 if ($rec) {
242 1         2 print STDERR "Remove of $topic with pending unsubscribe\n" if DEBUG;
243 1         3 push @{$rec->{cv}}, $cv;
  1         3  
244 1         2 return;
245             }
246 5         12 $rec = $self->{_sub}->{$topic};
247 5 100       14 unless ($rec) {
248 1         2 print STDERR "Remove of $topic with no subscription\n" if DEBUG;
249 1         5 $cv->send(0);
250 1         11 return;
251             }
252              
253 4 100       10 if (defined $sub) {
254 3 50       10 unless (exists $rec->{cb}->{$sub}) {
255 0         0 print STDERR "Remove of $topic for $sub with no subscription\n"
256             if DEBUG;
257 0         0 $cv->send(0);
258 0         0 return;
259             }
260 3         7 delete $rec->{cb}->{$sub};
261 3 100       5 if (keys %{$rec->{cb}}) {
  3         12  
262 2         3 print STDERR "Remove of $topic for $sub\n" if DEBUG;
263 2         7 $cv->send(1);
264 2         23 return;
265             }
266             }
267 2         5 print STDERR "Remove of $topic\n" if DEBUG;
268 2         10 my $mid = $self->next_message_id();
269 2         6 delete $self->{_sub}->{$topic};
270 2         12 $self->{_sub_topics}->delete($topic);
271 2         17 $self->{_unsub_pending_by_message_id}->{$mid} = $topic;
272 2         9 $self->{_unsub_pending}->{$topic} = { cv => [ $cv ] };
273 2         18 return $mid;
274             }
275              
276             sub _confirm_subscription {
277 13     13   100 my ($self, $mid, $qos) = @_;
278 13         43 my $topic = delete $self->{_sub_pending_by_message_id}->{$mid};
279 13 100       55 unless (defined $topic) {
280 1         133 carp 'SubAck with no pending subscription for message id: ', $mid, "\n";
281 1         34 return;
282             }
283 12         43 my $rec = $self->{_sub}->{$topic} = delete $self->{_sub_pending}->{$topic};
284 12         53 $self->{_sub_topics}->add($topic);
285 12         465 $rec->{qos} = $qos;
286              
287 12         22 foreach my $cv (@{$rec->{cv}}) {
  12         41  
288 15         83 $cv->send($qos);
289             }
290 12         140 delete $rec->{cv};
291             }
292              
293             sub _confirm_unsubscribe {
294 3     3   18 my ($self, $mid) = @_;
295 3         10 my $topic = delete $self->{_unsub_pending_by_message_id}->{$mid};
296 3 100       12 unless (defined $topic) {
297 1         120 carp 'UnSubAck with no pending unsubscribe for message id: ', $mid, "\n";
298 1         34 return;
299             }
300 2         7 my $rec = delete $self->{_unsub_pending}->{$topic};
301 2         4 foreach my $cv (@{$rec->{cv}}) {
  2         8  
302 3         19 $cv->send(1);
303             }
304             }
305              
306             sub _send {
307 56     56   210063 my $self = shift;
308 56         244 my %p = @_;
309 56         141 my $cv = delete $p{cv};
310 56         415 my $msg = Net::MQTT::Message->new(%p);
311             $self->{connected} ?
312 56 100       1249 $self->_queue_write($msg, $cv) : $self->connect($msg, $cv);
313             }
314              
315             sub _queue_write {
316 56     56   146 my ($self, $msg, $cv) = @_;
317 56         121 my $queue = $self->{write_queue};
318 56         115 print STDERR 'Queuing: ', ($cv||'no cv'), ' ', $msg->string, "\n" if DEBUG;
319 56         103 push @{$queue}, [$msg, $cv];
  56         203  
320 56 100       287 $self->_write_now unless (defined $self->{_waiting});
321 56         749 $cv;
322             }
323              
324             sub _write_now {
325 114     114   217 my $self = shift;
326 114         234 my ($msg, $cv);
327 114         398 undef $self->{_waiting};
328 114 100       341 if (@_) {
329 16         51 ($msg, $cv) = @_;
330             } else {
331 98 100       164 my $args = shift @{$self->{write_queue}} or return;
  98         368  
332 50         174 ($msg, $cv) = @$args;
333             }
334 66         241 $self->_reset_keep_alive_timer();
335 66         2208 print STDERR "Sending: ", $msg->string, "\n" if DEBUG;
336 66 100       226 $self->{message_log_callback}->('>', $msg) if ($self->{message_log_callback});
337 66         878 $self->{_waiting} = [$msg, $cv];
338 66         120 print ' ', (unpack 'H*', $msg->bytes), "\n" if DEBUG;
339 66         467 $self->{handle}->push_write($msg->bytes);
340 66         7347 $cv;
341             }
342              
343             sub _reset_keep_alive_timer {
344 71     71   795 my ($self, $wait) = @_;
345 71         399 undef $self->{_keep_alive_handle};
346 71 100       251 my $method = $wait ? '_keep_alive_timeout' : '_send_keep_alive';
347 71         152 $self->{_keep_alive_waiting} = $wait;
348 71         121 my $weak_self = $self;
349 71         253 weaken $weak_self;
350             $self->{_keep_alive_handle} =
351             AnyEvent->timer(after => $self->{keep_alive_timer},
352             cb => subname((substr $method, 1).'_cb' =>
353 71     4   1227 sub { $weak_self->$method(@_) }));
  4         701280  
354             }
355              
356             sub _send_keep_alive {
357 3     3   15 my $self = shift;
358 3         9 print STDERR "Sending: keep alive\n" if DEBUG;
359 3         29 $self->_send(message_type => MQTT_PINGREQ);
360 3         19 $self->_reset_keep_alive_timer(1);
361             }
362              
363             sub _keep_alive_timeout {
364 1     1   9 my $self = shift;
365 1         4 print STDERR "keep alive timeout\n" if DEBUG;
366 1         8 undef $self->{_keep_alive_waiting};
367 1         18 $self->{handle}->destroy;
368 1         453 $self->_error(0, 'keep alive timeout', 1);
369             }
370              
371             sub _keep_alive_received {
372 3     3   8 my $self = shift;
373 3         5 print STDERR "keep alive received\n" if DEBUG;
374 3 100       17 return unless (defined $self->{_keep_alive_waiting});
375 1         3 $self->_reset_keep_alive_timer();
376             }
377              
378              
379             sub connect {
380 30     30 1 2195 my ($self, $msg, $cv) = @_;
381 30         60 print STDERR "connect\n" if DEBUG;
382 30         64 $self->{_waiting} = 'connect';
383 30 100       219 if ($msg) {
384 21 100       353 $cv = AnyEvent->condvar unless ($cv);
385 21         120 $self->_queue_write($msg, $cv);
386             } else {
387 9 100       267 $self->{connect_cv} = AnyEvent->condvar unless (exists $self->{connect_cv});
388 9         77 $cv = $self->{connect_cv};
389             }
390 30 100       143 return $cv if ($self->{handle});
391              
392 18         66 my $weak_self = $self;
393 18         81 weaken $weak_self;
394              
395 18         30 my $hd;
396             $hd = $self->{handle} =
397             AnyEvent::Handle->new(connect => [$self->{host}, $self->{port}],
398             on_error => subname('on_error_cb' => sub {
399 1     1   35 my ($handle, $fatal, $message) = @_;
400 1         2 print STDERR "handle error $_[1]\n" if DEBUG;
401 1         7 $handle->destroy;
402 1         102 $weak_self->_error($fatal, 'Error: '.$message, 0);
403             }),
404             on_eof => subname('on_eof_cb' => sub {
405 1     1   802 my ($handle) = @_;
406 1         1 print STDERR "handle eof\n" if DEBUG;
407 1         5 $handle->destroy;
408 1         56 $weak_self->_error(1, 'EOF', 1);
409             }),
410             on_timeout => subname('on_timeout_cb' => sub {
411 1     1   401003 $weak_self->_error(0, $weak_self->{wait}.' timeout', 1);
412 1         4 $weak_self->{wait} = 'nothing';
413             }),
414             on_connect => subname('on_connect_cb' => sub {
415 16     16   6586 my ($handle, $host, $port, $retry) = @_;
416 16         32 print STDERR "TCP handshake complete\n" if DEBUG;
417             # call user-defined on_connect function.
418 16 50       67 $weak_self->{on_connect}->($handle, $retry) if $weak_self->{on_connect};
419             my $msg =
420             Net::MQTT::Message->new(
421             message_type => MQTT_CONNECT,
422             keep_alive_timer => $weak_self->{keep_alive_timer},
423             client_id => $weak_self->{client_id},
424             clean_session => $weak_self->{clean_session},
425             will_topic => $weak_self->{will_topic},
426             will_qos => $weak_self->{will_qos},
427             will_retain => $weak_self->{will_retain},
428             will_message => $weak_self->{will_message},
429             user_name => $weak_self->{user_name},
430             password => $weak_self->{password},
431 16         140 );
432 16         531 $weak_self->_write_now($msg);
433 16         100 $handle->timeout($weak_self->{timeout});
434 16         1025 $weak_self->{wait} = 'connack';
435             $handle->on_read(subname 'on_read_cb' => sub {
436 14         26997 my ($hdl) = @_;
437             $hdl->push_read(ref $weak_self =>
438             subname 'reader_cb' => sub {
439 58         265 $weak_self->_handle_message(@_);
440 58         427 1;
441 14         203 });
442 16         272 });
443             }),
444 18         526 @{$self->{handle_args}},
  18         224  
445             );
446 18         12848 return $cv
447             }
448              
449             sub _reconnect {
450 3     3   9 my $self = shift;
451 3         8 print STDERR "reconnecting:\n" if DEBUG;
452 3         10 $self->{clean_session} = 0;
453 3         14 $self->connect(@_);
454             }
455              
456             sub _handle_message {
457 58     58   114 my $self = shift;
458 58         133 my ($handle, $msg, $error) = @_;
459 58 50       173 return $self->_error(0, $error, 1) if ($error);
460 58 100       158 $self->{message_log_callback}->('<', $msg) if ($self->{message_log_callback});
461 58 50       251 $self->_call_callback('before_msg_callback' => $msg) or return;
462 58         221 my $msg_type = lc ref $msg;
463 58         444 $msg_type =~ s/^.*:://;
464 58 50       230 $self->_call_callback('before_'.$msg_type.'_callback' => $msg) or return;
465 58         143 my $method = '_process_'.$msg_type;
466 58 100       377 unless ($self->can($method)) {
467 1         23 carp 'Unsupported message ', $msg->string(), "\n";
468 1         352 return;
469             }
470 57         238 my $res = $self->$method(@_);
471 57         344 $self->_call_callback('after_'.$msg_type.'_callback' => $msg, $res);
472 57         153 $res;
473             }
474              
475             sub _call_callback {
476 173     173   293 my $self = shift;
477 173         268 my $cb_name = shift;
478 173 50       608 return 1 unless (exists $self->{$cb_name});
479 0         0 $self->{$cb_name}->(@_);
480             }
481              
482             sub _process_connack {
483 13     13   42 my ($self, $handle, $msg, $error) = @_;
484 13         62 $handle->timeout(undef);
485 13 100       331 unless ($msg->return_code == MQTT_CONNECT_ACCEPTED) {
486 1         10 return $self->_error(1, 'Connection refused: '.$msg->string, 0);
487             }
488 12         86 print STDERR "Connection ready:\n", $msg->string(' '), "\n" if DEBUG;
489 12         59 $self->_write_now();
490 12         32 $self->{connected} = 1;
491 12 100       85 $self->{connect_cv}->send(1) if ($self->{connect_cv});
492 12         63 delete $self->{connect_cv};
493              
494 12         20 my $weak_self = $self;
495 12         47 weaken $weak_self;
496              
497             $handle->on_drain(subname 'on_drain_cb' => sub {
498 51     51   7689 print STDERR "drained\n" if DEBUG;
499 51         135 my $w = $weak_self->{_waiting};
500 51 100 100     547 $w->[1]->send(1) if (ref $w && defined $w->[1]);
501 51         1969 $weak_self->_write_now;
502 51         303 1;
503 12         177 });
504             return
505 12         35 }
506              
507             sub _process_pingresp {
508 3     3   12 shift->_keep_alive_received();
509             }
510              
511             sub _process_suback {
512 13     13   33 my ($self, $handle, $msg, $error) = @_;
513 13         67 print STDERR "Confirmed subscription:\n", $msg->string(' '), "\n" if DEBUG;
514 13         53 $self->_confirm_subscription($msg->message_id, $msg->qos_levels->[0]);
515             return
516 13         34 }
517              
518             sub _process_unsuback {
519 3     3   9 my ($self, $handle, $msg, $error) = @_;
520 3         9 print STDERR "Confirmed unsubscribe:\n", $msg->string(' '), "\n" if DEBUG;
521 3         17 $self->_confirm_unsubscribe($msg->message_id);
522             return
523 3         28 }
524              
525             sub _publish_locally {
526 16     16   41 my ($self, $msg) = @_;
527 16         59 my $msg_topic = $msg->topic;
528 16         103 my $msg_data = $msg->message;
529 16         131 my $matches = $self->{_sub_topics}->values($msg_topic);
530 16 100       486 unless (scalar @$matches) {
531 1         12 carp "Unexpected publish:\n", $msg->string(' '), "\n";
532 1         458 return;
533             }
534 15         33 my %matched;
535 15         92 my $msg_retain = $msg->retain;
536 15         93 foreach my $topic (@$matches) {
537 19         103 my $rec = $self->{_sub}->{$topic};
538 19 50       56 if ($msg_retain) {
539 0 0       0 if ($msg_data eq '') {
540 0         0 delete $rec->{retained}->{$msg_topic};
541 0         0 print STDERR " retained cleared\n" if DEBUG;
542             } else {
543 0         0 $rec->{retained}->{$msg_topic} = $msg;
544 0         0 print STDERR " retained '", $msg_data, "'\n" if DEBUG;
545             }
546             }
547 19         38 foreach my $cb (values %{$rec->{cb}}) {
  19         64  
548 27 100       292 next if ($matched{$cb}++);
549 25         89 $cb->($msg_topic, $msg_data, $msg);
550             }
551             }
552 15         335 1;
553             }
554              
555             sub _process_publish {
556 16     16   67 my ($self, $handle, $msg, $error) = @_;
557 16         54 my $qos = $msg->qos;
558 16 100       136 if ($qos == MQTT_QOS_EXACTLY_ONCE) {
559 1         7 my $mid = $msg->message_id;
560 1         13 $self->{messages}->{$mid} = $msg;
561 1         8 $self->_send(message_type => MQTT_PUBREC, message_id => $mid);
562 1         6 return;
563             }
564 15         64 $self->_publish_locally($msg);
565 15 100       56 $self->_send(message_type => MQTT_PUBACK, message_id => $msg->message_id)
566             if ($qos == MQTT_QOS_AT_LEAST_ONCE);
567             return
568 15         38 }
569              
570             sub _inflight_record {
571 7     7   19 my ($self, $msg) = @_;
572 7         36 my $mid = $msg->message_id;
573 7 100       49 unless (exists $self->{inflight}->{$mid}) {
574 2         16 carp "Unexpected message for message id $mid\n ".$msg->string;
575 2         592 return;
576             }
577 5         13 my $exp_type = $self->{inflight}->{$mid}->{expect};
578 5         18 my $got_type = $msg->message_type;
579 5 100       22 unless ($got_type == $exp_type) {
580 1         5 carp 'Received ', message_type_string($got_type), ' but expected ',
581             message_type_string($exp_type), " for message id $mid\n";
582 1         371 return;
583             }
584 4         16 return delete $self->{inflight}->{$mid};
585             }
586              
587             sub _process_puback {
588 3     3   18 my ($self, $handle, $msg, $error) = @_;
589 3 100       13 my $rec = $self->_inflight_record($msg) or return;
590 2         6 my $mid = $msg->message_id;
591 2         6 print STDERR 'PubAck: ', $mid, ' ', $rec->{cv}, "\n" if DEBUG;
592 2         9 $rec->{cv}->send(1);
593 2         42 return 1;
594             }
595              
596             sub _process_pubrec {
597 2     2   8 my ($self, $handle, $msg, $error) = @_;
598 2 100       8 my $rec = $self->_inflight_record($msg) or return;
599 1         2 my $mid = $msg->message_id;
600 1         4 print STDERR 'PubRec: ', $mid, ' ', $rec->{cv}, "\n" if DEBUG;
601             $self->_send_with_ack({
602             message_type => MQTT_PUBREL,
603             qos => MQTT_QOS_AT_LEAST_ONCE,
604             message_id => $mid,
605 1         3 }, $rec->{cv}, MQTT_PUBCOMP);
606             }
607              
608             sub _process_pubrel {
609 2     2   11 my ($self, $handle, $msg, $error) = @_;
610 2         18 my $mid = $msg->message_id;
611 2         14 print STDERR 'PubRel: ', $mid, "\n" if DEBUG;
612 2         13 my $pubmsg = delete $self->{messages}->{$mid};
613 2 100       14 unless ($pubmsg) {
614 1         11 carp "Unexpected message for message id $mid\n ".$msg->string;
615 1         283 return;
616             }
617 1         9 $self->_publish_locally($pubmsg);
618 1         7 $self->_send(message_type => MQTT_PUBCOMP, message_id => $mid);
619             }
620              
621             sub _process_pubcomp {
622 2     2   8 my ($self, $handle, $msg, $error) = @_;
623 2 100       10 my $rec = $self->_inflight_record($msg) or return;
624 1         3 my $mid = $msg->message_id;
625 1         3 print STDERR 'PubComp: ', $mid, ' ', $rec->{cv}, "\n" if DEBUG;
626 1         5 $rec->{cv}->send(1);
627 1         18 return 1;
628             }
629              
630              
631             sub anyevent_read_type {
632 14     14 1 428 my ($handle, $cb) = @_;
633             subname 'anyevent_read_type_reader' => sub {
634 51     51   340650 my ($handle) = @_;
635 51         125 my $rbuf = \$handle->{rbuf};
636 51         246 weaken $rbuf;
637 51 50       185 return unless (defined $$rbuf);
638 51         98 while (1) {
639 109         477 my $msg = Net::MQTT::Message->new_from_bytes($$rbuf, 1);
640 109 100       8013 last unless ($msg);
641 58         169 $cb->($handle, $msg);
642             }
643 51         153 return;
644 14         125 };
645             }
646              
647             1;
648              
649             __END__