File Coverage

blib/lib/Net/Async/AMQP.pm
Criterion Covered Total %
statement 10 12 83.3
branch n/a
condition n/a
subroutine 4 4 100.0
pod n/a
total 14 16 87.5


line stmt bran cond sub pod time code
1             package Net::Async::AMQP;
2             # ABSTRACT: IO::Async support for the AMQP protocol
3 4     4   95317 use strict;
  4         10  
  4         106  
4 4     4   21 use warnings;
  4         6  
  4         112  
5              
6 4     4   3051 use parent qw(IO::Async::Notifier);
  4         1229  
  4         24  
7              
8             our $VERSION = '0.031';
9              
10             =head1 NAME
11              
12             Net::Async::AMQP - provides client interface to AMQP using L<IO::Async>
13              
14             =head1 VERSION
15              
16             version 0.031
17              
18             =head1 SYNOPSIS
19              
20             use IO::Async::Loop;
21             use Net::Async::AMQP;
22             my $loop = IO::Async::Loop->new;
23             $loop->add(my $amqp = Net::Async::AMQP->new);
24             $amqp->connect(
25             host => 'localhost',
26             user => 'guest',
27             pass => 'guest',
28             )->get;
29              
30             =head1 DESCRIPTION
31              
32             Does AMQP things. Note that the API may change before the stable 1.000
33             release - L has some alternative modules if you're looking for
34             something that has been around for longer.
35              
36             If you want a higher-level API which manages channels and connections, try
37             L.
38              
39             =head2 AMQP support
40              
41             The following AMQP features are supported:
42              
43             =over 4
44              
45             =item * Queue declare, bind, delete
46              
47             =item * Exchange declare, delete
48              
49             =item * Consumer setup and cancellation
50              
51             =item * Message publishing
52              
53             =item * Explicit ACK
54              
55             =item * QoS
56              
57             =item * SSL
58              
59             =back
60              
61             =head2 RabbitMQ-specific features
62              
63             RabbitMQ provides some additional features:
64              
65             =over 4
66              
67             =item * Exchange-to-exchange binding
68              
69             =item * Server flow control notification
70              
71             =item * Consumer cancellation notification
72              
73             =back
74              
75             =head2 Missing features
76              
77             The following features aren't currently implemented - raise a request via RT or by email (L)
78             if you want any of these:
79              
80             =over 4
81              
82             =item * Transactions
83              
84             =item * Flow control
85              
86             =item * SASL auth
87              
88             =back
89              
90             =cut
91              
92 4     4   104680 use Net::AMQP;
  0            
  0            
