File Coverage

blib/lib/AnyEvent/RabbitMQ/Fork.pm
Criterion Covered Total %
statement 36 141 25.5
branch 0 50 0.0
condition 0 11 0.0
subroutine 12 38 31.5
pod 1 3 33.3
total 49 243 20.1


line stmt bran cond sub pod time code
1             package AnyEvent::RabbitMQ::Fork;
2             $AnyEvent::RabbitMQ::Fork::VERSION = '0.6';
3             # ABSTRACT: Run AnyEvent::RabbitMQ inside AnyEvent::Fork(::RPC)
4              
5             =head1 NAME
6              
7             AnyEvent::RabbitMQ::Fork - Run AnyEvent::RabbitMQ inside AnyEvent::Fork(::RPC)
8              
9             =cut
10              
11 1     1   1386 use Moo;
  1         11561  
  1         5  
12 1     1   2098 use Types::Standard qw(CodeRef Str HashRef InstanceOf Bool Object);
  1         76378  
  1         15  
13 1     1   1501 use Scalar::Util qw(weaken);
  1         2  
  1         60  
14 1     1   7 use Carp qw(croak);
  1         2  
  1         45  
15 1     1   572 use File::ShareDir qw(dist_file);
  1         21377  
  1         72  
16              
17 1         6 use constant DEFAULT_AMQP_SPEC =>
18 1     1   9 dist_file('AnyEvent-RabbitMQ', 'fixed_amqp0-9-1.xml');
  1         2  
19              
20 1     1   931 use namespace::clean;
  1         11543  
  1         7  
21              
22 1     1   1056 use AnyEvent::Fork;
  1         20896  
  1         38  
23 1     1   653 use AnyEvent::Fork::RPC;
  1         1641  
  1         39  
24              
25 1     1   485 use Net::AMQP;
  1         84239  
  1         36  
26              
27 1     1   502 use AnyEvent::RabbitMQ::Fork::Channel;
  1         4  
  1         999  
