File Coverage

blib/lib/AnyEvent/RabbitMQ.pm
Criterion Covered Total %
statement 51 351 14.5
branch 0 140 0.0
condition 0 48 0.0
subroutine 17 64 26.5
pod 0 11 0.0
total 68 614 11.0


line stmt bran cond sub pod time code
1             package AnyEvent::RabbitMQ;
2              
3 1     1   106157 use strict;
  1         3  
  1         43  
4 1     1   6 use warnings;
  1         3  
  1         32  
5 1     1   5 use Carp qw(confess croak);
  1         2  
  1         51  
6 1     1   6 use Scalar::Util qw(refaddr);
  1         2  
  1         52  
7 1     1   616 use List::MoreUtils qw(none);
  1         13333  
  1         8  
8 1     1   1592 use Devel::GlobalDestruction;
  1         2043  
  1         7  
9 1     1   568 use File::ShareDir;
  1         13643  
  1         63  
10 1     1   579 use Readonly;
  1         4131  
  1         53  
11 1     1   8 use Scalar::Util qw/ weaken /;
  1         2  
  1         162  
12              
13             require Data::Dumper;
14             sub Dumper {
15             local $Data::Dumper::Terse = 1;
16             local $Data::Dumper::Indent = 1;
17             local $Data::Dumper::Useqq = 1;
18             local $Data::Dumper::Deparse = 1;
19             local $Data::Dumper::Quotekeys = 0;
20             local $Data::Dumper::Sortkeys = 1;
21             &Data::Dumper::Dumper
22             }
23              
24 1     1   833 use AnyEvent::Handle;
  1         27742  
  1         51  
25 1     1   682 use AnyEvent::Socket;
  1         15780  
  1         108  
26              
27 1     1   561 use Net::AMQP 0.06;
  1         84684  
  1         41  
28 1     1   10 use Net::AMQP::Common qw(:all);
  1         2  
  1         196  
29              
30 1     1   615 use AnyEvent::RabbitMQ::Channel;
  1         4  
  1         31  
31 1     1   8 use AnyEvent::RabbitMQ::LocalQueue;
  1         3  
  1         22  
32              
33 1     1   5 use namespace::clean;
  1         2  
  1         4  
34              
35             our $VERSION = '1.21';
36              
37             use constant {
38 1         4533 _ST_CLOSED => 0,
39             _ST_OPENING => 1,
40             _ST_OPEN => 2,
41             _ST_CLOSING => 3,
42 1     1   751 };
  1         2  