93             use Net::AMQP::Common qw(:all);
94              
95             use Future;
96             use curry::weak;
97             use Class::ISA ();
98             use List::Util qw(min);
99             use List::UtilsBy qw(extract_by);
100             use File::ShareDir ();
101             use Time::HiRes ();
102             use Scalar::Util qw(weaken);
103             use Mixin::Event::Dispatch::Bus;
104              
105             =head1 CONSTANTS
106              
107             =head2 AUTH_MECH
108              
109             Defines the mechanism used for authentication. Currently only AMQPLAIN
110             is supported.
111              
112             =cut
113              
114             use constant AUTH_MECH => 'AMQPLAIN';
115              
116             =head2 PAYLOAD_HEADER_LENGTH
117              
118             Length of header used in payload messages. Defined by the AMQP standard
119             as 8 bytes.
120              
121             =cut
122              
123             use constant PAYLOAD_HEADER_LENGTH => 8;
124              
125             =head2 MAX_FRAME_SIZE
126              
127             Largest amount of data we'll attempt to send in a single frame. Actual
128             frame limit will be negotiated with the remote server. Defaults to 262144.
129              
130             =cut
131              
132             use constant MAX_FRAME_SIZE => 262144;
133              
134             =head2 MAX_CHANNELS
135              
136             Maximum number of channels to request. Defaults to the AMQP limit (65535).
137             Attempting to set this any higher will not end well, it's an unsigned 16-bit
138             value.
139              
140             =cut
141              
142             use constant MAX_CHANNELS => 65535;
143              
144             =head2 HEARTBEAT_INTERVAL
145              
146             Interval in seconds between heartbeat frames, zero to disable. Can be
147             overridden by C<PERL_AMQP_HEARTBEAT_INTERVAL> in the environment, default
148             is 0 (disabled).
149              
150             =cut
151              
152             use constant HEARTBEAT_INTERVAL => $ENV{PERL_AMQP_HEARTBEAT_INTERVAL} // 0;
153              
154             use Net::Async::AMQP::Channel;
155             use Net::Async::AMQP::Queue;
156             use Net::Async::AMQP::Utils;
157              
158             =head1 PACKAGE VARIABLES
159              
160             =head2 $XML_SPEC
161              
162             This defines the path to the AMQP XML spec, which L<Net::AMQP> uses
163             to create methods and handlers for the appropriate version of the MQ
164             protocol.
165              
166             Defaults to an extended version of the 0.9.1 protocol as used by RabbitMQ,
167             this is found in the C<Net::Async::AMQP> distribution sharedir (see
168             L<File::ShareDir>).
169              
170             Normally, you should be able to ignore this. If you want to load an alternative
171             spec, note that (a) this is global, rather than per-instance, (b) it needs to
172             be set before you C<use> this module.
173              
174             BEGIN { $Net::Async::AMQP::XML_SPEC = '/tmp/amqp.xml' }
175             use Net::Async::AMQP;
176              
177             Once loaded, this module will not attempt to apply the spec again.
178              
179             =cut
180              
# Path to the AMQP XML spec, plus a counter recording whether the spec has
# already been loaded. Both are package globals so that a caller can set
# $XML_SPEC in a BEGIN block before this module is compiled.
our $XML_SPEC;
our $SPEC_LOADED;
BEGIN {
    # Fall back to the spec bundled in the distribution sharedir only when
    # nothing has been preset.
    unless (defined $XML_SPEC) {
        $XML_SPEC = File::ShareDir::dist_file(
            'Net-Async-AMQP',
            'amqp0-9-1.extended.xml',
        );
    }

    # Load the appropriate protocol definitions. RabbitMQ uses a
    # modified version of AMQP 0.9.1. The counter is incremented on every
    # pass; the spec itself is only loaded on the first one.
    my $already_loaded = $SPEC_LOADED++;
    Net::AMQP::Protocol->load_xml_spec($XML_SPEC) unless $already_loaded;
}
193              
194             =head1 %CONNECTION_DEFAULTS
195              
196             The default parameters to use for L</connect>. Changing these values is permitted,
197             but do not attempt to delete from or add any entries to the hash.
198              
199             Passing parameters directly to L</connect> is much safer, please do that instead.
200              
201             =cut
202              
# Fallback values for the connection parameters; connect() applies these to
# any of the listed keys that the caller leaves undefined.
our %CONNECTION_DEFAULTS = (
    host => 'localhost',
    port => 5672,
    user => 'guest',
    pass => 'guest',
);
209              
210             =head1 METHODS
211              
212             =cut
213              
214             =head2 configure
215              
216             Set up variables. Takes the following optional named parameters:
217              
218             =over 4
219              
220             =item * heartbeat_interval - (optional) interval between heartbeat messages,
221             default is set by the L</HEARTBEAT_INTERVAL> constant
222              
223             =item * max_channels - how many channels to allow on this connection,
224             default is defined by the L</MAX_CHANNELS> constant
225              
226             =back
227              
228             Returns the new instance.
229              
230             =cut
231              
# Stores the settings owned by this class (heartbeat_interval, max_channels)
# on the instance, then passes every remaining named parameter on to the
# parent class' configure. Returns whatever the parent configure returns.
sub configure {
    my ($self, %args) = @_;

    # Only keys the caller actually supplied are copied across; they are
    # removed from %args so the parent never sees them.
    my @own_keys = grep { exists $args{$_} } qw(heartbeat_interval max_channels);
    for my $key (@own_keys) {
        $self->{$key} = delete $args{$key};
    }

    return $self->SUPER::configure(%args);
}
239              
240             =head2 bus
241              
242             Event bus. Used for sharing global events such as connection closure.
243              
244             =cut
245              
# Returns the event bus for this instance, creating and caching a new
# Mixin::Event::Dispatch::Bus the first time it is requested.
sub bus {
    my ($self) = @_;
    return $self->{bus} ||= Mixin::Event::Dispatch::Bus->new;
}
247              
248             =head2 connect
249              
250             Takes the following parameters:
251              
252             =over 4
253              
254             =item * port - the AMQP port, defaults to 5672, can be a service name if preferred
255              
256             =item * host - host to connect to, defaults to localhost
257              
258             =item * local_host - our local IP to connect from
259              
260             =item * user - which user to connect as, defaults to guest
261              
262             =item * pass - the password for this user, defaults to guest
263              
264             =item * ssl - true if you want to connect over SSL
265              
266             =item * SSL_* - SSL-specific parameters, see L<IO::Async::SSL> and L<IO::Socket::SSL> for details
267              
268             =back
269              
270             Returns a L<Future> which resolves to C<$self> once the connection has been established.
271              
272             =cut
273              
# Starts a connection to the AMQP server.
#
# Named parameters: port, host, user, pass (each defaulting to the matching
# entry in %CONNECTION_DEFAULTS and recorded on the instance), local_host,
# ssl, and any SSL_* options, which are passed straight to the loop's
# connect method.
#
# Returns a Future. It is marked done with $self when the 'connected' bus
# event fires, and fails on a resolve/connect/SSL error or if the 'close'
# bus event fires first.
#
# Dies with 'no loop' if this object has not been added to a loop.
sub connect {
    my ($self, %args) = @_;

    my $loop = $self->loop
        or die 'no loop';

    my $f = $loop->new_future;

    # The cleanup handler below clears $f. If the connection attempt
    # completes before this method returns, returning $f would hand undef
    # back to the caller, so keep a separate reference for the return value.
    my $result = $f;

    # Apply defaults
    for my $key (keys %CONNECTION_DEFAULTS) {
        $args{$key} //= $CONNECTION_DEFAULTS{$key};
        $self->{$key} = $args{$key};
    }

    # Remember our event callbacks so we can unsubscribe
    my $connected;
    my $close;

    # Clean up once we succeed/fail. The lexicals are cleared so that the
    # closures no longer hold references to each other or to $self.
    $f->on_ready(sub {
        $self->bus->unsubscribe_from_event(close => $close) if $close;
        $self->bus->unsubscribe_from_event(connected => $connected) if $connected;
        undef $close;
        undef $connected;
        undef $self;
        undef $f;
    });

    # One-shot event on connection
    $self->bus->subscribe_to_event(connected => $connected = sub {
        $f->done($self) unless $f->is_ready;
    });
    # Also pick up connection termination
    $self->bus->subscribe_to_event(close => $close = sub {
        $f->fail(connect => 'Remote closed connection') unless $f->is_ready;
    });

    # Support SSL connection
    require IO::Async::SSL if $args{ssl};
    my $method = $args{ssl} ? 'SSL_connect' : 'connect';
    $loop->$method(
        host     => $self->{host},
        # local_host can be used to send from a different source address,
        # sometimes useful for routing purposes or loadtesting
        (exists $args{local_host} ? (local_host => $args{local_host}) : ()),
        service  => $self->{port},
        socktype => 'stream',

        on_stream => $self->curry::on_stream(\%args),

        on_resolve_error => $f->curry::fail('resolve'),
        on_connect_error => $f->curry::fail('connect'),
        ($args{ssl}
            ? (on_ssl_error => $f->curry::fail('ssl'))
            : ()
        ),
        (map {; $_ => $args{$_} } grep /^SSL/, keys %args)
    );

    return $result;
}
331              
332             =head2 on_stream
333              
334             Called once the underlying TCP connection has been established.
335              
336             Returns nothing of importance.
337              
338             =cut
339              
# Callback invoked with the new stream once the transport is connected.
# $args is the hashref of connect() parameters, passed on to post_connect.
# Records the stream, routes its reads to on_read, adds it as a child,
# starts the heartbeat timer when a heartbeat interval is set, then begins
# the AMQP handshake. Returns nothing.
sub on_stream {
    my ($self, $args, $stream) = @_;
    $self->debug_printf("Stream received");

    $self->{stream} = $stream;
    # Weak callback, so the stream does not hold a strong reference to us
    $stream->configure(on_read => $self->curry::weak::on_read);
    $self->add_child($stream);

    if ($self->heartbeat_interval) {
        $self->apply_heartbeat_timer;
    }

    $self->post_connect(%{$args});
    return;
}
352              
# Debugging aid: prints a summary of a raw frame's header to the currently
# selected output handle.
#
# $pkt is the raw frame bytes. The fields are read in order: 1-byte type,
# 2-byte channel, 4-byte payload length and, for method frames only, the
# 2-byte class and method IDs. $pkt is a local copy, so the caller's buffer
# is not modified.
sub dump_frame {
    my ($self, $pkt) = @_;

    # Frame type names from AMQP 0-9-1. Anything else is reported as
    # 'unknown' rather than passing undef to printf.
    my %type_name = (
        1 => 'Method',
        2 => 'Header',
        3 => 'Body',
        8 => 'Heartbeat',
    );

    my ($type) = unpack 'C1', substr $pkt, 0, 1, '';
    printf "Type: %02x (%s)\n", $type, $type_name{$type} // 'unknown';

    my ($chan) = unpack 'n1', substr $pkt, 0, 2, '';
    printf "Channel: %d\n", $chan;

    my ($len) = unpack 'N1', substr $pkt, 0, 4, '';
    printf "Length: %d bytes\n", $len;

    if ($type == 1) {
        my ($class, $method) = unpack 'n1n1', substr $pkt, 0, 4, '';
        printf "Class: %s\n", $class;
        printf "Method: %s\n", $method;
    }
}
372              
373             =head2 on_read
374              
375             Called whenever there's data available to be read.
376              
377             =cut
378              
# Read callback for the stream. Records the time of arrival, hands each
# complete frame parsed from the buffer to process_frame, and triggers
# on_closed at EOF. Always returns 0.
sub on_read {
    my ($self, $stream, $buffref, $eof) = @_;

    # Frame dumping support - not that useful yet, so it's disabled
    if (0) {
        my $mem = $$buffref;
        $self->dump_frame($mem);
        my $idx = 0;
        while (length $mem) {
            # Hex and printable-ASCII dump, 16 bytes per row
            my $bytes = substr $mem, 0, 16, '';
            my $hex = join ' ', unpack 'H2' x 16, $bytes;
            substr $hex, 8 * 3, 0, ' ';
            my $asc = join '', map { /([[:print:]])/ ? $1 : '.' } split //, $bytes;
            substr $asc, 8, 0, ' ';
            printf "%8d: %-52.52s %s\n", $idx, $hex, $asc;
            $idx += length($asc);
        }
        print "\n";
        $self->debug_printf("At EOF") if $eof;
    }

    $self->last_frame_time(Time::HiRes::time());

    # As each frame is parsed it will be removed from the buffer
    for my $frame (Net::AMQP->parse_raw_frames($buffref)) {
        $self->process_frame($frame);
    }

    if ($eof) {
        $self->on_closed;
    }
    return 0;
}
405              
406             =head2 on_closed
407              
408             Called when the TCP connection is closed.
409              
410             =cut
411              
# Tears down local state after the connection has gone away.
#
# Takes an optional reason string (default 'unknown'). Every known channel
# gets a 'close' event on its own bus and is released via channel_closed.
# Any channel Futures still outstanding are cancelled, the stream is closed,
# the stream and heartbeat timers are removed as children, and finally a
# 'close' event carrying the reason is raised on the connection bus.
#
# Returns the result of that final invoke_event call.
sub on_closed {
    my ($self, $reason) = @_;
    $reason //= 'unknown';
    $self->debug_printf("Connection closed [%s]", $reason);

    # channel_closed deletes entries from channel_by_id, so iterate over a
    # snapshot instead of the live hash values.
    my @channels = values %{ $self->{channel_by_id} || {} };
    for my $ch (@channels) {
        $ch->bus->invoke_event(
            'close',
            # code => 999,
            message => 'Connection closed: ' . $reason,
        );
        $self->channel_closed($ch->id);
    }

    # Clean up any mismatching entries in the Future map
    my @unfinished = grep { !$_->is_ready } values %{ $self->{channel_map} || {} };
    $_->cancel for @unfinished;
    $self->{channel_map} = {};

    $self->stream->close if $self->stream;
    for my $key (qw(stream heartbeat_send_timer heartbeat_receive_timer)) {
        $self->remove_child(delete $self->{$key}) if $self->{$key};
    }

    return $self->bus->invoke_event(close => $reason);
}
436              
437             =head2 post_connect
438              
439             Sends initial startup header and applies listener for the C< Connection::Start > message.
440              
441             Returns $self.
442              
443             =cut
444              
# Begins the AMQP handshake: registers a one-shot handler for
# Connection::Start, then writes the protocol header bytes.
#
# Named parameters read here: platform, product, information, version,
# client_properties (hashref merged over the other four), locale, user and
# pass. The full parameter list is passed on to setup_tuning.
#
# Returns $self. The Connection::Start handler dies if the server does not
# offer the AUTH_MECH mechanism.
sub post_connect {
    my ($self, %args) = @_;

    # Start from the built-in values, let the top-level arguments override
    # them, then let client_properties override everything.
    my %client_prop = (
        platform    => 'Perl/NetAsyncAMQP',
        product     => __PACKAGE__,
        information => 'http://search.cpan.org/perldoc?Net::Async::AMQP',
        version     => $VERSION,
    );
    for my $key (keys %client_prop) {
        $client_prop{$key} = $args{$key} if defined $args{$key};
    }
    if ($args{client_properties}) {
        %client_prop = (%client_prop, %{ $args{client_properties} });
    }

    my $on_start = sub {
        my ($self, $frame) = @_;
        my @mech = split ' ', $frame->method_frame->mechanisms;
        unless (grep { $_ eq AUTH_MECH } @mech) {
            die "Auth mechanism " . AUTH_MECH . " not supported, unable to continue - options were: @mech";
        }

        my $start_ok = Net::AMQP::Protocol::Connection::StartOk->new(
            client_properties => \%client_prop,
            mechanism         => AUTH_MECH,
            locale            => $args{locale} // 'en_GB',
            response          => {
                LOGIN    => $args{user},
                PASSWORD => $args{pass},
            },
        );
        my $output = Net::AMQP::Frame::Method->new(
            channel      => 0,
            method_frame => $start_ok,
        );

        # Register the Tune handler before replying, so it is in place
        # by the time the server responds.
        $self->setup_tuning(%args);
        $self->send_frame($output);
    };
    $self->push_pending('Connection::Start' => $on_start);

    # Send the initial header bytes. It'd be nice if we could use
    # Net::AMQP::Protocol->header for this, but it seems to be sending 1
    # for the protocol ID, and the revision number is before the
    # major/minor version.
    # $self->write(Net::AMQP::Protocol->header);
    $self->write($self->header_bytes);

    return $self;
}
489              
490             =head2 setup_tuning
491              
492             Applies listener for the Connection::Tune message, used for determining max frame size and heartbeat settings.
493              
494             Returns $self.
495              
496             =cut
497              
# Registers a one-shot handler for Connection::Tune. The handler settles the
# frame size and channel count, replies with TuneOk, then moves on to
# open_connection with the same named parameters.
#
# Returns whatever push_pending returns.
sub setup_tuning {
    my ($self, %args) = @_;

    return $self->push_pending(
        'Connection::Tune' => sub {
            my ($self, $frame) = @_;
            my $method_frame = $frame->method_frame;

            # Lowest value for frame max wins - our predefined constant, or
            # whatever the server suggests. In AMQP 0-9-1 a server value of
            # zero means "no specific limit", so it must not win the
            # comparison: use our own ceiling in that case.
            my $server_frame_max = $method_frame->frame_max;
            my $frame_max = $server_frame_max
                ? min($server_frame_max, $self->MAX_FRAME_SIZE)
                : $self->MAX_FRAME_SIZE;
            $self->frame_max($frame_max);

            # A non-zero server value is used as-is; otherwise fall back to
            # the configured limit, then to the protocol maximum.
            my $channel_max = $method_frame->channel_max
                || $self->max_channels
                || $self->MAX_CHANNELS;
            $self->channel_max($channel_max);
            $self->debug_printf("Remote says %d channels, will use %d", $method_frame->channel_max, $channel_max);

            # Restart channel ID allocation from the beginning
            $self->{channel} = 0;
            $self->send_frame(
                Net::AMQP::Protocol::Connection::TuneOk->new(
                    channel_max => $channel_max,
                    frame_max   => $frame_max,
                    heartbeat   => $self->heartbeat_interval,
                )
            );
            $self->open_connection(%args);
        }
    );
}
521              
522             =head2 open_connection
523              
524             Establish a new connection to a vhost - this is called after tuning is complete,
525             and must happen before any channel connections are attempted.
526              
527             Returns $self.
528              
529             =cut
530              
# Requests access to a virtual host. Registers the OpenOk handler first
# (via setup_connection), then sends Connection::Open for $args{vhost},
# which defaults to '/'. Returns $self.
sub open_connection {
    my ($self, %args) = @_;

    $self->setup_connection(%args);

    my $open = Net::AMQP::Protocol::Connection::Open->new(
        virtual_host => $args{vhost} // '/',
        capabilities => '',
        insist       => 1,
    );
    $self->send_frame(
        Net::AMQP::Frame::Method->new(method_frame => $open)
    );

    return $self;
}
546              
547             =head2 setup_connection
548              
549             Applies listener for the Connection::OpenOk message, which triggers the
550             C<connected> event.
551              
552             Returns $self.
553              
554             =cut
555              
# Registers a one-shot handler for Connection::OpenOk which raises the
# 'connected' event on the connection bus. Returns $self.
sub setup_connection {
    my ($self, %args) = @_;

    my $on_open_ok = sub {
        my ($self, $frame) = @_;
        my $method_frame = $frame->method_frame;
        $self->debug_printf("OpenOk received");
        $self->bus->invoke_event('connected');
    };
    $self->push_pending('Connection::OpenOk' => $on_open_ok);

    return $self;
}
569              
570             =head2 next_channel
571              
572             Returns the next available channel ready for L</create_channel>.
573             Note that whatever it reports will be completely wrong if you've
574             manually specified a channel anywhere, so don't do that.
575              
576             If channels have been closed on this connection, those IDs will be
577             reused in preference to handing out a new ID.
578              
579             =cut
580              
# Picks the channel ID to use next. IDs released by closed channels are
# handed out first (oldest first); otherwise the counter is incremented.
# Returns undef once the counter has reached channel_max.
sub next_channel {
    my ($self) = @_;
    $self->{channel} //= 0;

    my $recycled = $self->{available_channel_id} ||= [];
    return shift @{$recycled} if @{$recycled};

    # Explicit undef (not a bare return), so list-context callers still
    # receive exactly one value.
    return undef if $self->{channel} >= $self->channel_max;

    return ++$self->{channel};
}
588              
589             =head2 create_channel
590              
591             Returns a new ::Channel instance, populating the map of assigned channels in the
592             process. Takes a single parameter:
593              
594             =over 4
595              
596             =item * $id - the channel ID, can be undef to assign via L</next_channel>
597              
598             =back
599              
600             =cut
601              
# Builds a Net::Async::AMQP::Channel for the given ID (or the next free ID
# when $id is undef), adds it as a child and records both the channel and
# its Future in the per-ID maps. Returns the channel object.
#
# Dies with "No channel available" when no usable ID could be determined.
sub create_channel {
    my ($self, $id) = @_;
    $id //= $self->next_channel;
    $id or die "No channel available";

    my $f = $self->loop->new_future;
    $self->{channel_map}{$id} = $f;

    my $c = Net::Async::AMQP::Channel->new(
        amqp   => $self,
        future => $f,
        id     => $id,
    );
    $self->add_child($c);
    $self->{channel_by_id}{$id} = $c;

    $self->debug_printf("Record channel %d as %s", $id, $c);
    return $c;
}
620              
621             =head2 open_channel
622              
623             Opens a new channel.
624              
625             Returns a L<Future> which resolves to the new L<Net::Async::AMQP::Channel> instance.
626              
627             =cut
628              
# Opens a channel on this connection.
#
# Named parameters:
#   channel - optional explicit channel ID; when given, it is removed from
#             the list of recycled IDs so it cannot be handed out twice.
#             Otherwise the ID comes from next_channel.
#
# Returns the channel's Future, which is marked done with the channel
# object when Channel::OpenOk arrives.
#
# Dies with "No channel available" when no ID can be allocated, or with
# "Channel N exists already" when the ID is already in use.
sub open_channel {
    my ($self, %args) = @_;

    my $channel;
    if ($args{channel}) {
        $channel = delete $args{channel};
        extract_by { $channel == $_ } @{$self->{available_channel_id}} if exists $self->{available_channel_id};
    }
    else {
        $channel = $self->next_channel;
    }

    # next_channel returns undef when every ID is in use. Stop here instead
    # of using undef as a hash key and in the message below.
    die "No channel available" unless $channel;

    die "Channel " . $channel . " exists already: " . $self->{channel_map}{$channel} if exists $self->{channel_map}{$channel};

    my $c = $self->create_channel($channel);
    my $f = $c->future;

    my $frame = Net::AMQP::Frame::Method->new(
        method_frame => Net::AMQP::Protocol::Channel::Open->new,
    );
    $frame->channel($channel);
    $c->push_pending(
        'Channel::OpenOk' => sub {
            my ($c, $frame) = @_;
            # Look the Future up by the channel ID carried in the reply
            my $f = $self->{channel_map}{$frame->channel};
            $f->done($c) unless $f->is_ready;
        }
    );
    $self->send_frame($frame);
    return $f;
}
657              
658             =head2 close
659              
660             Close the connection.
661              
662             Returns a L<Future> which will resolve with C<$self> when the connection is closed.
663              
664             =cut
665              
# Requests a clean shutdown of the connection.
#
# Named parameters: code (reply code, default 320) and text (reply text,
# default 'Request connection close').
#
# Stops the heartbeat send timer if there is one, sends Connection::Close,
# and returns the value of on_ready on a new Future. That Future is marked
# done with $self on Connection::CloseOk, or with the close reason if the
# 'close' bus event fires first.
sub close {
    my ($self, %args) = @_;

    if (my $timer = $self->heartbeat_send_timer) {
        $timer->stop;
    }

    my $f = $self->loop->new_future;

    # We might end up with a connection shutdown rather
    # than a clean Connection::Close response, so
    # we need to handle both possibilities
    my @handler;
    @handler = (
        close => sub {
            my ($ev, $reason) = @_;
            # Empty the shared handler list, then try to drop this
            # subscription; failures from unsubscribe are ignored.
            splice @handler;
            eval { $ev->unsubscribe; };
            return unless $f;
            $f->done($reason) unless $f->is_ready;
            weaken $f;
        }
    );
    $self->bus->subscribe_to_event(@handler);

    my $close_method = Net::AMQP::Protocol::Connection::Close->new(
        reply_code => $args{code} // 320,
        reply_text => $args{text} // 'Request connection close',
    );
    my $frame = Net::AMQP::Frame::Method->new(
        method_frame => $close_method,
    );

    # Array form: next_pending marks the Future done with $self
    $self->push_pending(
        'Connection::CloseOk' => [ $f, $self ],
    );
    $self->send_frame($frame);

    # ... and make sure we clean up after ourselves
    return $f->on_ready(sub {
        $self->bus->unsubscribe_from_event(
            @handler
        );
        weaken $f if $f;
    });
}
710              
711             =head2 channel_closed
712              
713             =cut
714              
# Releases everything held for a channel that has closed: cancels its
# Future if still pending, removes the channel object as a child, and makes
# the ID available for reuse. Returns $self.
#
# Dies if there is no Future recorded for the given channel ID.
sub channel_closed {
    my ($self, $id) = @_;

    my $f = delete $self->{channel_map}{$id};
    die "Had a close indication for channel $id but this channel is unknown" unless $f;
    $f->cancel unless $f->is_ready;

    my $channel = delete $self->{channel_by_id}{$id};
    $self->remove_child($channel);

    # Record this ID as available for the next time we need to open a new channel
    push @{ $self->{available_channel_id} }, $id;

    return $self;
}
726              
# Looks up the channel object recorded for the given channel ID.
sub channel_by_id {
    my ($self, $id) = @_;
    return $self->{channel_by_id}{$id};
}
728              
729             =head2 next_pending
730              
731             Retrieves the next pending handler for the given incoming frame type (see L),
732             and calls it.
733              
734             Takes the following parameters:
735              
736             =over 4
737              
738             =item * $type - the frame type, such as 'Basic::ConnectOk'
739              
740             =item * $frame - the frame itself
741              
742             =back
743              
744             Returns $self.
745              
746             =cut
747              
# Dispatches an incoming frame to the oldest handler queued for its type.
#
# A handler is either an arrayref [ $future, @values ], in which case the
# Future is marked done with those values, or a coderef. When nothing is
# queued, an 'unexpected_frame' event is raised on the bus instead.
#
# Returns $self.
sub next_pending {
    my ($self, $type, $frame) = @_;
    $self->debug_printf("Check next pending for %s", $type);

    my $queue = $self->{pending}{$type} || [];
    my $next  = shift @{$queue};

    unless ($next) {
        # It's quite possible we'll see unsolicited frames back from
        # the server: these will typically be errors, connection close,
        # or consumer cancellation if the consumer_cancel_notify
        # option is set (RabbitMQ). We don't expect many so report
        # them when in debug mode.
        $self->debug_printf("We had no pending handlers for %s, raising as event", $type);
        $self->bus->invoke_event(
            unexpected_frame => $type, $frame
        );
        return $self;
    }

    # We have a registered handler for this frame type. This usually
    # means that we've sent a frame and are awaiting a response.
    if (ref($next) eq 'ARRAY') {
        my ($f, @args) = @{$next};
        $f->done(@args) unless $f->is_ready;
    }
    else {
        # @_ still holds the complete original argument list, so the
        # handler receives it after ($self, $frame).
        $next->($self, $frame, @_);
    }

    return $self;
}
774              
775             =head1 METHODS - Accessors
776              
777             =head2 host
778              
779             The current host.
780              
781             =cut
782              
# Read-only accessor for the stored 'host' value.
sub host {
    my ($self) = @_;
    return $self->{host};
}
784              
785             =head2 vhost
786              
787             Virtual host.
788              
789             =cut
790              
# Read-only accessor for the stored 'vhost' value.
sub vhost {
    my ($self) = @_;
    return $self->{vhost};
}
792              
793             =head2 port
794              
795             Port number. Usually 5672.
796              
797             =cut
798              
# Read-only accessor for the stored 'port' value.
sub port {
    my ($self) = @_;
    return $self->{port};
}
800              
801             =head2 user
802              
803             MQ user.
804              
805             =cut
806              
# Read-only accessor for the stored 'user' value.
sub user {
    my ($self) = @_;
    return $self->{user};
}
808              
809             =head2 frame_max
810              
811             Maximum number of bytes allowed in any given frame. This is the
812             value negotiated with the remote server.
813              
814             =cut
815              
# Combined getter/setter for the stored frame size limit. With no
# argument, returns the stored value; with one, stores it and returns
# $self.
sub frame_max {
    my ($self, @value) = @_;

    if (@value) {
        $self->{frame_max} = $value[0];
        return $self;
    }
    return $self->{frame_max};
}
823              
824             =head2 channel_max
825              
826             Maximum number of channels. This is whatever we ended up with after initial negotiation.
827              
828             =cut
829              
# Combined getter/setter for the stored channel limit. With one argument,
# stores it and returns $self. With none, returns the stored value; if that
# is unset or false, it is first filled in from max_channels, or failing
# that from the MAX_CHANNELS constant.
sub channel_max {
    my ($self, @value) = @_;

    if (@value) {
        $self->{channel_max} = $value[0];
        return $self;
    }
    return $self->{channel_max} ||= $self->{max_channels} || $self->MAX_CHANNELS;
}
837              
# Read-only accessor for the stored 'max_channels' value.
sub max_channels {
    my ($self) = @_;
    return $self->{max_channels};
}
839              
840             =head2 last_frame_time
841              
842             Timestamp of the last frame we received from the remote. Used for handling heartbeats.
843              
844             =cut
845              
# Combined getter/setter for the 'last_frame_time' slot.
#
# With no arguments, returns the stored value.
# With one argument, stores it, resets the receive-side heartbeat timer if one
# exists, and returns $self.
sub last_frame_time {
    my ($self, @args) = @_;
    return $self->{last_frame_time} if !@args;

    $self->{last_frame_time} = $args[0];

    # Restart the receive countdown, if heartbeats are enabled
    if(my $receive_timer = $self->heartbeat_receive_timer) {
        $receive_timer->reset;
    }

    return $self;
}
854              
855             =head2 stream
856              
857             Returns the current L<IO::Async::Stream> for the AMQP connection.
858              
859             =cut
860              
# Read-only accessor: returns the value stored under the 'stream' key.
sub stream {
    my ($self) = @_;
    return $self->{stream};
}
862              
863             =head2 incoming_message
864              
865             L<Future> for the current incoming message (received in two or more parts:
866             the header then all body chunks).
867              
868             =cut
869              
# Read-only accessor: returns the value stored under the 'incoming_message' key.
sub incoming_message {
    my ($self) = @_;
    return $self->{incoming_message};
}
871              
872             =head1 METHODS - Internal
873              
874             The following methods are intended for internal use. They are documented
875             for completeness but should not normally be needed outside this library.
876              
877             =cut
878              
879             =head2 heartbeat_interval
880              
881             Current maximum interval between frames.
882              
883             =cut
884              
# Returns the 'heartbeat_interval' slot. When the slot is undefined it is
# first filled in from the HEARTBEAT_INTERVAL constant, so the default is
# stored on the object after the first call.
sub heartbeat_interval {
    my ($self) = @_;

    unless(defined $self->{heartbeat_interval}) {
        $self->{heartbeat_interval} = HEARTBEAT_INTERVAL;
    }

    return $self->{heartbeat_interval};
}
886              
887             =head2 missed_heartbeats_allowed
888              
889             How many times we allow the remote to miss the frame-sending deadline in a row
890             before we give up and close the connection. Defined by the protocol, should be
891             3x heartbeats.
892              
893             =cut
894              
# Multiplier applied to the heartbeat interval when arming the receive-side
# timer (see apply_heartbeat_timer). Always 3.
sub missed_heartbeats_allowed {
    return 3;
}
896              
897             =head2 apply_heartbeat_timer
898              
899             Enable both heartbeat timers.
900              
901             =cut
902              
# Creates and starts the two countdown timers used for heartbeat handling,
# adding each as a child of this notifier. Only weak references are kept in
# the 'heartbeat_send_timer' and 'heartbeat_receive_timer' slots.
#
# Returns $self.
sub apply_heartbeat_timer {
    my ($self) = @_;

    # Outbound side: on expiry we send a heartbeat frame to the server
    my $send_timer = IO::Async::Timer::Countdown->new(
        delay     => $self->heartbeat_interval,
        on_expire => $self->curry::weak::send_heartbeat,
    );
    $self->add_child($send_timer);
    $send_timer->start;
    $self->{heartbeat_send_timer} = $send_timer;
    Scalar::Util::weaken($self->{heartbeat_send_timer});

    # Inbound side: expiry means no traffic from the remote for
    # missed_heartbeats_allowed * heartbeat_interval seconds
    my $receive_timer = IO::Async::Timer::Countdown->new(
        delay     => $self->missed_heartbeats_allowed * $self->heartbeat_interval,
        on_expire => $self->curry::weak::handle_heartbeat_failure,
    );
    $self->add_child($receive_timer);
    $receive_timer->start;
    $self->{heartbeat_receive_timer} = $receive_timer;
    Scalar::Util::weaken($self->{heartbeat_receive_timer});

    return $self;
}
925              
926             =head2 reset_heartbeat
927              
928             Resets our side of the heartbeat timer.
929              
930             This is used to ensure we send data at least once every L</heartbeat_interval>
931             seconds.
932              
933             =cut
934              
# Resets the send-side heartbeat timer, if there is one.
#
# Returns nothing when no timer is present, otherwise whatever the timer's
# reset method returns.
sub reset_heartbeat {
    my ($self) = @_;

    my $send_timer = $self->heartbeat_send_timer
        or return;

    return $send_timer->reset;
}
941              
942              
943             =head2 heartbeat_receive_timer
944              
945             Timer for tracking frames we've received.
946              
947             =cut
948              
# Read-only accessor: returns the value stored under the
# 'heartbeat_receive_timer' key.
sub heartbeat_receive_timer {
    my ($self) = @_;
    return $self->{heartbeat_receive_timer};
}
950              
951             =head2 heartbeat_send_timer
952              
953             Timer for tracking when we're due to send out something.
954              
955             =cut
956              
# Read-only accessor: returns the value stored under the
# 'heartbeat_send_timer' key.
sub heartbeat_send_timer {
    my ($self) = @_;
    return $self->{heartbeat_send_timer};
}
958              
959             =head2 handle_heartbeat_failure
960              
961             Called when heartbeats are enabled and we've had no response from the server for 3 heartbeat
962             intervals (see L</missed_heartbeats_allowed>). We'd expect some frame from the remote - even
963             if just a heartbeat frame - at least once every heartbeat interval so if this triggers then
964             we're likely dealing with a dead or heavily loaded server.
965              
966             This will invoke the C<heartbeat_failure> event then close the connection.
967              
968             =cut
969              
# Expiry handler for the receive-side heartbeat timer.
#
# Logs the timeout, raises the 'heartbeat_failure' event on the bus with the
# last frame time as its argument, then closes the connection. Returns
# whatever close returns.
sub handle_heartbeat_failure {
    my ($self) = @_;
    my $last_seen = $self->last_frame_time;

    $self->debug_printf("Heartbeat timeout: no data received from server since %s, closing connection", $last_seen);

    # Give listeners a chance to react before we tear the connection down
    $self->bus->invoke_event(heartbeat_failure => $last_seen);

    return $self->close;
}
979              
980             =head2 send_heartbeat
981              
982             Sends the heartbeat frame.
983              
984             =cut
985              
# Sends a single heartbeat frame on channel 0, then re-arms the send-side
# heartbeat timer if one exists.
sub send_heartbeat {
    my ($self) = @_;
    $self->debug_printf("Sending heartbeat frame");

    # Heartbeats belong to the connection as a whole, not to any one
    # channel, hence channel 0
    my $heartbeat = Net::AMQP::Frame::Heartbeat->new;
    $self->send_frame($heartbeat, channel => 0);

    # Re-arm the countdown so the next heartbeat goes out on schedule
    if(my $send_timer = $self->heartbeat_send_timer) {
        $send_timer->reset;
        $send_timer->start;
    }
}
1003              
1004             =head2 push_pending
1005              
1006             Adds the given handler(s) to the pending handler list for the given type(s).
1007              
1008             Takes one or more of the following parameter pairs:
1009              
1010             =over 4
1011              
1012             =item * $type - the frame type, see C<amqp_frame_type>
1013              
1014             =item * $code - the coderef to call, will be invoked once as follows when a matching frame is received:
1015              
1016             $code->($self, $frame, @_)
1017              
1018             =back
1019              
1020             Returns C< $self >.
1021              
1022             =cut
1023              
# Appends handlers to the per-type pending lists.
#
# Arguments after $self are consumed as ($type, $code) pairs; each $code is
# pushed onto the end of $self->{pending}{$type}.
#
# Returns $self.
sub push_pending {
    my ($self, @pairs) = @_;

    while(@pairs) {
        my ($type, $code) = splice @pairs, 0, 2;
        my $queue = $self->{pending}{$type} ||= [];
        push @$queue, $code;
    }

    return $self;
}
1032              
1033             =head2 remove_pending
1034              
1035             Removes a coderef from the pending event handler.
1036              
1037             Returns C< $self >.
1038              
1039             =cut
1040              
# Removes handlers from the per-type pending lists.
#
# Arguments after $self are consumed as ($type, $code) pairs. Every entry in
# $self->{pending}{$type} that compares equal (string 'eq') to $code is
# removed.
#
# Returns $self.
sub remove_pending {
    my ($self, @pairs) = @_;

    while(@pairs) {
        my ($type, $code) = splice @pairs, 0, 2;

        # Collect matching positions highest-first, so that removing one
        # entry never shifts an index we have yet to process
        my @doomed = grep {
            $self->{pending}{$type}[$_] eq $code
        } reverse 0..$#{$self->{pending}{$type}};

        for my $idx (@doomed) {
            splice @{$self->{pending}{$type}}, $idx, 1;
        }
    }

    return $self;
}
1056              
1057             =head2 write
1058              
1059             Writes data to the server.
1060              
1061             Returns a L<Future> which will resolve to an empty list when
1062             done.
1063              
1064             =cut
1065              
# Forwards all arguments to the write method of the object returned by
# stream, and returns that call's result.
sub write {
    my ($self, @args) = @_;
    my $stream = $self->stream;
    return $stream->write(@args);
}
1070              
1071             =head2 process_frame
1072              
1073             Process a single incoming frame.
1074              
1075             Takes the following parameters:
1076              
1077             =over 4
1078              
1079             =item * $frame - the L<Net::AMQP::Frame> instance
1080              
1081             =back
1082              
1083             Returns $self.
1084              
1085             =cut
1086              
# Dispatches one incoming frame.
#
# Parameters:
#   $frame - the frame object to process
#
# Heartbeat frames are acknowledged in the debug log only, except that a
# heartbeat carrying a non-zero channel triggers a close with reply code 501.
# Any other frame is first offered to the channel object matching the frame's
# channel ID; if there is no such channel, or its next_pending returns false,
# the frame is handled as a connection-level frame via next_pending on $self.
#
# Returns $self.
sub process_frame {
    my ($self, $frame) = @_;
    $self->debug_printf("Received %s", amqp_frame_info($frame));

    my $frame_type = amqp_frame_type($frame);

    if($frame_type eq 'Heartbeat') {
        # Nothing to do beyond logging: the last-frame timestamp is updated
        # at the data-read level.
        $self->debug_printf("Heartbeat received");

        # A peer that receives an invalid heartbeat frame MUST raise a connection
        # exception with reply code 501 (frame error)
        $self->close(
            reply_code => 501,
            reply_text => 'Frame error - heartbeat should have channel 0'
        ) if $frame->channel;

        return $self;
    }

    my $channel_id = $frame->channel;
    if(my $ch = $self->channel_by_id($channel_id)) {
        # Log the numeric channel ID from the frame. Formatting the channel
        # object itself with %d would print a numified reference instead.
        $self->debug_printf("Processing frame %s on channel %d", $frame_type, $channel_id);
        return $self if $ch->next_pending($frame);
    }

    $self->debug_printf("Processing connection frame %s", $frame_type);

    $self->next_pending($frame_type, $frame);

    return $self;
}
1117              
1118             =head2 split_payload
1119              
1120             Splits a message into separate frames.
1121              
1122             Takes the $payload as a scalar containing byte data, and the following parameters:
1123              
1124             =over 4
1125              
1126             =item * exchange - where we're sending the message
1127              
1128             =item * routing_key - other part of message destination
1129              
1130             =back
1131              
1132             Additionally, the following headers can be passed:
1133              
1134             =over 4
1135              
1136             =item * content_type
1137              
1138             =item * content_encoding
1139              
1140             =item * headers
1141              
1142             =item * delivery_mode
1143              
1144             =item * priority
1145              
1146             =item * correlation_id
1147              
1148             =item * reply_to
1149              
1150             =item * expiration
1151              
1152             =item * message_id
1153              
1154             =item * timestamp
1155              
1156             =item * type
1157              
1158             =item * user_id
1159              
1160             =item * app_id
1161              
1162             =item * cluster_id
1163              
1164             =back
1165              
1166             Returns list of frames suitable for passing to L</send_frame>.
1167              
1168             =cut
1169              
# Splits a message into the list of frames needed to publish it.
#
# Parameters:
#   $payload - scalar containing the message body
#   %opts    - named options. ticket, exchange, routing_key, mandatory and
#              immediate go to the Basic.Publish method; weight goes to the
#              header frame (default 0); the content properties listed below
#              go to the content header. Undefined options are omitted.
#
# Returns a list: the Publish method, the Header frame, then zero or more
# Body frames, each carrying at most frame_max - PAYLOAD_HEADER_LENGTH
# characters of payload.
#
# Dies if there is payload to send but frame_max leaves no room for body data,
# since the chunking loop below could otherwise never terminate.
sub split_payload {
    my ($self, $payload, %opts) = @_;

    # Get the original content length first, since chunking consumes $payload
    my $payload_size = length $payload;

    my $frame_max = $self->frame_max;
    my $chunk_size = ($frame_max // 0) - PAYLOAD_HEADER_LENGTH;

    # A zero or negative chunk size would make substr remove nothing (or
    # leave a permanent remainder), so the loop would spin forever.
    die sprintf(
        "Unable to split payload: frame_max (%s) leaves no room for body data",
        $frame_max // 'undef'
    ) if $payload_size && $chunk_size < 1;

    my @body_frames;
    while (length $payload) {
        # 4-arg substr removes the chunk from the front of $payload
        my $chunk = substr $payload, 0, $chunk_size, '';
        push @body_frames, Net::AMQP::Frame::Body->new(
            payload => $chunk
        );
    }

    my @publish_keys = qw(ticket exchange routing_key mandatory immediate);
    my @property_keys = qw(
        content_type
        content_encoding
        headers
        delivery_mode
        priority
        correlation_id
        reply_to
        expiration
        message_id
        timestamp
        type
        user_id
        app_id
        cluster_id
    );

    return
        Net::AMQP::Protocol::Basic::Publish->new(
            map {; $_ => $opts{$_} } grep defined($opts{$_}), @publish_keys
        ),
        Net::AMQP::Frame::Header->new(
            weight       => $opts{weight} || 0,
            body_size    => $payload_size,
            header_frame => Net::AMQP::Protocol::Basic::ContentHeader->new(
                map {; $_ => $opts{$_} } grep defined($opts{$_}), @property_keys
            ),
        ),
        @body_frames;
}
1214              
1215             =head2 send_frame
1216              
1217             Send a single frame.
1218              
1219             Takes the $frame instance followed by these optional named parameters:
1220              
1221             =over 4
1222              
1223             =item * channel - which channel we should send on
1224              
1225             =back
1226              
1227             Returns a L<Future> which will resolve to an empty list
1228             when the frame has been written (this does not guarantee that the server has received it).
1229              
1230             =cut
1231              
# Serialises one frame and writes it to the server.
#
# Parameters:
#   $frame - the frame to send; objects derived from Net::AMQP::Protocol::Base
#            are wrapped via frame_wrap first
#   %args  - optional named parameters:
#              channel - channel ID to send on (0 is used when neither the
#                        frame nor this argument provides one)
#
# Dies if the frame already has a channel ID that differs from the one
# requested.
#
# Returns the result of the write call with an on_done callback attached that
# invokes reset_heartbeat.
sub send_frame {
    my ($self, $frame, %args) = @_;

    # Bare protocol methods need wrapping in a frame before sending
    if($frame->isa("Net::AMQP::Protocol::Base")) {
        $frame = $frame->frame_wrap;
    }

    my $wanted = $args{channel};
    if(defined $frame->channel) {
        # The frame already knows its channel - refuse to send it elsewhere
        die "Frame has channel ID " . $frame->channel . " but we wanted " . $wanted
            if defined $wanted && $frame->channel != $wanted;
    } else {
        $frame->channel($wanted // 0);
    }

    $self->debug_printf("Sending %s", amqp_frame_info($frame));

    # Get bytes to send across our transport
    my $bytes = $frame->to_raw_frame;

    return $self->write($bytes)->on_done($self->curry::reset_heartbeat);
}
1254              
1255             =head2 header_bytes
1256              
1257             Byte string representing the header bytes we should send on initial TCP connect.
1258             Net::AMQP uses AMQP\x01\x01\x09\x01, which does not appear to comply with AMQP 0.9.1
1259             section 4.2.2.
1260              
1261             =cut
1262              
# Returns the 8-byte protocol header: the literal "AMQP" followed by the
# bytes 0, 0, 9, 1.
sub header_bytes {
    return join '', 'AMQP', pack('C4', 0, 0, 9, 1);
}
1264              
# Hook invoked with ($self, $loop); only emits a debug message. The loop
# argument is not used.
sub _add_to_loop {
    my $self = shift;
    return $self->debug_printf("Added %s to loop", $self);
}
1269              
1270             =head2 future
1271              
1272             Returns a new L<Future> instance.
1273              
1274             Supports optional named parameters for setting label etc.
1275              
1276             =cut
1277              
# Creates a new future via the loop's new_future method.
#
# Arguments after $self are consumed as (method name, value) pairs; each named
# method is called on the new future with the value as its single argument.
# Dies if the future does not support a named method.
#
# Returns the new future.
sub future {
    my ($self, @settings) = @_;
    my $future = $self->loop->new_future;

    while(my ($method, $value) = splice @settings, 0, 2) {
        die "Unable to call method $method on $future"
            unless $future->can($method);
        $future->$method($value);
    }

    return $future;
}
1286              
1287             1;
1288              
1289             __END__