File Coverage

blib/lib/AnyEvent/RabbitMQ/Fork.pm
Criterion Covered Total %
statement 18 18 100.0
branch n/a
condition n/a
subroutine 6 6 100.0
pod n/a
total 24 24 100.0


line stmt bran cond sub pod time code
1             package AnyEvent::RabbitMQ::Fork;
2             $AnyEvent::RabbitMQ::Fork::VERSION = '0.5';
3             # ABSTRACT: Run AnyEvent::RabbitMQ inside AnyEvent::Fork(::RPC)
4              
5             =head1 NAME
6              
7             AnyEvent::RabbitMQ::Fork - Run AnyEvent::RabbitMQ inside AnyEvent::Fork(::RPC)
8              
9             =cut
10              
11 1     1   1099 use Moo;
  1         12827  
  1         539  
12 1     1   1746 use Types::Standard qw(CodeRef Str HashRef InstanceOf Bool Object);
  1         55879  
  1         28  
13 1     1   1147 use Scalar::Util qw(weaken);
  1         1  
  1         57  
14 1     1   4 use Carp qw(croak);
  1         1  
  1         42  
15 1     1   581 use File::ShareDir qw(dist_file);
  1         5790  
  1         83  
16              
17 1         3 use constant DEFAULT_AMQP_SPEC =>
18 1     1   7 dist_file('AnyEvent-RabbitMQ', 'fixed_amqp0-9-1.xml');
  1         1  
