File Coverage

blib/lib/AnyEvent/RabbitMQ.pm
Criterion Covered Total %
statement 51 351 14.5
branch 0 142 0.0
condition 0 48 0.0
subroutine 17 64 26.5
pod 0 11 0.0
total 68 616 11.0


line stmt bran cond sub pod time code
1             package AnyEvent::RabbitMQ;
2              
3 1     1   108715 use strict;
  1         3  
  1         29  
4 1     1   6 use warnings;
  1         2  
  1         27  
5 1     1   5 use Carp qw(confess croak);
  1         2  
  1         47  
6 1     1   5 use Scalar::Util qw(refaddr);
  1         2  
  1         56  
7 1     1   689 use List::MoreUtils qw(none);
  1         14195  
  1         7  
8 1     1   1692 use Devel::GlobalDestruction;
  1         2566  
  1         7  
9 1     1   641 use File::ShareDir;
  1         14446  
  1         47  
10 1     1   675 use Readonly;
  1         4229  
  1         54  
11 1     1   8 use Scalar::Util qw/ weaken /;
  1         2  
  1         186  
12              
13             require Data::Dumper;
14             sub Dumper {
15             local $Data::Dumper::Terse = 1;
16             local $Data::Dumper::Indent = 1;
17             local $Data::Dumper::Useqq = 1;
18             local $Data::Dumper::Deparse = 1;
19             local $Data::Dumper::Quotekeys = 0;
20             local $Data::Dumper::Sortkeys = 1;
21             &Data::Dumper::Dumper
22             }
23              
24 1     1   932 use AnyEvent::Handle;
  1         28319  
  1         45  
25 1     1   667 use AnyEvent::Socket;
  1         16477  
  1         106  
26              
27 1     1   542 use Net::AMQP 0.06;
  1         88452  
  1         40  
28 1     1   63 use Net::AMQP::Common qw(:all);
  1         4  
  1         229  
29              
30 1     1   635 use AnyEvent::RabbitMQ::Channel;
  1         3  
  1         78  
31 1     1   8 use AnyEvent::RabbitMQ::LocalQueue;
  1         2  
  1         23  
32              
33 1     1   14 use namespace::clean;
  1         6  
  1         4  
34              
35             our $VERSION = '1.21_01'; # TRIAL VERSION
36             $VERSION = eval $VERSION;
37              
38             use constant {
39 1         4646 _ST_CLOSED => 0,
40             _ST_OPENING => 1,
41             _ST_OPEN => 2,
42             _ST_CLOSING => 3,
43 1     1   696 };
  1         2  
