File Coverage

blib/lib/AnyEvent/RabbitMQ.pm
Criterion Covered Total %
statement 51 351 14.5
branch 0 142 0.0
condition 0 48 0.0
subroutine 17 64 26.5
pod 0 11 0.0
total 68 616 11.0


line stmt bran cond sub pod time code
1             package AnyEvent::RabbitMQ;
2              
3 1     1   105775 use strict;
  1         2  
  1         29  
4 1     1   7 use warnings;
  1         14  
  1         28  
5 1     1   5 use Carp qw(confess croak);
  1         2  
  1         53  
6 1     1   6 use Scalar::Util qw(refaddr);
  1         2  
  1         52  
7 1     1   678 use List::MoreUtils qw(none);
  1         13636  
  1         8  
8 1     1   1740 use Devel::GlobalDestruction;
  1         2178  
  1         7  
9 1     1   601 use File::ShareDir;
  1         13759  
  1         44  
10 1     1   561 use Readonly;
  1         4004  
  1         70  
11 1     1   10 use Scalar::Util qw/ weaken /;
  1         2  
  1         183  
12              
13             require Data::Dumper;
14             sub Dumper {
15             local $Data::Dumper::Terse = 1;
16             local $Data::Dumper::Indent = 1;
17             local $Data::Dumper::Useqq = 1;
18             local $Data::Dumper::Deparse = 1;
19             local $Data::Dumper::Quotekeys = 0;
20             local $Data::Dumper::Sortkeys = 1;
21             &Data::Dumper::Dumper
22             }
23              
24 1     1   933 use AnyEvent::Handle;
  1         27784  
  1         38  
25 1     1   641 use AnyEvent::Socket;
  1         15787  
  1         118  
26              
27 1     1   551 use Net::AMQP 0.06;
  1         87632  
  1         45  
28 1     1   8 use Net::AMQP::Common qw(:all);
  1         3  
  1         223  
29              
30 1     1   665 use AnyEvent::RabbitMQ::Channel;
  1         3  
  1         32  
31 1     1   7 use AnyEvent::RabbitMQ::LocalQueue;
  1         2  
  1         20  
32              
33 1     1   5 use namespace::clean;
  1         2  
  1         3  
34              
35             our $VERSION = '1.22'; # VERSION
36              
37             use constant {
38 1         4567 _ST_CLOSED => 0,
39             _ST_OPENING => 1,
40             _ST_OPEN => 2,
41             _ST_CLOSING => 3,
42 1     1   683 };
  1         18  