19              
20             use namespace::clean;
21              
22             use AnyEvent::Fork;
23             use AnyEvent::Fork::RPC;
24              
25             use Net::AMQP;
26              
27             use AnyEvent::RabbitMQ::Fork::Channel;
28              
29             =head1 SYNOPSIS
30              
31             use AnyEvent::RabbitMQ::Fork;
32              
33             my $cv = AnyEvent->condvar;
34              
35             my $ar = AnyEvent::RabbitMQ::Fork->new->load_xml_spec()->connect(
36             host => 'localhost',
37             port => 5672,
38             user => 'guest',
39             pass => 'guest',
40             vhost => '/',
41             timeout => 1,
42             tls => 0, # Or 1 if you'd like SSL
43             tune => { heartbeat => 30, channel_max => $whatever, frame_max = $whatever },
44             on_success => sub {
45             my $ar = shift;
46             $ar->open_channel(
47             on_success => sub {
48             my $channel = shift;
49             $channel->declare_exchange(
50             exchange => 'test_exchange',
51             on_success => sub {
52             $cv->send('Declared exchange');
53             },
54             on_failure => $cv,
55             );
56             },
57             on_failure => $cv,
58             on_close => sub {
59             my $method_frame = shift->method_frame;
60             die $method_frame->reply_code, $method_frame->reply_text;
61             },
62             );
63             },
64             on_failure => $cv,
65             on_read_failure => sub { die @_ },
66             on_return => sub {
67             my $frame = shift;
68             die "Unable to deliver ", Dumper($frame);
69             },
70             on_close => sub {
71             my $why = shift;
72             if (ref($why)) {
73             my $method_frame = $why->method_frame;
74             die $method_frame->reply_code, ": ", $method_frame->reply_text;
75             }
76             else {
77             die $why;
78             }
79             },
80             );
81              
82             print $cv->recv, "\n";
83              
84             =cut
85              
86             has verbose => (is => 'rw', isa => Bool, default => 0);
87             has is_open => (is => 'ro', isa => Bool, default => 0);
88             has login_user => (is => 'ro', isa => Str);
89             has server_properties => (is => 'ro', isa => Str);
90              
91             has worker_class => (is => 'lazy', isa => Str);
92             has channel_class => (is => 'lazy', isa => Str);
93             has worker_function => (is => 'lazy', isa => Str);
94             has init_function => (is => 'lazy', isa => Str);
95              
96             sub _build_worker_class { return __PACKAGE__ . '::Worker' }
97             sub _build_channel_class { return __PACKAGE__ . '::Channel' }
98             sub _build_worker_function { return $_[0]->worker_class . '::run' }
99             sub _build_init_function { return $_[0]->worker_class . '::init' }
100              
101             has _drain_cv => (is => 'lazy', isa => Object, predicate => 1, clearer => 1);
102              
103             sub _build__drain_cv { return AE::cv }
104              
105             has channels => (
106             is => 'ro',
107             isa => HashRef [InstanceOf ['AnyEvent::RabbitMQ::Fork::Channel']],
108             clearer => 1,
109             default => sub { {} },
110             init_arg => undef,
111             );
112              
113             has cb_registry => (
114             is => 'ro',
115             isa => HashRef,
116             default => sub { {} },
117             clearer => 1,
118             init_arg => undef,
119             );
120              
121             has rpc => (
122             is => 'lazy',
123             isa => CodeRef,
124             predicate => 1,
125             clearer => 1,
126             init_arg => undef,
127             );
128              
129             sub _build_rpc {
130             my $self = shift;
131             weaken(my $wself = $self);
132              
133             return AnyEvent::Fork->new #
134             ->require($self->worker_class) #
135             ->send_arg($self->worker_class, verbose => $self->verbose) #
136             ->AnyEvent::Fork::RPC::run(
137             $self->worker_function,
138             async => 1,
139             serialiser => $AnyEvent::Fork::RPC::STORABLE_SERIALISER,
140             on_event => sub { $wself->_on_event(@_) },
141             on_error => sub { $wself->_on_error(@_) },
142             on_destroy => sub { $wself->_on_destroy(@_) },
143             init => $self->init_function,
144             # TODO look into
145             #done => '',
146             );
147             }
148              
149             =head1 DESCRIPTION
150              
151             This module is mean't to be a close to a drop-in facade for running
152             L in a background process via L.
153              
154             Tha main use case is for programs where other operations block with little
155             control due to difficulty/laziness. In this way, the process hosting the
156             connection RabbitMQ is doing nothing else but processing messages.
157              
158             =cut
159              
160             my $cb_id = 'a'; # textual ++ gives a bigger space than numerical ++
161              
162             sub _delegate {
163             my ($self, $method, $ch_id, @args, %args) = @_;
164              
165             unless (@args % 2) {
166             %args = @args;
167             @args = ();
168             foreach my $event (grep { /^on_/ } keys %args) {
169             my $id = $cb_id++;
170              
171             # store the user callback
172             $self->cb_registry->{$id} = delete $args{$event};
173              
174             # create a signature to send back to on_event
175             $args{$event} = [$id, $event, $method, scalar caller];
176             }
177             }
178              
179             $self->rpc->(
180             $method, $ch_id,
181             (@args ? @args : %args),
182             sub {
183             croak @_ if @_;
184             }
185             );
186              
187             return $self;
188             }
189              
190             =head1 CONSTRCTOR
191              
192             my $ar = AnyEvent::RabbitMQ::Fork->new();
193              
194             =head2 Options
195              
196             =over
197              
198             =item verbose [Bool]
199              
200             Prints a LOT of debugging information to C.
201              
202             =back
203              
204             =cut
205              
206             before verbose => sub {
207             return if @_ < 2;
208             $_[0]->_delegate(verbose => 0, $_[1]);
209             };
210              
211             =head1 METHODS
212              
213             =over
214              
215             =item load_xml_spec([$amqp_spec_xml_path])
216              
217             Declare and load the AMQP Specification you wish to use. The default id to use
218             version 0.9.1 with RabbitMQ specific extensions.
219              
220             B
221              
222             =cut
223              
224             my $_loaded_spec;
225             sub load_xml_spec {
226             my $self = shift;
227             my $spec = shift || DEFAULT_AMQP_SPEC;
228              
229             if ($_loaded_spec and $_loaded_spec ne $spec) {
230             croak(
231             "Tried to load AMQP spec $spec, but have already loaded $_loaded_spec, not possible"
232             );
233             } elsif (!$_loaded_spec) {
234             Net::AMQP::Protocol->load_xml_spec($_loaded_spec = $spec);
235             }
236              
237             return $self->_delegate(load_xml_spec => 0, $spec);
238             }
239              
240             =item connect(%opts)
241              
242             Open connection to an AMQP server to begin work.
243              
244             Arguments:
245              
246             =over
247              
248             =item B
249              
250             =item B
251              
252             =item B
253              
254             =item B
255              
256             =item B
257              
258             =item B TCP timeout in seconds. Default: use L default
259              
260             =item B Boolean to use SSL/TLS or not. Default: 0
261              
262             =item B Hash: (values are negotiated with the server)
263              
264             =over
265              
266             =item B Heartbeat interval in seconds. Default: 0 (off)
267              
268             =item B Maximum channel ID. Default: 65536
269              
270             =item B Maximum frame size in bytes. Default: 131072
271              
272             =back
273              
274             =item B Callback when the connection is successfully established.
275              
276             =item B Called when a failure occurs over the lifetime of the connection.
277              
278             =item B Called when there is a problem reading response from the server.
279              
280             =item B Called if the server returns a published message.
281              
282             =item B Called when the connection is closed remotely.
283              
284             =back
285              
286             B
287              
288             =item open_channel(%opts)
289              
290             Open a logical channel which is where all the AMQP fun is.
291              
292             Arguments:
293              
294             =over
295              
296             =item B Called when the channel is open and ready for use.
297              
298             =item B Called if there is a problem opening the channel.
299              
300             =item B Called when the channel is closed.
301              
302             =back
303              
304             =item close(%opts)
305              
306             Close this connection.
307              
308             =over
309              
310             =item B Called on successful shutdown.
311              
312             =item B Called on failed shutdown. Note: the connection is still
313             closed after this
314              
315             =back
316              
317             =back
318              
319             =cut
320              
321             foreach my $method (qw(connect open_channel close)) {
322             no strict 'refs';
323             *$method = sub {
324             my $self = shift;
325             return $self->_delegate($method => 0, @_);
326             };
327             }
328              
329             sub drain_writes {
330             my ($self, $to) = @_;
331              
332             my $w;
333             if ($to) {
334             $w = AE::timer $to, 0,
335             sub { $self->_drain_cv->croak("Timed out after $to") };
336             }
337              
338             $self->_drain_cv->recv;
339             $self->_clear_drain_cv;
340             undef $w;
341              
342             return;
343             }
344              
345             my %event_handlers = (
346             cb => '_handle_callback',
347             cbd => '_handle_callback_destroy',
348             chd => '_handle_channel_destroy',
349             cdw => '_handle_connection_drain_writes',
350             i => '_handle_info',
351             );
352              
353             sub _on_event {
354             my $self = shift;
355             my $type = shift;
356              
357             if (my $handler = $event_handlers{$type}) {
358             $self->$handler(@_);
359             } else {
360             croak "Unknown event type: '$type'";
361             }
362              
363             return;
364             }
365              
366             sub _handle_callback { ## no critic (Subroutines::RequireArgUnpacking)
367             my $self = shift;
368             my $sig = shift;
369             my ($id, $event, $method, $pkg) = @$sig;
370              
371             warn "_handle_callback $id $event $method $pkg\n" if $self->verbose;
372              
373             if (my $cb = $self->cb_registry->{$id}) {
374             if (ref($_[0]) eq 'REF' and ref(${ $_[0] }) eq 'ARRAY') {
375             my ($class, @args) = @{ ${ $_[0] } };
376              
377             if ($class eq 'AnyEvent::RabbitMQ') {
378             $_[0] = $self;
379             } elsif ($class eq 'AnyEvent::RabbitMQ::Channel') {
380             my $channel_id = shift @args;
381             $_[0] = $self->channels->{$channel_id}
382             ||= $self->channel_class->new(
383             id => $channel_id,
384             connection => $self
385             );
386             } else {
387             croak "Unknown class type: '$class'";
388             }
389             }
390              
391             goto &$cb;
392             } else {
393             croak "Unknown callback id: '$id'";
394             }
395              
396             return;
397             }
398              
399             sub _handle_info {
400             my ($self, $info) = @_;
401              
402             $self->_handle_connection_info(%{ delete $info->{connection} })
403             if $info->{connection};
404              
405             $self->_handle_channel_info($_, %{ $info->{$_} }) foreach keys %$info;
406              
407             return;
408             }
409              
410             # channel information passback
411             sub _handle_channel_info {
412             my ($self, $ch_id, %args) = @_;
413              
414             warn "_handle_channel_info $ch_id @{[ %args ]}\n" if $self->verbose;
415              
416             if (my $ch = $self->channels->{$ch_id}) {
417             @$ch{ keys %args } = values %args;
418             } else {
419             croak "Unknown channel: '$ch_id'";
420             }
421              
422             return;
423             }
424              
425             sub _handle_channel_destroy {
426             my ($self, $ch_id) = @_;
427              
428             warn "_handle_channel_destroy $ch_id\n" if $self->verbose;
429              
430             delete $self->channels->{$ch_id};
431              
432             return;
433             }
434              
435             # connection information passback
436             sub _handle_connection_info {
437             my ($self, %args) = @_;
438              
439             warn "_handle_connection_info @{[ %args ]}\n" if $self->verbose;
440              
441             @$self{ keys %args } = values %args;
442              
443             return;
444             }
445              
446             sub _handle_callback_destroy {
447             my ($self, $id, $event, $method, $pkg) = @_;
448              
449             warn "_handle_callback_destroy $id $event $method $pkg\n" if $self->verbose;
450              
451             delete $self->cb_registry->{$id};
452              
453             return;
454             }
455              
456             sub _handle_connection_drain_writes {
457             my $self = shift;
458              
459             $self->_drain_cv->send if $self->_has_drain_cv;
460              
461             return;
462             }
463              
464             sub _on_error {
465             my $self = shift;
466              
467             croak @_;
468             }
469              
470             sub _on_destroy {
471             my $self = shift;
472              
473             warn "_on_destroy\n" if $self->verbose;
474              
475             # TODO implement reconnect
476             return;
477             }
478              
479             sub DEMOLISH {
480             my ($self, $in_gd) = @_;
481             return if $in_gd;
482             return unless $self->has_rpc;
483              
484             $self->rpc->(DEMOLISH => 0, my $cv = AE::cv);
485              
486             $cv->recv;
487              
488             $self->clear_rpc;
489              
490             return;
491             }
492              
493             =head1 AUTHOR
494              
495             William Cox
496              
497             =head1 COPYRIGHT
498              
499             Copyright (c) 2014, the above named author(s).
500              
501             =head1 LICENSE
502              
503             This library is free software; you can redistribute it and/or modify it under the same terms as Perl itself.
504              
505             =cut
506              
507             1;