44              
45             Readonly my $DEFAULT_AMQP_SPEC
46             => File::ShareDir::dist_dir("AnyEvent-RabbitMQ") . '/fixed_amqp0-9-1.xml';
47              
48             Readonly my $DEFAULT_CHANNEL_MAX => 2**16-1;
49              
50             sub new {
51 0     0 0   my $class = shift;
52 0           return bless {
53             verbose => 0,
54             @_,
55             _state => _ST_CLOSED,
56             _queue => AnyEvent::RabbitMQ::LocalQueue->new,
57             _last_chan_id => 0,
58             _channels => {},
59             _login_user => '',
60             _server_properties => {},
61             _frame_max => undef,
62             _body_max => undef,
63             _channel_max => undef,
64             }, $class;
65             }
66              
67             sub verbose {
68 0     0 0   my $self = shift;
69             @_ ? ($self->{verbose} = shift) : $self->{verbose}
70 0 0         }
71              
72             sub is_open {
73 0     0 0   my $self = shift;
74 0           $self->{_state} == _ST_OPEN
75             }
76              
77             sub channels {
78 0     0 0   my $self = shift;
79 0           return $self->{_channels};
80             }
81              
82             sub _delete_channel {
83 0     0     my $self = shift;
84 0           my ($channel,) = @_;
85 0           my $c = $self->{_channels}->{$channel->id};
86 0 0 0       if (defined($c) && refaddr($c) == refaddr($channel)) {
87 0           delete $self->{_channels}->{$channel->id};
88 0           return 1;
89             }
90 0           return 0;
91             }
92              
93             sub login_user {
94 0     0 0   my $self = shift;
95 0           return $self->{_login_user};
96             }
97              
98             my $_loaded_spec;
99             sub load_xml_spec {
100 0     0 0   my $self = shift;
101 0           my ($spec) = @_;
102 0   0       $spec ||= $DEFAULT_AMQP_SPEC;
103 0 0 0       if ($_loaded_spec && $_loaded_spec ne $spec) {
    0          
104 0           croak("Tried to load AMQP spec $spec, but have already loaded $_loaded_spec, not possible");
105             }
106             elsif (!$_loaded_spec) {
107 0           Net::AMQP::Protocol->load_xml_spec($_loaded_spec = $spec);
108             }
109 0           return $self;
110             }
111              
112             sub connect {
113 0     0 0   my $self = shift;
114 0           my %args = $self->_set_cbs(@_);
115              
116 0 0         if ($self->{_state} != _ST_CLOSED) {
117 0           $args{on_failure}->('Connection has already been opened');
118 0           return $self;
119             }
120              
121 0   0 0     $args{on_close} ||= sub {};
122 0   0 0     $args{on_read_failure} ||= sub {warn @_, "\n"};
  0            
123 0   0       $args{timeout} ||= 0;
124              
125 0           for (qw/ host port /) {
126 0 0         $args{$_} or return $args{on_failure}->("No $_ passed to connect");
127             }
128              
129 0 0         if ($self->{verbose}) {
130 0           warn 'connect to ', $args{host}, ':', $args{port}, '...', "\n";
131             }
132              
133 0           $self->{_state} = _ST_OPENING;
134              
135 0           weaken(my $weak_self = $self);
136 0           my $conn; $conn = AnyEvent::Socket::tcp_connect(
137             $args{host},
138             $args{port},
139             sub {
140 0     0     undef $conn;
141 0 0         my $self = $weak_self or return;
142              
143 0           my $fh = shift;
144              
145 0 0         unless ($fh) {
146 0           $self->{_state} = _ST_CLOSED;
147             return $args{on_failure}->(
148 0           sprintf('Error connecting to AMQP Server %s:%s: %s', $args{host}, $args{port}, $!)
149             );
150             }
151              
152 0           my $close_cb = $args{on_close};
153 0           my $failure_cb = $args{on_failure};
154             $self->{_handle} = AnyEvent::Handle->new(
155             fh => $fh,
156             on_error => sub {
157 0           my ($handle, $fatal, $message) = @_;
158 0 0         my $self = $weak_self or return;
159              
160 0 0         if ($self->is_open) {
161 0           $self->_server_closed($close_cb, $message);
162             }
163             else {
164 0           $failure_cb->(@_);
165             }
166             },
167             on_drain => sub {
168 0           my ($handle) = @_;
169 0 0         my $self = $weak_self or return;
170              
171             $self->{drain_condvar}->send
172 0 0         if exists $self->{drain_condvar};
173             },
174             peername => $args{host},
175             $args{tls} ? (tls => 'connect') : (),
176             $args{tls_ctx} ? ( tls_ctx => $args{tls_ctx} ) : (),
177 0 0         $args{nodelay} ? ( nodelay => $args{nodelay} ) : (),
    0          
    0          
178             );
179 0           $self->_read_loop($args{on_close}, $args{on_read_failure});
180 0           $self->_start(%args,);
181             },
182             sub {
183 0     0     return $args{timeout};
184             },
185 0           );
186              
187 0           return $self;
188             }
189              
190             sub server_properties {
191 0     0 0   return shift->{_server_properties};
192             }
193              
194             sub _read_loop {
195 0     0     my ($self, $close_cb, $failure_cb,) = @_;
196              
197 0 0         return if !defined $self->{_handle}; # called on_error
198              
199 0           weaken(my $weak_self = $self);
200             $self->{_handle}->push_read(chunk => 8, sub {
201 0 0   0     my $self = $weak_self or return;
202 0           my $data = $_[1];
203 0           my $stack = $_[1];
204              
205 0 0         if (length($data) <= 7) {
206 0           $failure_cb->('Broken data was received');
207 0           @_ = ($self, $close_cb, $failure_cb,);
208 0           goto &_read_loop;
209             }
210              
211 0           my ($type_id, $channel, $length,) = unpack 'CnN', substr $data, 0, 7, '';
212 0 0 0       if (!defined $type_id || !defined $channel || !defined $length) {
      0        
213 0           $failure_cb->('Broken data was received');
214 0           @_ = ($self, $close_cb, $failure_cb,);
215 0           goto &_read_loop;
216             }
217              
218             $self->{_handle}->push_read(chunk => $length, sub {
219 0 0         my $self = $weak_self or return;
220 0           $stack .= $_[1];
221 0           my ($frame) = Net::AMQP->parse_raw_frames(\$stack);
222              
223 0 0         $self->{_heartbeat_recv} = time if $self->{_heartbeat_timer};
224              
225 0 0         if ($self->{verbose}) {
226 0           warn '[C] <-- [S] ', Dumper($frame),
227             '-----------', "\n";
228             }
229              
230 0           my $id = $frame->channel;
231 0 0         if (0 == $id) {
232 0 0         if ($frame->type_id == 8) {
233             # Heartbeat, no action needs taking.
234             }
235             else {
236 0 0         return unless $self->_check_close_and_clean($frame, $close_cb,);
237 0           $self->{_queue}->push($frame);
238             }
239             } else {
240 0           my $channel = $self->{_channels}->{$id};
241 0 0         if (defined $channel) {
242 0           $channel->push_queue_or_consume($frame, $failure_cb);
243             } else {
244 0           $failure_cb->('Unknown channel id: ' . $frame->channel);
245             }
246             }
247              
248 0           @_ = ($self, $close_cb, $failure_cb,);
249 0           goto &_read_loop;
250 0           });
251 0           });
252              
253 0           return $self;
254             }
255              
256             sub _check_close_and_clean {
257 0     0     my $self = shift;
258 0           my ($frame, $close_cb,) = @_;
259              
260 0 0         my $method_frame = $frame->isa('Net::AMQP::Frame::Method') ? $frame->method_frame : undef;
261              
262 0 0         if ($self->{_state} == _ST_CLOSED) {
263 0   0       return $method_frame && $method_frame->isa('Net::AMQP::Protocol::Connection::CloseOk');
264             }
265              
266 0 0 0       if ($method_frame && $method_frame->isa('Net::AMQP::Protocol::Connection::Close')) {
267 0           delete $self->{_heartbeat_timer};
268 0           $self->_push_write(Net::AMQP::Protocol::Connection::CloseOk->new());
269 0           $self->_server_closed($close_cb, $frame);
270 0           return;
271             }
272              
273 0           return 1;
274             }
275              
276             sub _server_closed {
277 0     0     my $self = shift;
278 0           my ($close_cb, $why,) = @_;
279              
280 0           $self->{_state} = _ST_CLOSING;
281 0           for my $channel (values %{ $self->{_channels} }) {
  0            
282 0 0         $channel->_closed(ref($why) ? $why : $channel->_close_frame($why));
283             }
284 0           $self->{_channels} = {};
285 0           $self->{_handle}->push_shutdown;
286 0           $self->{_state} = _ST_CLOSED;
287              
288 0           $close_cb->($why);
289 0           return;
290             }
291              
292             sub _start {
293 0     0     my $self = shift;
294 0           my %args = @_;
295              
296 0 0         if ($self->{verbose}) {
297 0           warn 'post header', "\n";
298             }
299              
300 0           $self->{_handle}->push_write(Net::AMQP::Protocol->header);
301              
302             $self->_push_read_and_valid(
303             'Connection::Start',
304             sub {
305 0     0     my $frame = shift;
306              
307 0           my @mechanisms = split /\s/, $frame->method_frame->mechanisms;
308             return $args{on_failure}->('AMQPLAIN is not found in mechanisms')
309 0 0         if none {$_ eq 'AMQPLAIN'} @mechanisms;
  0            
310              
311 0           my @locales = split /\s/, $frame->method_frame->locales;
312             return $args{on_failure}->('en_US is not found in locales')
313 0 0         if none {$_ eq 'en_US'} @locales;
  0            
314              
315 0           $self->{_server_properties} = $frame->method_frame->server_properties;
316              
317             $self->_push_write(
318             Net::AMQP::Protocol::Connection::StartOk->new(
319             client_properties => {
320             platform => 'Perl',
321             product => __PACKAGE__,
322             information => 'http://d.hatena.ne.jp/cooldaemon/',
323             version => Net::AMQP::Value::String->new(__PACKAGE__->VERSION),
324             capabilities => {
325             consumer_cancel_notify => Net::AMQP::Value::true,
326             exchange_exchange_bindings => Net::AMQP::Value::true,
327             },
328 0 0         %{ $args{client_properties} || {} },
329             },
330             mechanism => 'AMQPLAIN',
331             response => {
332             LOGIN => $args{user},
333             PASSWORD => $args{pass},
334             },
335 0           locale => 'en_US',
336             ),
337             );
338              
339 0           $self->_tune(%args,);
340             },
341             $args{on_failure},
342 0           );
343              
344 0           return $self;
345             }
346              
347             sub _tune {
348 0     0     my $self = shift;
349 0           my %args = @_;
350              
351 0           weaken(my $weak_self = $self);
352             $self->_push_read_and_valid(
353             'Connection::Tune',
354             sub {
355 0 0   0     my $self = $weak_self or return;
356 0           my $frame = shift;
357              
358 0           my %tune;
359 0           foreach (qw( channel_max frame_max heartbeat )) {
360 0   0       my $client = $args{tune}{$_} || 0;
361 0   0       my $server = $frame->method_frame->$_ || 0;
362              
363             # negotiate with the server such that we cannot request a larger
364             # value set by the server, unless the server said unlimited
365 0 0 0       $tune{$_} = ($server == 0 or $client == 0)
    0          
    0          
366             ? ($server > $client ? $server : $client) # max
367             : ($client > $server ? $server : $client); # min
368             }
369              
370 0 0         if ($self->{_frame_max} = $tune{frame_max}) {
371             # calculate how big the body can actually be
372 0           $self->{_body_max} = $self->{_frame_max} - Net::AMQP::_HEADER_LEN - Net::AMQP::_FOOTER_LEN;
373             }
374              
375 0   0       $self->{_channel_max} = $tune{channel_max} || $DEFAULT_CHANNEL_MAX;
376              
377 0           $self->_push_write(
378             Net::AMQP::Protocol::Connection::TuneOk->new(%tune,)
379             );
380              
381 0 0         if ($tune{heartbeat} > 0) {
382 0           $self->_start_heartbeat($tune{heartbeat}, %args,);
383             }
384              
385 0           $self->_open(%args,);
386             },
387             $args{on_failure},
388 0           );
389              
390 0           return $self;
391             }
392              
393             sub _start_heartbeat {
394 0     0     my ($self, $interval, %args,) = @_;
395              
396 0           my $close_cb = $args{on_close};
397 0           my $failure_cb = $args{on_read_failure};
398 0           my $last_recv = 0;
399 0           my $idle_cycles = 0;
400 0           weaken(my $weak_self = $self);
401             my $timer_cb = sub {
402 0 0   0     my $self = $weak_self or return;
403 0 0         if ($self->{_heartbeat_recv} != $last_recv) {
    0          
404 0           $last_recv = $self->{_heartbeat_recv};
405 0           $idle_cycles = 0;
406             }
407             elsif (++$idle_cycles > 1) {
408 0           delete $self->{_heartbeat_timer};
409 0           $failure_cb->("Heartbeat lost");
410 0           $self->_server_closed($close_cb, "Heartbeat lost");
411 0           return;
412             }
413 0           $self->_push_write(Net::AMQP::Frame::Heartbeat->new());
414 0           };
415              
416 0           $self->{_heartbeat_recv} = time;
417 0           $self->{_heartbeat_timer} = AnyEvent->timer(
418             after => $interval,
419             interval => $interval,
420             cb => $timer_cb,
421             );
422              
423 0           return $self;
424             }
425              
426             sub _open {
427 0     0     my $self = shift;
428 0           my %args = @_;
429              
430             $self->_push_write_and_read(
431             'Connection::Open',
432             {
433             virtual_host => $args{vhost},
434             insist => 1,
435             },
436             'Connection::OpenOk',
437             sub {
438 0     0     $self->{_state} = _ST_OPEN;
439 0           $self->{_login_user} = $args{user};
440 0           $args{on_success}->($self);
441             },
442             $args{on_failure},
443 0           );
444              
445 0           return $self;
446             }
447              
448             sub close {
449 0 0   0 0   return if in_global_destruction;
450 0           my $self = shift;
451 0           my %args = $self->_set_cbs(@_);
452              
453 0 0         if ($self->{_state} == _ST_CLOSED) {
454 0           $args{on_success}->(@_);
455 0           return $self;
456             }
457 0 0         if ($self->{_state} != _ST_OPEN) {
458 0 0         $args{on_failure}->(($self->{_state} == _ST_OPENING ? "open" : "close") . " already in progress");
459 0           return $self;
460             }
461 0           $self->{_state} = _ST_CLOSING;
462              
463             my $cv = AE::cv {
464 0     0     delete $self->{_closing};
465 0           $self->_finish_close(%args);
466 0           };
467              
468 0           $cv->begin();
469              
470 0           my @ids = keys %{$self->{_channels}};
  0            
471 0           for my $id (@ids) {
472 0           my $channel = $self->{_channels}->{$id};
473 0 0         if ($channel->is_open) {
474 0           $cv->begin();
475             $channel->close(
476 0     0     on_success => sub { $cv->end() },
477 0     0     on_failure => sub { $cv->end() },
478 0           );
479             }
480             }
481              
482 0           $cv->end();
483              
484 0           return $self;
485             }
486              
487             sub _finish_close {
488 0     0     my $self = shift;
489 0           my %args = @_;
490              
491 0 0         if (my @ch = map { $_->id } grep { defined() && $_->is_open } values %{$self->{_channels}}) {
  0 0          
  0            
  0            
492 0           $args{on_failure}->("BUG: closing with channel(s) open: @ch");
493 0           return;
494             }
495              
496 0           $self->{_state} = _ST_CLOSED;
497              
498             $self->_push_write_and_read(
499             'Connection::Close', {}, 'Connection::CloseOk',
500             sub {
501             # circular ref ok
502 0     0     $self->{_handle}->push_shutdown;
503 0           $args{on_success}->(@_);
504             },
505             sub {
506             # circular ref ok
507 0     0     $self->{_handle}->push_shutdown;
508 0           $args{on_failure}->(@_);
509             },
510 0           );
511              
512 0           return;
513             }
514              
515             sub open_channel {
516 0     0 0   my $self = shift;
517 0           my %args = $self->_set_cbs(@_);
518              
519 0 0         return $self if !$self->_check_open($args{on_failure});
520              
521 0   0 0     $args{on_close} ||= sub {};
522              
523 0           my $id = $args{id};
524 0 0 0       if ($id && $self->{_channels}->{$id}) {
525 0           $args{on_failure}->("Channel id $id is already in use");
526 0           return $self;
527             }
528              
529 0 0         if (!$id) {
530 0           my $try_id = $self->{_last_chan_id};
531 0           for (1 .. $self->{_channel_max}) {
532 0 0         $try_id = 1 if ++$try_id > $self->{_channel_max};
533 0 0         unless (defined $self->{_channels}->{$try_id}) {
534 0           $id = $try_id;
535 0           last;
536             }
537             }
538 0 0         if (!$id) {
539 0           $args{on_failure}->('Ran out of channel ids');
540 0           return $self;
541             }
542 0           $self->{_last_chan_id} = $id;
543             }
544              
545             my $channel = AnyEvent::RabbitMQ::Channel->new(
546             id => $id,
547             connection => $self,
548             on_close => $args{on_close},
549 0           );
550              
551 0           $self->{_channels}->{$id} = $channel;
552              
553             $channel->open(
554             on_success => sub {
555 0     0     $args{on_success}->($channel);
556             },
557             on_failure => sub {
558 0     0     $self->_delete_channel($channel);
559 0           $args{on_failure}->(@_);
560             },
561 0           );
562              
563 0           return $self;
564             }
565              
566             sub _push_write_and_read {
567 0     0     my $self = shift;
568 0           my ($method, $args, $exp, $cb, $failure_cb, $id,) = @_;
569              
570 0           $method = 'Net::AMQP::Protocol::' . $method;
571 0           $self->_push_write(
572             Net::AMQP::Frame::Method->new(
573             method_frame => $method->new(%$args)
574             ),
575             $id,
576             );
577              
578 0           return $self->_push_read_and_valid($exp, $cb, $failure_cb, $id,);
579             }
580              
581             sub _push_read_and_valid {
582 0     0     my $self = shift;
583 0           my ($exp, $cb, $failure_cb, $id,) = @_;
584 0 0         $exp = ref($exp) eq 'ARRAY' ? $exp : [$exp];
585              
586 0           my $queue;
587 0 0         if (!$id) {
    0          
588 0           $queue = $self->{_queue};
589             } elsif (defined $self->{_channels}->{$id}) {
590 0           $queue = $self->{_channels}->{$id}->queue;
591             } else {
592 0           $failure_cb->('Unknown channel id: ' . $id);
593             }
594              
595 0 0         return unless $queue; # Can go away in global destruction..
596             $queue->get(sub {
597 0     0     my $frame = shift;
598              
599 0 0         return $failure_cb->('Received data is not method frame')
600             if !$frame->isa('Net::AMQP::Frame::Method');
601              
602 0           my $method_frame = $frame->method_frame;
603 0           for my $exp_elem (@$exp) {
604 0 0         return $cb->($frame)
605             if $method_frame->isa('Net::AMQP::Protocol::' . $exp_elem);
606             }
607              
608 0 0         $failure_cb->(
609             $method_frame->isa('Net::AMQP::Protocol::Channel::Close')
610             ? 'Channel closed'
611             : 'Expected ' . join(',', @$exp) . ' but got ' . ref($method_frame)
612             );
613 0           });
614             }
615              
616             sub _push_write {
617 0     0     my $self = shift;
618 0           my ($output, $id,) = @_;
619              
620 0 0         if ($output->isa('Net::AMQP::Protocol::Base')) {
621 0           $output = $output->frame_wrap;
622             }
623 0   0       $output->channel($id || 0);
624              
625 0 0         if ($self->{verbose}) {
626 0           warn '[C] --> [S] ', Dumper($output);
627             }
628              
629             $self->{_handle}->push_write($output->to_raw_frame())
630 0 0         if $self->{_handle}; # Careful - could have gone (global destruction)
631 0           return;
632             }
633              
634             sub _set_cbs {
635 0     0     my $self = shift;
636 0           my %args = @_;
637              
638 0   0 0     $args{on_success} ||= sub {};
639 0 0 0 0     $args{on_failure} ||= sub { die @_ unless in_global_destruction };
  0            
640              
641 0           return %args;
642             }
643              
644             sub _check_open {
645 0     0     my $self = shift;
646 0           my ($failure_cb) = @_;
647              
648 0 0         return 1 if $self->is_open;
649              
650 0           $failure_cb->('Connection has already been closed');
651 0           return 0;
652             }
653              
654             sub drain_writes {
655 0     0 0   my ($self, $timeout) = shift;
656 0           $self->{drain_condvar} = AnyEvent->condvar;
657 0 0         if ($timeout) {
658             $self->{drain_timer} = AnyEvent->timer( after => $timeout, sub {
659 0     0     $self->{drain_condvar}->croak("Timed out after $timeout");
660 0           });
661             }
662 0           $self->{drain_condvar}->recv;
663 0           delete $self->{drain_timer};
664             }
665              
666             sub DESTROY {
667 0     0     my $self = shift;
668 0 0         $self->close() unless in_global_destruction;
669 0           return;
670             }
671              
672             1;
673             __END__