28              
29             =head1 SYNOPSIS
30              
31             use AnyEvent::RabbitMQ::Fork;
32              
33             my $cv = AnyEvent->condvar;
34              
35             my $ar = AnyEvent::RabbitMQ::Fork->new->load_xml_spec()->connect(
36             host => 'localhost',
37             port => 5672,
38             user => 'guest',
39             pass => 'guest',
40             vhost => '/',
41             timeout => 1,
42             tls => 0, # Or 1 if you'd like SSL
43             tune => { heartbeat => 30, channel_max => $whatever, frame_max = $whatever },
44             on_success => sub {
45             my $ar = shift;
46             $ar->open_channel(
47             on_success => sub {
48             my $channel = shift;
49             $channel->declare_exchange(
50             exchange => 'test_exchange',
51             on_success => sub {
52             $cv->send('Declared exchange');
53             },
54             on_failure => $cv,
55             );
56             },
57             on_failure => $cv,
58             on_close => sub {
59             my $method_frame = shift->method_frame;
60             die $method_frame->reply_code, $method_frame->reply_text;
61             },
62             );
63             },
64             on_failure => $cv,
65             on_read_failure => sub { die @_ },
66             on_return => sub {
67             my $frame = shift;
68             die "Unable to deliver ", Dumper($frame);
69             },
70             on_close => sub {
71             my $why = shift;
72             if (ref($why)) {
73             my $method_frame = $why->method_frame;
74             die $method_frame->reply_code, ": ", $method_frame->reply_text;
75             }
76             else {
77             die $why;
78             }
79             },
80             );
81              
82             print $cv->recv, "\n";
83              
84             =cut
85              
86             has verbose => (is => 'rw', isa => Bool, default => 0);
87             has is_open => (is => 'ro', isa => Bool, default => 0);
88             has login_user => (is => 'ro', isa => Str);
89             has server_properties => (is => 'ro', isa => Str);
90              
91             has worker_class => (is => 'lazy', isa => Str);
92             has channel_class => (is => 'lazy', isa => Str);
93             has worker_function => (is => 'lazy', isa => Str);
94             has init_function => (is => 'lazy', isa => Str);
95              
96 0     0     sub _build_worker_class { return __PACKAGE__ . '::Worker' }
97 0     0     sub _build_channel_class { return __PACKAGE__ . '::Channel' }
98 0     0     sub _build_worker_function { return $_[0]->worker_class . '::run' }
99 0     0     sub _build_init_function { return $_[0]->worker_class . '::init' }
100              
101             has _drain_cv => (is => 'lazy', isa => Object, predicate => 1, clearer => 1);
102              
103 0     0     sub _build__drain_cv { return AE::cv }
104              
105             has channels => (
106             is => 'ro',
107             isa => HashRef [InstanceOf ['AnyEvent::RabbitMQ::Fork::Channel']],
108             clearer => 1,
109             default => sub { {} },
110             init_arg => undef,
111             );
112              
113             has cb_registry => (
114             is => 'ro',
115             isa => HashRef,
116             default => sub { {} },
117             clearer => 1,
118             init_arg => undef,
119             );
120              
121             has rpc => (
122             is => 'lazy',
123             isa => CodeRef,
124             predicate => 1,
125             clearer => 1,
126             init_arg => undef,
127             );
128              
129             sub _build_rpc {
130 0     0     my $self = shift;
131 0           weaken(my $wself = $self);
132              
133             return AnyEvent::Fork->new #
134             ->require($self->worker_class) #
135             ->send_arg($self->worker_class, verbose => $self->verbose) #
136             ->AnyEvent::Fork::RPC::run(
137             $self->worker_function,
138             async => 1,
139             serialiser => $AnyEvent::Fork::RPC::STORABLE_SERIALISER,
140 0 0   0     on_event => sub { $wself && $wself->_on_event(@_) },
141 0 0   0     on_error => sub { $wself && $wself->_on_error(@_) },
142 0 0   0     on_destroy => sub { $wself && $wself->_on_destroy(@_) },
143 0           init => $self->init_function,
144             # TODO look into
145             #done => '',
146             );
147             }
148              
149             =head1 DESCRIPTION
150              
151             This module is mean't to be a close to a drop-in facade for running
152             L in a background process via L.
153              
154             Tha main use case is for programs where other operations block with little
155             control due to difficulty/laziness. In this way, the process hosting the
156             connection RabbitMQ is doing nothing else but processing messages.
157              
158             =cut
159              
160             my $cb_id = 'a'; # textual ++ gives a bigger space than numerical ++
161              
162             sub _delegate {
163 0     0     my ($self, $method, $ch_id, @args, %args) = @_;
164              
165 0 0         unless (@args % 2) {
166 0           %args = @args;
167 0           @args = ();
168 0           foreach my $event (grep { /^on_/ } keys %args) {
  0            
169 0           my $id = $cb_id++;
170              
171             # store the user callback
172 0           $self->cb_registry->{$id} = delete $args{$event};
173              
174             # create a signature to send back to on_event
175 0           $args{$event} = [$id, $event, $method, scalar caller];
176             }
177             }
178              
179             $self->rpc->(
180             $method, $ch_id,
181             (@args ? @args : %args),
182             sub {
183 0 0   0     croak @_ if @_;
184             }
185 0 0         );
186              
187 0           return $self;
188             }
189              
190             =head1 CONSTRCTOR
191              
192             my $ar = AnyEvent::RabbitMQ::Fork->new();
193              
194             =head2 Options
195              
196             =over
197              
198             =item verbose [Bool]
199              
200             Prints a LOT of debugging information to C.
201              
202             =back
203              
204             =cut
205              
206             before verbose => sub {
207             return if @_ < 2;
208             $_[0]->_delegate(verbose => 0, $_[1]);
209             };
210              
211             =head1 METHODS
212              
213             =over
214              
215             =item load_xml_spec([$amqp_spec_xml_path])
216              
217             Declare and load the AMQP Specification you wish to use. The default is to use
218             version 0.9.1 with RabbitMQ specific extensions.
219              
220             B
221              
222             =cut
223              
224             my $_loaded_spec;
225             sub load_xml_spec {
226 0     0 1   my $self = shift;
227 0   0       my $spec = shift || DEFAULT_AMQP_SPEC;
228              
229 0 0 0       if ($_loaded_spec and $_loaded_spec ne $spec) {
    0          
230 0           croak(
231             "Tried to load AMQP spec $spec, but have already loaded $_loaded_spec, not possible"
232             );
233             } elsif (!$_loaded_spec) {
234 0           Net::AMQP::Protocol->load_xml_spec($_loaded_spec = $spec);
235             }
236              
237 0           return $self->_delegate(load_xml_spec => 0, $spec);
238             }
239              
240             =item connect(%opts)
241              
242             Open connection to an AMQP server to begin work.
243              
244             Arguments:
245              
246             =over
247              
248             =item B
249              
250             =item B
251              
252             =item B
253              
254             =item B
255              
256             =item B
257              
258             =item B TCP timeout in seconds. Default: use L default
259              
260             =item B Boolean to use SSL/TLS or not. Default: 0
261              
262             =item B Hash: (values are negotiated with the server)
263              
264             =over
265              
266             =item B Heartbeat interval in seconds. Default: 0 (off)
267              
268             =item B Maximum channel ID. Default: 65536
269              
270             =item B Maximum frame size in bytes. Default: 131072
271              
272             =back
273              
274             =item B Callback when the connection is successfully established.
275              
276             =item B Called when a failure occurs over the lifetime of the connection.
277              
278             =item B Called when there is a problem reading response from the server.
279              
280             =item B Called if the server returns a published message.
281              
282             =item B Called when the connection is closed remotely.
283              
284             =back
285              
286             B
287              
288             =item open_channel(%opts)
289              
290             Open a logical channel which is where all the AMQP fun is.
291              
292             Arguments:
293              
294             =over
295              
296             =item B Called when the channel is open and ready for use.
297              
298             =item B Called if there is a problem opening the channel.
299              
300             =item B Called when the channel is closed.
301              
302             =back
303              
304             =item close(%opts)
305              
306             Close this connection.
307              
308             =over
309              
310             =item B Called on successful shutdown.
311              
312             =item B Called on failed shutdown. Note: the connection is still
313             closed after this
314              
315             =back
316              
317             =back
318              
319             =cut
320              
321             foreach my $method (qw(connect open_channel close)) {
322 1     1   11 no strict 'refs';
  1         2  
  1         1188  
323             *$method = sub {
324 0     0     my $self = shift;
325 0           return $self->_delegate($method => 0, @_);
326             };
327             }
328              
329             sub drain_writes {
330 0     0 0   my ($self, $to) = @_;
331              
332 0           my $w;
333 0 0         if ($to) {
334             $w = AE::timer $to, 0,
335 0     0     sub { $self->_drain_cv->croak("Timed out after $to") };
  0            
336             }
337              
338 0           $self->_drain_cv->recv;
339 0           $self->_clear_drain_cv;
340 0           undef $w;
341              
342 0           return;
343             }
344              
345             my %event_handlers = (
346             cb => '_handle_callback',
347             cbd => '_handle_callback_destroy',
348             chd => '_handle_channel_destroy',
349             cdw => '_handle_connection_drain_writes',
350             i => '_handle_info',
351             );
352              
353             sub _on_event {
354 0     0     my $self = shift;
355 0           my $type = shift;
356              
357 0 0         if (my $handler = $event_handlers{$type}) {
358 0           $self->$handler(@_);
359             } else {
360 0           croak "Unknown event type: '$type'";
361             }
362              
363 0           return;
364             }
365              
366             sub _handle_callback { ## no critic (Subroutines::RequireArgUnpacking)
367 0     0     my $self = shift;
368 0           my $sig = shift;
369 0           my ($id, $event, $method, $pkg) = @$sig;
370              
371 0 0         warn "_handle_callback $id $event $method $pkg\n" if $self->verbose;
372              
373 0 0         if (my $cb = $self->cb_registry->{$id}) {
374 0 0 0       if (ref($_[0]) eq 'REF' and ref(${ $_[0] }) eq 'ARRAY') {
  0            
375 0           my ($class, @args) = @{ ${ $_[0] } };
  0            
  0            
376              
377 0 0         if ($class eq 'AnyEvent::RabbitMQ') {
    0          
378 0           $_[0] = $self;
379             } elsif ($class eq 'AnyEvent::RabbitMQ::Channel') {
380 0           my $channel_id = shift @args;
381 0   0       $_[0] = $self->channels->{$channel_id}
382             ||= $self->channel_class->new(
383             id => $channel_id,
384             connection => $self
385             );
386             } else {
387 0           croak "Unknown class type: '$class'";
388             }
389             }
390              
391 0           goto &$cb;
392             } else {
393 0           croak "Unknown callback id: '$id'";
394             }
395              
396 0           return;
397             }
398              
399             sub _handle_info {
400 0     0     my ($self, $info) = @_;
401              
402 0           $self->_handle_connection_info(%{ delete $info->{connection} })
403 0 0         if $info->{connection};
404              
405 0           $self->_handle_channel_info($_, %{ $info->{$_} }) foreach keys %$info;
  0            
406              
407 0           return;
408             }
409              
410             # channel information passback
411             sub _handle_channel_info {
412 0     0     my ($self, $ch_id, %args) = @_;
413              
414 0 0         warn "_handle_channel_info $ch_id @{[ %args ]}\n" if $self->verbose;
  0            
415              
416 0 0         if (my $ch = $self->channels->{$ch_id}) {
417 0           @$ch{ keys %args } = values %args;
418             } else {
419 0           croak "Unknown channel: '$ch_id'";
420             }
421              
422 0           return;
423             }
424              
425             sub _handle_channel_destroy {
426 0     0     my ($self, $ch_id) = @_;
427              
428 0 0         warn "_handle_channel_destroy $ch_id\n" if $self->verbose;
429              
430 0           delete $self->channels->{$ch_id};
431              
432 0           return;
433             }
434              
435             # connection information passback
436             sub _handle_connection_info {
437 0     0     my ($self, %args) = @_;
438              
439 0 0         warn "_handle_connection_info @{[ %args ]}\n" if $self->verbose;
  0            
440              
441 0           @$self{ keys %args } = values %args;
442              
443 0           return;
444             }
445              
446             sub _handle_callback_destroy {
447 0     0     my ($self, $id, $event, $method, $pkg) = @_;
448              
449 0 0         warn "_handle_callback_destroy $id $event $method $pkg\n" if $self->verbose;
450              
451 0           delete $self->cb_registry->{$id};
452              
453 0           return;
454             }
455              
456             sub _handle_connection_drain_writes {
457 0     0     my $self = shift;
458              
459 0 0         $self->_drain_cv->send if $self->_has_drain_cv;
460              
461 0           return;
462             }
463              
464             sub _on_error {
465 0     0     my $self = shift;
466              
467 0           croak @_;
468             }
469              
470             sub _on_destroy {
471 0     0     my $self = shift;
472              
473 0 0         warn "_on_destroy\n" if $self->verbose;
474              
475             # TODO implement reconnect
476 0           return;
477             }
478              
479             sub DEMOLISH {
480 0     0 0   my ($self, $in_gd) = @_;
481 0 0         return if $in_gd;
482 0 0         return unless $self->has_rpc;
483              
484 0           $self->rpc->(DEMOLISH => 0, my $cv = AE::cv);
485              
486 0           $cv->recv;
487              
488 0           $self->clear_rpc;
489              
490 0           return;
491             }
492              
493             =head1 AUTHOR
494              
495             William Cox
496              
497             =head1 COPYRIGHT
498              
499             Copyright (c) 2014, the above named author(s).
500              
501             =head1 LICENSE
502              
503             This library is free software; you can redistribute it and/or modify it under the same terms as Perl itself.
504              
505             =cut
506              
507             1;