43              
44             Readonly my $DEFAULT_AMQP_SPEC
45             => File::ShareDir::dist_dir("AnyEvent-RabbitMQ") . '/fixed_amqp0-9-1.xml';
46              
47             Readonly my $DEFAULT_CHANNEL_MAX => 2**16-1;
48              
49             sub new {
50 0     0 0   my $class = shift;
51 0           return bless {
52             verbose => 0,
53             @_,
54             _state => _ST_CLOSED,
55             _queue => AnyEvent::RabbitMQ::LocalQueue->new,
56             _last_chan_id => 0,
57             _channels => {},
58             _login_user => '',
59             _server_properties => {},
60             _frame_max => undef,
61             _body_max => undef,
62             _channel_max => undef,
63             }, $class;
64             }
65              
66             sub verbose {
67 0     0 0   my $self = shift;
68             @_ ? ($self->{verbose} = shift) : $self->{verbose}
69 0 0         }
70              
71             sub is_open {
72 0     0 0   my $self = shift;
73 0           $self->{_state} == _ST_OPEN
74             }
75              
76             sub channels {
77 0     0 0   my $self = shift;
78 0           return $self->{_channels};
79             }
80              
81             sub _delete_channel {
82 0     0     my $self = shift;
83 0           my ($channel,) = @_;
84 0           my $c = $self->{_channels}->{$channel->id};
85 0 0 0       if (defined($c) && refaddr($c) == refaddr($channel)) {
86 0           delete $self->{_channels}->{$channel->id};
87 0           return 1;
88             }
89 0           return 0;
90             }
91              
92             sub login_user {
93 0     0 0   my $self = shift;
94 0           return $self->{_login_user};
95             }
96              
97             my $_loaded_spec;
98             sub load_xml_spec {
99 0     0 0   my $self = shift;
100 0           my ($spec) = @_;
101 0   0       $spec ||= $DEFAULT_AMQP_SPEC;
102 0 0 0       if ($_loaded_spec && $_loaded_spec ne $spec) {
    0          
103 0           croak("Tried to load AMQP spec $spec, but have already loaded $_loaded_spec, not possible");
104             }
105             elsif (!$_loaded_spec) {
106 0           Net::AMQP::Protocol->load_xml_spec($_loaded_spec = $spec);
107             }
108 0           return $self;
109             }
110              
111             sub connect {
112 0     0 0   my $self = shift;
113 0           my %args = $self->_set_cbs(@_);
114              
115 0 0         if ($self->{_state} != _ST_CLOSED) {
116 0           $args{on_failure}->('Connection has already been opened');
117 0           return $self;
118             }
119              
120 0   0 0     $args{on_close} ||= sub {};
121 0   0 0     $args{on_read_failure} ||= sub {warn @_, "\n"};
  0            
122 0   0       $args{timeout} ||= 0;
123              
124 0           for (qw/ host port /) {
125 0 0         $args{$_} or return $args{on_failure}->("No $_ passed to connect");
126             }
127              
128 0 0         if ($self->{verbose}) {
129 0           warn 'connect to ', $args{host}, ':', $args{port}, '...', "\n";
130             }
131              
132 0           $self->{_state} = _ST_OPENING;
133              
134 0           weaken(my $weak_self = $self);
135 0           my $conn; $conn = AnyEvent::Socket::tcp_connect(
136             $args{host},
137             $args{port},
138             sub {
139 0     0     undef $conn;
140 0 0         my $self = $weak_self or return;
141              
142 0           my $fh = shift;
143              
144 0 0         unless ($fh) {
145 0           $self->{_state} = _ST_CLOSED;
146             return $args{on_failure}->(
147 0           sprintf('Error connecting to AMQP Server %s:%s: %s', $args{host}, $args{port}, $!)
148             );
149             }
150              
151 0           my $close_cb = $args{on_close};
152 0           my $failure_cb = $args{on_failure};
153             $self->{_handle} = AnyEvent::Handle->new(
154             fh => $fh,
155             on_error => sub {
156 0           my ($handle, $fatal, $message) = @_;
157 0 0         my $self = $weak_self or return;
158              
159 0 0         if ($self->is_open) {
160 0           $self->_server_closed($close_cb, $message);
161             }
162             else {
163 0           $failure_cb->(@_);
164             }
165             },
166             on_drain => sub {
167 0           my ($handle) = @_;
168 0 0         my $self = $weak_self or return;
169              
170             $self->{drain_condvar}->send
171 0 0         if exists $self->{drain_condvar};
172             },
173             $args{tls} ? (tls => 'connect') : (),
174 0 0         $args{tls_ctx} ? ( tls_ctx => $args{tls_ctx} ) : (),
    0          
175             );
176 0           $self->_read_loop($args{on_close}, $args{on_read_failure});
177 0           $self->_start(%args,);
178             },
179             sub {
180 0     0     return $args{timeout};
181             },
182 0           );
183              
184 0           return $self;
185             }
186              
187             sub server_properties {
188 0     0 0   return shift->{_server_properties};
189             }
190              
191             sub _read_loop {
192 0     0     my ($self, $close_cb, $failure_cb,) = @_;
193              
194 0 0         return if !defined $self->{_handle}; # called on_error
195              
196 0           weaken(my $weak_self = $self);
197             $self->{_handle}->push_read(chunk => 8, sub {
198 0 0   0     my $self = $weak_self or return;
199 0           my $data = $_[1];
200 0           my $stack = $_[1];
201              
202 0 0         if (length($data) <= 7) {
203 0           $failure_cb->('Broken data was received');
204 0           @_ = ($self, $close_cb, $failure_cb,);
205 0           goto &_read_loop;
206             }
207              
208 0           my ($type_id, $channel, $length,) = unpack 'CnN', substr $data, 0, 7, '';
209 0 0 0       if (!defined $type_id || !defined $channel || !defined $length) {
      0        
210 0           $failure_cb->('Broken data was received');
211 0           @_ = ($self, $close_cb, $failure_cb,);
212 0           goto &_read_loop;
213             }
214              
215             $self->{_handle}->push_read(chunk => $length, sub {
216 0 0         my $self = $weak_self or return;
217 0           $stack .= $_[1];
218 0           my ($frame) = Net::AMQP->parse_raw_frames(\$stack);
219              
220 0 0         $self->{_heartbeat_recv} = time if $self->{_heartbeat_timer};
221              
222 0 0         if ($self->{verbose}) {
223 0           warn '[C] <-- [S] ', Dumper($frame),
224             '-----------', "\n";
225             }
226              
227 0           my $id = $frame->channel;
228 0 0         if (0 == $id) {
229 0 0         if ($frame->type_id == 8) {
230             # Heartbeat, no action needs taking.
231             }
232             else {
233 0 0         return unless $self->_check_close_and_clean($frame, $close_cb,);
234 0           $self->{_queue}->push($frame);
235             }
236             } else {
237 0           my $channel = $self->{_channels}->{$id};
238 0 0         if (defined $channel) {
239 0           $channel->push_queue_or_consume($frame, $failure_cb);
240             } else {
241 0           $failure_cb->('Unknown channel id: ' . $frame->channel);
242             }
243             }
244              
245 0           @_ = ($self, $close_cb, $failure_cb,);
246 0           goto &_read_loop;
247 0           });
248 0           });
249              
250 0           return $self;
251             }
252              
253             sub _check_close_and_clean {
254 0     0     my $self = shift;
255 0           my ($frame, $close_cb,) = @_;
256              
257 0 0         my $method_frame = $frame->isa('Net::AMQP::Frame::Method') ? $frame->method_frame : undef;
258              
259 0 0         if ($self->{_state} == _ST_CLOSED) {
260 0   0       return $method_frame && $method_frame->isa('Net::AMQP::Protocol::Connection::CloseOk');
261             }
262              
263 0 0 0       if ($method_frame && $method_frame->isa('Net::AMQP::Protocol::Connection::Close')) {
264 0           delete $self->{_heartbeat_timer};
265 0           $self->_push_write(Net::AMQP::Protocol::Connection::CloseOk->new());
266 0           $self->_server_closed($close_cb, $frame);
267 0           return;
268             }
269              
270 0           return 1;
271             }
272              
273             sub _server_closed {
274 0     0     my $self = shift;
275 0           my ($close_cb, $why,) = @_;
276              
277 0           $self->{_state} = _ST_CLOSING;
278 0           for my $channel (values %{ $self->{_channels} }) {
  0            
279 0 0         $channel->_closed(ref($why) ? $why : $channel->_close_frame($why));
280             }
281 0           $self->{_channels} = {};
282 0           $self->{_handle}->push_shutdown;
283 0           $self->{_state} = _ST_CLOSED;
284              
285 0           $close_cb->($why);
286 0           return;
287             }
288              
289             sub _start {
290 0     0     my $self = shift;
291 0           my %args = @_;
292              
293 0 0         if ($self->{verbose}) {
294 0           warn 'post header', "\n";
295             }
296              
297 0           $self->{_handle}->push_write(Net::AMQP::Protocol->header);
298              
299             $self->_push_read_and_valid(
300             'Connection::Start',
301             sub {
302 0     0     my $frame = shift;
303              
304 0           my @mechanisms = split /\s/, $frame->method_frame->mechanisms;
305             return $args{on_failure}->('AMQPLAIN is not found in mechanisms')
306 0 0         if none {$_ eq 'AMQPLAIN'} @mechanisms;
  0            
307              
308 0           my @locales = split /\s/, $frame->method_frame->locales;
309             return $args{on_failure}->('en_US is not found in locales')
310 0 0         if none {$_ eq 'en_US'} @locales;
  0            
311              
312 0           $self->{_server_properties} = $frame->method_frame->server_properties;
313              
314             $self->_push_write(
315             Net::AMQP::Protocol::Connection::StartOk->new(
316             client_properties => {
317             platform => 'Perl',
318             product => __PACKAGE__,
319             information => 'http://d.hatena.ne.jp/cooldaemon/',
320             version => Net::AMQP::Value::String->new(__PACKAGE__->VERSION),
321             capabilities => {
322             consumer_cancel_notify => Net::AMQP::Value::true,
323             exchange_exchange_bindings => Net::AMQP::Value::true,
324             },
325 0 0         %{ $args{client_properties} || {} },
326             },
327             mechanism => 'AMQPLAIN',
328             response => {
329             LOGIN => $args{user},
330             PASSWORD => $args{pass},
331             },
332 0           locale => 'en_US',
333             ),
334             );
335              
336 0           $self->_tune(%args,);
337             },
338             $args{on_failure},
339 0           );
340              
341 0           return $self;
342             }
343              
344             sub _tune {
345 0     0     my $self = shift;
346 0           my %args = @_;
347              
348 0           weaken(my $weak_self = $self);
349             $self->_push_read_and_valid(
350             'Connection::Tune',
351             sub {
352 0 0   0     my $self = $weak_self or return;
353 0           my $frame = shift;
354              
355 0           my %tune;
356 0           foreach (qw( channel_max frame_max heartbeat )) {
357 0   0       my $client = $args{tune}{$_} || 0;
358 0   0       my $server = $frame->method_frame->$_ || 0;
359              
360             # negotiate with the server such that we cannot request a larger
361             # value set by the server, unless the server said unlimited
362 0 0 0       $tune{$_} = ($server == 0 or $client == 0)
    0          
    0          
363             ? ($server > $client ? $server : $client) # max
364             : ($client > $server ? $server : $client); # min
365             }
366              
367 0 0         if ($self->{_frame_max} = $tune{frame_max}) {
368             # calculate how big the body can actually be
369 0           $self->{_body_max} = $self->{_frame_max} - Net::AMQP::_HEADER_LEN - Net::AMQP::_FOOTER_LEN;
370             }
371              
372 0   0       $self->{_channel_max} = $tune{channel_max} || $DEFAULT_CHANNEL_MAX;
373              
374 0           $self->_push_write(
375             Net::AMQP::Protocol::Connection::TuneOk->new(%tune,)
376             );
377              
378 0 0         if ($tune{heartbeat} > 0) {
379 0           $self->_start_heartbeat($tune{heartbeat}, %args,);
380             }
381              
382 0           $self->_open(%args,);
383             },
384             $args{on_failure},
385 0           );
386              
387 0           return $self;
388             }
389              
390             sub _start_heartbeat {
391 0     0     my ($self, $interval, %args,) = @_;
392              
393 0           my $close_cb = $args{on_close};
394 0           my $failure_cb = $args{on_read_failure};
395 0           my $last_recv = 0;
396 0           my $idle_cycles = 0;
397 0           weaken(my $weak_self = $self);
398             my $timer_cb = sub {
399 0 0   0     my $self = $weak_self or return;
400 0 0         if ($self->{_heartbeat_recv} != $last_recv) {
    0          
401 0           $last_recv = $self->{_heartbeat_recv};
402 0           $idle_cycles = 0;
403             }
404             elsif (++$idle_cycles > 1) {
405 0           delete $self->{_heartbeat_timer};
406 0           $failure_cb->("Heartbeat lost");
407 0           $self->_server_closed($close_cb, "Heartbeat lost");
408 0           return;
409             }
410 0           $self->_push_write(Net::AMQP::Frame::Heartbeat->new());
411 0           };
412              
413 0           $self->{_heartbeat_recv} = time;
414 0           $self->{_heartbeat_timer} = AnyEvent->timer(
415             after => $interval,
416             interval => $interval,
417             cb => $timer_cb,
418             );
419              
420 0           return $self;
421             }
422              
423             sub _open {
424 0     0     my $self = shift;
425 0           my %args = @_;
426              
427             $self->_push_write_and_read(
428             'Connection::Open',
429             {
430             virtual_host => $args{vhost},
431             insist => 1,
432             },
433             'Connection::OpenOk',
434             sub {
435 0     0     $self->{_state} = _ST_OPEN;
436 0           $self->{_login_user} = $args{user};
437 0           $args{on_success}->($self);
438             },
439             $args{on_failure},
440 0           );
441              
442 0           return $self;
443             }
444              
445             sub close {
446 0 0   0 0   return if in_global_destruction;
447 0           my $self = shift;
448 0           my %args = $self->_set_cbs(@_);
449              
450 0 0         if ($self->{_state} == _ST_CLOSED) {
451 0           $args{on_success}->(@_);
452 0           return $self;
453             }
454 0 0         if ($self->{_state} != _ST_OPEN) {
455 0 0         $args{on_failure}->(($self->{_state} == _ST_OPENING ? "open" : "close") . " already in progress");
456 0           return $self;
457             }
458 0           $self->{_state} = _ST_CLOSING;
459              
460             my $cv = AE::cv {
461 0     0     delete $self->{_closing};
462 0           $self->_finish_close(%args);
463 0           };
464              
465 0           $cv->begin();
466              
467 0           my @ids = keys %{$self->{_channels}};
  0            
468 0           for my $id (@ids) {
469 0           my $channel = $self->{_channels}->{$id};
470 0 0         if ($channel->is_open) {
471 0           $cv->begin();
472             $channel->close(
473 0     0     on_success => sub { $cv->end() },
474 0     0     on_failure => sub { $cv->end() },
475 0           );
476             }
477             }
478              
479 0           $cv->end();
480              
481 0           return $self;
482             }
483              
484             sub _finish_close {
485 0     0     my $self = shift;
486 0           my %args = @_;
487              
488 0 0         if (my @ch = map { $_->id } grep { defined() && $_->is_open } values %{$self->{_channels}}) {
  0 0          
  0            
  0            
489 0           $args{on_failure}->("BUG: closing with channel(s) open: @ch");
490 0           return;
491             }
492              
493 0           $self->{_state} = _ST_CLOSED;
494              
495             $self->_push_write_and_read(
496             'Connection::Close', {}, 'Connection::CloseOk',
497             sub {
498             # circular ref ok
499 0     0     $self->{_handle}->push_shutdown;
500 0           $args{on_success}->(@_);
501             },
502             sub {
503             # circular ref ok
504 0     0     $self->{_handle}->push_shutdown;
505 0           $args{on_failure}->(@_);
506             },
507 0           );
508              
509 0           return;
510             }
511              
512             sub open_channel {
513 0     0 0   my $self = shift;
514 0           my %args = $self->_set_cbs(@_);
515              
516 0 0         return $self if !$self->_check_open($args{on_failure});
517              
518 0   0 0     $args{on_close} ||= sub {};
519              
520 0           my $id = $args{id};
521 0 0 0       if ($id && $self->{_channels}->{$id}) {
522 0           $args{on_failure}->("Channel id $id is already in use");
523 0           return $self;
524             }
525              
526 0 0         if (!$id) {
527 0           my $try_id = $self->{_last_chan_id};
528 0           for (1 .. $self->{_channel_max}) {
529 0 0         $try_id = 1 if ++$try_id > $self->{_channel_max};
530 0 0         unless (defined $self->{_channels}->{$try_id}) {
531 0           $id = $try_id;
532 0           last;
533             }
534             }
535 0 0         if (!$id) {
536 0           $args{on_failure}->('Ran out of channel ids');
537 0           return $self;
538             }
539 0           $self->{_last_chan_id} = $id;
540             }
541              
542             my $channel = AnyEvent::RabbitMQ::Channel->new(
543             id => $id,
544             connection => $self,
545             on_close => $args{on_close},
546 0           );
547              
548 0           $self->{_channels}->{$id} = $channel;
549              
550             $channel->open(
551             on_success => sub {
552 0     0     $args{on_success}->($channel);
553             },
554             on_failure => sub {
555 0     0     $self->_delete_channel($channel);
556 0           $args{on_failure}->(@_);
557             },
558 0           );
559              
560 0           return $self;
561             }
562              
563             sub _push_write_and_read {
564 0     0     my $self = shift;
565 0           my ($method, $args, $exp, $cb, $failure_cb, $id,) = @_;
566              
567 0           $method = 'Net::AMQP::Protocol::' . $method;
568 0           $self->_push_write(
569             Net::AMQP::Frame::Method->new(
570             method_frame => $method->new(%$args)
571             ),
572             $id,
573             );
574              
575 0           return $self->_push_read_and_valid($exp, $cb, $failure_cb, $id,);
576             }
577              
578             sub _push_read_and_valid {
579 0     0     my $self = shift;
580 0           my ($exp, $cb, $failure_cb, $id,) = @_;
581 0 0         $exp = ref($exp) eq 'ARRAY' ? $exp : [$exp];
582              
583 0           my $queue;
584 0 0         if (!$id) {
    0          
585 0           $queue = $self->{_queue};
586             } elsif (defined $self->{_channels}->{$id}) {
587 0           $queue = $self->{_channels}->{$id}->queue;
588             } else {
589 0           $failure_cb->('Unknown channel id: ' . $id);
590             }
591              
592 0 0         return unless $queue; # Can go away in global destruction..
593             $queue->get(sub {
594 0     0     my $frame = shift;
595              
596 0 0         return $failure_cb->('Received data is not method frame')
597             if !$frame->isa('Net::AMQP::Frame::Method');
598              
599 0           my $method_frame = $frame->method_frame;
600 0           for my $exp_elem (@$exp) {
601 0 0         return $cb->($frame)
602             if $method_frame->isa('Net::AMQP::Protocol::' . $exp_elem);
603             }
604              
605 0 0         $failure_cb->(
606             $method_frame->isa('Net::AMQP::Protocol::Channel::Close')
607             ? 'Channel closed'
608             : 'Expected ' . join(',', @$exp) . ' but got ' . ref($method_frame)
609             );
610 0           });
611             }
612              
613             sub _push_write {
614 0     0     my $self = shift;
615 0           my ($output, $id,) = @_;
616              
617 0 0         if ($output->isa('Net::AMQP::Protocol::Base')) {
618 0           $output = $output->frame_wrap;
619             }
620 0   0       $output->channel($id || 0);
621              
622 0 0         if ($self->{verbose}) {
623 0           warn '[C] --> [S] ', Dumper($output);
624             }
625              
626             $self->{_handle}->push_write($output->to_raw_frame())
627 0 0         if $self->{_handle}; # Careful - could have gone (global destruction)
628 0           return;
629             }
630              
631             sub _set_cbs {
632 0     0     my $self = shift;
633 0           my %args = @_;
634              
635 0   0 0     $args{on_success} ||= sub {};
636 0 0 0 0     $args{on_failure} ||= sub { die @_ unless in_global_destruction };
  0            
637              
638 0           return %args;
639             }
640              
641             sub _check_open {
642 0     0     my $self = shift;
643 0           my ($failure_cb) = @_;
644              
645 0 0         return 1 if $self->is_open;
646              
647 0           $failure_cb->('Connection has already been closed');
648 0           return 0;
649             }
650              
651             sub drain_writes {
652 0     0 0   my ($self, $timeout) = shift;
653 0           $self->{drain_condvar} = AnyEvent->condvar;
654 0 0         if ($timeout) {
655             $self->{drain_timer} = AnyEvent->timer( after => $timeout, sub {
656 0     0     $self->{drain_condvar}->croak("Timed out after $timeout");
657 0           });
658             }
659 0           $self->{drain_condvar}->recv;
660 0           delete $self->{drain_timer};
661             }
662              
663             sub DESTROY {
664 0     0     my $self = shift;
665 0 0         $self->close() unless in_global_destruction;
666 0           return;
667             }
668              
669             1;
670             __END__