43              
44             Readonly my $DEFAULT_AMQP_SPEC
45             => File::ShareDir::dist_dir("AnyEvent-RabbitMQ") . '/fixed_amqp0-9-1.xml';
46              
47             Readonly my $DEFAULT_CHANNEL_MAX => 2**16-1;
48              
49             sub new {
50 0     0 0   my $class = shift;
51 0           return bless {
52             verbose => 0,
53             @_,
54             _state => _ST_CLOSED,
55             _queue => AnyEvent::RabbitMQ::LocalQueue->new,
56             _last_chan_id => 0,
57             _channels => {},
58             _login_user => '',
59             _server_properties => {},
60             _frame_max => undef,
61             _body_max => undef,
62             _channel_max => undef,
63             }, $class;
64             }
65              
66             sub verbose {
67 0     0 0   my $self = shift;
68             @_ ? ($self->{verbose} = shift) : $self->{verbose}
69 0 0         }
70              
71             sub is_open {
72 0     0 0   my $self = shift;
73 0           $self->{_state} == _ST_OPEN
74             }
75              
76             sub channels {
77 0     0 0   my $self = shift;
78 0           return $self->{_channels};
79             }
80              
81             sub _delete_channel {
82 0     0     my $self = shift;
83 0           my ($channel,) = @_;
84 0           my $c = $self->{_channels}->{$channel->id};
85 0 0 0       if (defined($c) && refaddr($c) == refaddr($channel)) {
86 0           delete $self->{_channels}->{$channel->id};
87 0           return 1;
88             }
89 0           return 0;
90             }
91              
92             sub login_user {
93 0     0 0   my $self = shift;
94 0           return $self->{_login_user};
95             }
96              
97             my $_loaded_spec;
98             sub load_xml_spec {
99 0     0 0   my $self = shift;
100 0           my ($spec) = @_;
101 0   0       $spec ||= $DEFAULT_AMQP_SPEC;
102 0 0 0       if ($_loaded_spec && $_loaded_spec ne $spec) {
    0          
103 0           croak("Tried to load AMQP spec $spec, but have already loaded $_loaded_spec, not possible");
104             }
105             elsif (!$_loaded_spec) {
106 0           Net::AMQP::Protocol->load_xml_spec($_loaded_spec = $spec);
107             }
108 0           return $self;
109             }
110              
111             sub connect {
112 0     0 0   my $self = shift;
113 0           my %args = $self->_set_cbs(@_);
114              
115 0 0         if ($self->{_state} != _ST_CLOSED) {
116 0           $args{on_failure}->('Connection has already been opened');
117 0           return $self;
118             }
119              
120 0   0 0     $args{on_close} ||= sub {};
121 0   0 0     $args{on_read_failure} ||= sub {warn @_, "\n"};
  0            
122 0   0       $args{timeout} ||= 0;
123              
124 0           for (qw/ host port /) {
125 0 0         $args{$_} or return $args{on_failure}->("No $_ passed to connect");
126             }
127              
128 0 0         if ($self->{verbose}) {
129 0           warn 'connect to ', $args{host}, ':', $args{port}, '...', "\n";
130             }
131              
132 0           $self->{_state} = _ST_OPENING;
133              
134 0           weaken(my $weak_self = $self);
135 0           my $conn; $conn = AnyEvent::Socket::tcp_connect(
136             $args{host},
137             $args{port},
138             sub {
139 0     0     undef $conn;
140 0 0         my $self = $weak_self or return;
141              
142 0           my $fh = shift;
143              
144 0 0         unless ($fh) {
145 0           $self->{_state} = _ST_CLOSED;
146             return $args{on_failure}->(
147 0           sprintf('Error connecting to AMQP Server %s:%s: %s', $args{host}, $args{port}, $!)
148             );
149             }
150              
151 0           my $close_cb = $args{on_close};
152 0           my $failure_cb = $args{on_failure};
153             $self->{_handle} = AnyEvent::Handle->new(
154             fh => $fh,
155             on_error => sub {
156 0           my ($handle, $fatal, $message) = @_;
157 0 0         my $self = $weak_self or return;
158              
159 0 0         if ($self->is_open) {
160 0           $self->_server_closed($close_cb, $message);
161             }
162             else {
163 0           $failure_cb->(@_);
164             }
165             },
166             on_drain => sub {
167 0           my ($handle) = @_;
168 0 0         my $self = $weak_self or return;
169              
170             $self->{drain_condvar}->send
171 0 0         if exists $self->{drain_condvar};
172             },
173             peername => $args{host},
174             $args{tls} ? (tls => 'connect') : (),
175             $args{tls_ctx} ? ( tls_ctx => $args{tls_ctx} ) : (),
176 0 0         $args{nodelay} ? ( nodelay => $args{nodelay} ) : (),
    0          
    0          
177             );
178 0           $self->_read_loop($args{on_close}, $args{on_read_failure});
179 0           $self->_start(%args,);
180             },
181             sub {
182 0     0     return $args{timeout};
183             },
184 0           );
185              
186 0           return $self;
187             }
188              
189             sub server_properties {
190 0     0 0   return shift->{_server_properties};
191             }
192              
193             sub _read_loop {
194 0     0     my ($self, $close_cb, $failure_cb,) = @_;
195              
196 0 0         return if !defined $self->{_handle}; # called on_error
197              
198 0           weaken(my $weak_self = $self);
199             $self->{_handle}->push_read(chunk => 8, sub {
200 0 0   0     my $self = $weak_self or return;
201 0           my $data = $_[1];
202 0           my $stack = $_[1];
203              
204 0 0         if (length($data) <= 7) {
205 0           $failure_cb->('Broken data was received');
206 0           @_ = ($self, $close_cb, $failure_cb,);
207 0           goto &_read_loop;
208             }
209              
210 0           my ($type_id, $channel, $length,) = unpack 'CnN', substr $data, 0, 7, '';
211 0 0 0       if (!defined $type_id || !defined $channel || !defined $length) {
      0        
212 0           $failure_cb->('Broken data was received');
213 0           @_ = ($self, $close_cb, $failure_cb,);
214 0           goto &_read_loop;
215             }
216              
217             $self->{_handle}->push_read(chunk => $length, sub {
218 0 0         my $self = $weak_self or return;
219 0           $stack .= $_[1];
220 0           my ($frame) = Net::AMQP->parse_raw_frames(\$stack);
221              
222 0 0         $self->{_heartbeat_recv} = time if $self->{_heartbeat_timer};
223              
224 0 0         if ($self->{verbose}) {
225 0           warn '[C] <-- [S] ', Dumper($frame),
226             '-----------', "\n";
227             }
228              
229 0           my $id = $frame->channel;
230 0 0         if (0 == $id) {
231 0 0         if ($frame->type_id == 8) {
232             # Heartbeat, no action needs taking.
233             }
234             else {
235 0 0         return unless $self->_check_close_and_clean($frame, $close_cb,);
236 0           $self->{_queue}->push($frame);
237             }
238             } else {
239 0           my $channel = $self->{_channels}->{$id};
240 0 0         if (defined $channel) {
241 0           $channel->push_queue_or_consume($frame, $failure_cb);
242             } else {
243 0           $failure_cb->('Unknown channel id: ' . $frame->channel);
244             }
245             }
246              
247 0           @_ = ($self, $close_cb, $failure_cb,);
248 0           goto &_read_loop;
249 0           });
250 0           });
251              
252 0           return $self;
253             }
254              
255             sub _check_close_and_clean {
256 0     0     my $self = shift;
257 0           my ($frame, $close_cb,) = @_;
258              
259 0 0         my $method_frame = $frame->isa('Net::AMQP::Frame::Method') ? $frame->method_frame : undef;
260              
261 0 0         if ($self->{_state} == _ST_CLOSED) {
262 0   0       return $method_frame && $method_frame->isa('Net::AMQP::Protocol::Connection::CloseOk');
263             }
264              
265 0 0 0       if ($method_frame && $method_frame->isa('Net::AMQP::Protocol::Connection::Close')) {
266 0           delete $self->{_heartbeat_timer};
267 0           $self->_push_write(Net::AMQP::Protocol::Connection::CloseOk->new());
268 0           $self->_server_closed($close_cb, $frame);
269 0           return;
270             }
271              
272 0           return 1;
273             }
274              
275             sub _server_closed {
276 0     0     my $self = shift;
277 0           my ($close_cb, $why,) = @_;
278              
279 0           $self->{_state} = _ST_CLOSING;
280 0           for my $channel (values %{ $self->{_channels} }) {
  0            
281 0 0         $channel->_closed(ref($why) ? $why : $channel->_close_frame($why));
282             }
283 0           $self->{_channels} = {};
284 0           $self->{_handle}->push_shutdown;
285 0           $self->{_state} = _ST_CLOSED;
286              
287 0           $close_cb->($why);
288 0           return;
289             }
290              
291             sub _start {
292 0     0     my $self = shift;
293 0           my %args = @_;
294              
295 0 0         if ($self->{verbose}) {
296 0           warn 'post header', "\n";
297             }
298              
299 0           $self->{_handle}->push_write(Net::AMQP::Protocol->header);
300              
301             $self->_push_read_and_valid(
302             'Connection::Start',
303             sub {
304 0     0     my $frame = shift;
305              
306 0           my @mechanisms = split /\s/, $frame->method_frame->mechanisms;
307             return $args{on_failure}->('AMQPLAIN is not found in mechanisms')
308 0 0         if none {$_ eq 'AMQPLAIN'} @mechanisms;
  0            
309              
310 0           my @locales = split /\s/, $frame->method_frame->locales;
311             return $args{on_failure}->('en_US is not found in locales')
312 0 0         if none {$_ eq 'en_US'} @locales;
  0            
313              
314 0           $self->{_server_properties} = $frame->method_frame->server_properties;
315              
316             $self->_push_write(
317             Net::AMQP::Protocol::Connection::StartOk->new(
318             client_properties => {
319             platform => 'Perl',
320             product => __PACKAGE__,
321             information => 'http://d.hatena.ne.jp/cooldaemon/',
322             version => Net::AMQP::Value::String->new(__PACKAGE__->VERSION),
323             capabilities => {
324             consumer_cancel_notify => Net::AMQP::Value::true,
325             exchange_exchange_bindings => Net::AMQP::Value::true,
326             },
327 0 0         %{ $args{client_properties} || {} },
328             },
329             mechanism => 'AMQPLAIN',
330             response => {
331             LOGIN => $args{user},
332             PASSWORD => $args{pass},
333             },
334 0           locale => 'en_US',
335             ),
336             );
337              
338 0           $self->_tune(%args,);
339             },
340             $args{on_failure},
341 0           );
342              
343 0           return $self;
344             }
345              
346             sub _tune {
347 0     0     my $self = shift;
348 0           my %args = @_;
349              
350 0           weaken(my $weak_self = $self);
351             $self->_push_read_and_valid(
352             'Connection::Tune',
353             sub {
354 0 0   0     my $self = $weak_self or return;
355 0           my $frame = shift;
356              
357 0           my %tune;
358 0           foreach (qw( channel_max frame_max heartbeat )) {
359 0   0       my $client = $args{tune}{$_} || 0;
360 0   0       my $server = $frame->method_frame->$_ || 0;
361              
362             # negotiate with the server such that we cannot request a larger
363             # value set by the server, unless the server said unlimited
364 0 0 0       $tune{$_} = ($server == 0 or $client == 0)
    0          
    0          
365             ? ($server > $client ? $server : $client) # max
366             : ($client > $server ? $server : $client); # min
367             }
368              
369 0 0         if ($self->{_frame_max} = $tune{frame_max}) {
370             # calculate how big the body can actually be
371 0           $self->{_body_max} = $self->{_frame_max} - Net::AMQP::_HEADER_LEN - Net::AMQP::_FOOTER_LEN;
372             }
373              
374 0   0       $self->{_channel_max} = $tune{channel_max} || $DEFAULT_CHANNEL_MAX;
375              
376 0           $self->_push_write(
377             Net::AMQP::Protocol::Connection::TuneOk->new(%tune,)
378             );
379              
380 0 0         if ($tune{heartbeat} > 0) {
381 0           $self->_start_heartbeat($tune{heartbeat}, %args,);
382             }
383              
384 0           $self->_open(%args,);
385             },
386             $args{on_failure},
387 0           );
388              
389 0           return $self;
390             }
391              
392             sub _start_heartbeat {
393 0     0     my ($self, $interval, %args,) = @_;
394              
395 0           my $close_cb = $args{on_close};
396 0           my $failure_cb = $args{on_read_failure};
397 0           my $last_recv = 0;
398 0           my $idle_cycles = 0;
399 0           weaken(my $weak_self = $self);
400             my $timer_cb = sub {
401 0 0   0     my $self = $weak_self or return;
402 0 0         if ($self->{_heartbeat_recv} != $last_recv) {
    0          
403 0           $last_recv = $self->{_heartbeat_recv};
404 0           $idle_cycles = 0;
405             }
406             elsif (++$idle_cycles > 1) {
407 0           delete $self->{_heartbeat_timer};
408 0           $failure_cb->("Heartbeat lost");
409 0           $self->_server_closed($close_cb, "Heartbeat lost");
410 0           return;
411             }
412 0           $self->_push_write(Net::AMQP::Frame::Heartbeat->new());
413 0           };
414              
415 0           $self->{_heartbeat_recv} = time;
416 0           $self->{_heartbeat_timer} = AnyEvent->timer(
417             after => $interval,
418             interval => $interval,
419             cb => $timer_cb,
420             );
421              
422 0           return $self;
423             }
424              
425             sub _open {
426 0     0     my $self = shift;
427 0           my %args = @_;
428              
429             $self->_push_write_and_read(
430             'Connection::Open',
431             {
432             virtual_host => $args{vhost},
433             insist => 1,
434             },
435             'Connection::OpenOk',
436             sub {
437 0     0     $self->{_state} = _ST_OPEN;
438 0           $self->{_login_user} = $args{user};
439 0           $args{on_success}->($self);
440             },
441             $args{on_failure},
442 0           );
443              
444 0           return $self;
445             }
446              
447             sub close {
448 0 0   0 0   return if in_global_destruction;
449 0           my $self = shift;
450 0           my %args = $self->_set_cbs(@_);
451              
452 0 0         if ($self->{_state} == _ST_CLOSED) {
453 0           $args{on_success}->(@_);
454 0           return $self;
455             }
456 0 0         if ($self->{_state} != _ST_OPEN) {
457 0 0         $args{on_failure}->(($self->{_state} == _ST_OPENING ? "open" : "close") . " already in progress");
458 0           return $self;
459             }
460 0           $self->{_state} = _ST_CLOSING;
461              
462             my $cv = AE::cv {
463 0     0     delete $self->{_closing};
464 0           $self->_finish_close(%args);
465 0           };
466              
467 0           $cv->begin();
468              
469 0           my @ids = keys %{$self->{_channels}};
  0            
470 0           for my $id (@ids) {
471 0           my $channel = $self->{_channels}->{$id};
472 0 0         if ($channel->is_open) {
473 0           $cv->begin();
474             $channel->close(
475 0     0     on_success => sub { $cv->end() },
476 0     0     on_failure => sub { $cv->end() },
477 0           );
478             }
479             }
480              
481 0           $cv->end();
482              
483 0           return $self;
484             }
485              
486             sub _finish_close {
487 0     0     my $self = shift;
488 0           my %args = @_;
489              
490 0 0         if (my @ch = map { $_->id } grep { defined() && $_->is_open } values %{$self->{_channels}}) {
  0 0          
  0            
  0            
491 0           $args{on_failure}->("BUG: closing with channel(s) open: @ch");
492 0           return;
493             }
494              
495 0           $self->{_state} = _ST_CLOSED;
496              
497             $self->_push_write_and_read(
498             'Connection::Close', {}, 'Connection::CloseOk',
499             sub {
500             # circular ref ok
501 0     0     $self->{_handle}->push_shutdown;
502 0           $args{on_success}->(@_);
503             },
504             sub {
505             # circular ref ok
506 0     0     $self->{_handle}->push_shutdown;
507 0           $args{on_failure}->(@_);
508             },
509 0           );
510              
511 0           return;
512             }
513              
514             sub open_channel {
515 0     0 0   my $self = shift;
516 0           my %args = $self->_set_cbs(@_);
517              
518 0 0         return $self if !$self->_check_open($args{on_failure});
519              
520 0   0 0     $args{on_close} ||= sub {};
521              
522 0           my $id = $args{id};
523 0 0 0       if ($id && $self->{_channels}->{$id}) {
524 0           $args{on_failure}->("Channel id $id is already in use");
525 0           return $self;
526             }
527              
528 0 0         if (!$id) {
529 0           my $try_id = $self->{_last_chan_id};
530 0           for (1 .. $self->{_channel_max}) {
531 0 0         $try_id = 1 if ++$try_id > $self->{_channel_max};
532 0 0         unless (defined $self->{_channels}->{$try_id}) {
533 0           $id = $try_id;
534 0           last;
535             }
536             }
537 0 0         if (!$id) {
538 0           $args{on_failure}->('Ran out of channel ids');
539 0           return $self;
540             }
541 0           $self->{_last_chan_id} = $id;
542             }
543              
544             my $channel = AnyEvent::RabbitMQ::Channel->new(
545             id => $id,
546             connection => $self,
547             on_close => $args{on_close},
548 0           );
549              
550 0           $self->{_channels}->{$id} = $channel;
551              
552             $channel->open(
553             on_success => sub {
554 0     0     $args{on_success}->($channel);
555             },
556             on_failure => sub {
557 0     0     $self->_delete_channel($channel);
558 0           $args{on_failure}->(@_);
559             },
560 0           );
561              
562 0           return $self;
563             }
564              
565             sub _push_write_and_read {
566 0     0     my $self = shift;
567 0           my ($method, $args, $exp, $cb, $failure_cb, $id,) = @_;
568              
569 0           $method = 'Net::AMQP::Protocol::' . $method;
570 0           $self->_push_write(
571             Net::AMQP::Frame::Method->new(
572             method_frame => $method->new(%$args)
573             ),
574             $id,
575             );
576              
577 0           return $self->_push_read_and_valid($exp, $cb, $failure_cb, $id,);
578             }
579              
580             sub _push_read_and_valid {
581 0     0     my $self = shift;
582 0           my ($exp, $cb, $failure_cb, $id,) = @_;
583 0 0         $exp = ref($exp) eq 'ARRAY' ? $exp : [$exp];
584              
585 0           my $queue;
586 0 0         if (!$id) {
    0          
587 0           $queue = $self->{_queue};
588             } elsif (defined $self->{_channels}->{$id}) {
589 0           $queue = $self->{_channels}->{$id}->queue;
590             } else {
591 0           $failure_cb->('Unknown channel id: ' . $id);
592             }
593              
594 0 0         return unless $queue; # Can go away in global destruction..
595             $queue->get(sub {
596 0     0     my $frame = shift;
597              
598 0 0         return $failure_cb->('Received data is not method frame')
599             if !$frame->isa('Net::AMQP::Frame::Method');
600              
601 0           my $method_frame = $frame->method_frame;
602 0           for my $exp_elem (@$exp) {
603 0 0         return $cb->($frame)
604             if $method_frame->isa('Net::AMQP::Protocol::' . $exp_elem);
605             }
606              
607 0 0         $failure_cb->(
608             $method_frame->isa('Net::AMQP::Protocol::Channel::Close')
609             ? 'Channel closed'
610             : 'Expected ' . join(',', @$exp) . ' but got ' . ref($method_frame)
611             );
612 0           });
613             }
614              
615             sub _push_write {
616 0     0     my $self = shift;
617 0           my ($output, $id,) = @_;
618              
619 0 0         if ($output->isa('Net::AMQP::Protocol::Base')) {
620 0           $output = $output->frame_wrap;
621             }
622 0   0       $output->channel($id || 0);
623              
624 0 0         if ($self->{verbose}) {
625 0           warn '[C] --> [S] ', Dumper($output);
626             }
627              
628             $self->{_handle}->push_write($output->to_raw_frame())
629 0 0         if $self->{_handle}; # Careful - could have gone (global destruction)
630 0           return;
631             }
632              
633             sub _set_cbs {
634 0     0     my $self = shift;
635 0           my %args = @_;
636              
637 0   0 0     $args{on_success} ||= sub {};
638 0 0 0 0     $args{on_failure} ||= sub { die @_ unless in_global_destruction };
  0            
639              
640 0           return %args;
641             }
642              
643             sub _check_open {
644 0     0     my $self = shift;
645 0           my ($failure_cb) = @_;
646              
647 0 0         return 1 if $self->is_open;
648              
649 0           $failure_cb->('Connection has already been closed');
650 0           return 0;
651             }
652              
653             sub drain_writes {
654 0     0 0   my ($self, $timeout) = shift;
655 0           $self->{drain_condvar} = AnyEvent->condvar;
656 0 0         if ($timeout) {
657             $self->{drain_timer} = AnyEvent->timer( after => $timeout, sub {
658 0     0     $self->{drain_condvar}->croak("Timed out after $timeout");
659 0           });
660             }
661 0           $self->{drain_condvar}->recv;
662 0           delete $self->{drain_timer};
663             }
664              
665             sub DESTROY {
666 0     0     my $self = shift;
667 0 0         $self->close() unless in_global_destruction;
668 0           return;
669             }
670              
671             1;
672             __END__