File Coverage

blib/lib/AnyEvent/RabbitMQ/Fork/Worker.pm
Criterion Covered Total %
statement 16 18 88.8
branch n/a
condition n/a
subroutine 6 6 100.0
pod n/a
total 22 24 91.6


line stmt bran cond sub pod time code
1             package AnyEvent::RabbitMQ::Fork::Worker;
2             $AnyEvent::RabbitMQ::Fork::Worker::VERSION = '0.5';
3             =head1 NAME
4              
5             AnyEvent::RabbitMQ::Fork::Worker - Fork side magic
6              
7             =head1 DESCRIPTION
8              
9             No user serviceable parts inside. Venture at your own risk.
10              
11             =cut
12              
13 1     1   954 use Moo;
  1         2  
  1         5  
14 1     1   265 use Types::Standard qw(InstanceOf Bool);
  1         2  
  1         11  
15 1     1   1100 use Guard;
  1         538  
  1         67  
16 1     1   5 use Scalar::Util qw(weaken blessed);
  1         1  
  1         48  
17              
18 1     1   5 use namespace::clean;
  1         1  
  1         8  
19              
20 1     1   416 use AnyEvent::RabbitMQ 1.18;
  0            
  0            
21              
22             has verbose => (is => 'rw', isa => Bool, default => 0);
23              
24             has connection => (
25             is => 'lazy',
26             isa => InstanceOf['AnyEvent::RabbitMQ'],
27             clearer => 1,
28             handles => ['channels'],
29             );
30              
31             sub _build_connection {
32             my $self = shift;
33              
34             my $conn = AnyEvent::RabbitMQ->new(verbose => $self->verbose);
35              
36             _cb_hooks($conn);
37              
38             return $conn;
39             }
40              
41             ### RPC Interface ###
42              
43             my $instance;
44              
45             sub init {
46             my $class = shift;
47             $instance = $class->new(@_);
48             return;
49             }
50              
51             sub run {
52             my ($done, $method, $ch_id, @args, %args) = @_;
53              
54             weaken(my $self = $instance);
55              
56             unless (@args % 2) {
57             %args = @args;
58             @args = ();
59             foreach my $event (grep { /^on_/ } keys %args) {
60             # callback signature provided by parent process
61             my $sig = delete $args{$event};
62              
63             # our callback to be used by AE::RMQ
64             $args{$event} = $self->_generate_callback($method, $event, $sig);
65             }
66             }
67              
68             if (defined $ch_id and my $ch = $self->channels->{ $ch_id }) {
69             $ch->$method(@args ? @args : %args);
70              
71             $done->();
72             } elsif (defined $ch_id and $ch_id == 0) {
73             if ($method eq 'DEMOLISH') {
74             $self->clear_connection;
75             } else {
76             $self->connection->$method(@args ? @args : %args);
77             }
78              
79             $done->();
80             } else {
81             $ch_id ||= '';
82             $done->("Unknown channel: '$ch_id'");
83             }
84              
85             return;
86             }
87              
88             my %cb_hooks = (
89             channel => {
90             _state => 'is_open',
91             _is_active => 'is_active',
92             _is_confirm => 'is_confirm',
93             },
94             connection => {
95             _state => 'is_open',
96             _login_user => 'login_user',
97             _server_properties => 'server_properties',
98             }
99             );
100             sub _cb_hooks {
101             weaken(my $obj = shift);
102              
103             my ($type, $hooks)
104             = $obj->isa('AnyEvent::RabbitMQ')
105             ? ('connection', $cb_hooks{connection})
106             : ($obj->id, $cb_hooks{channel});
107              
108             foreach my $prop (keys %$hooks) {
109             my $method = $hooks->{$prop};
110             ## no critic (Miscellanea::ProhibitTies)
111             tie $obj->{$prop}, 'AnyEvent::RabbitMQ::Fork::Worker::TieScalar',
112             $obj->{$prop}, sub {
113             AnyEvent::Fork::RPC::event(
114             i => { $type => { $method => $obj->$method } });
115             };
116             }
117              
118             return;
119             }
120              
121             sub _generate_callback {
122             my ($self, $method, $event, $sig) = @_;
123              
124             my $should_clear_connection = (
125             $sig->[-1] eq 'AnyEvent::RabbitMQ' and ($method eq 'close'
126             or ($method eq 'connect' and $event eq 'on_close'))
127             ) ? 1 : 0;
128              
129             my $guard = guard {
130             # inform parent process that this callback is no longer needed
131             AnyEvent::Fork::RPC::event(cbd => @$sig);
132             };
133              
134             # our callback to be used by AE::RMQ
135             weaken(my $wself = $self);
136             return sub {
137             $guard if 0; # keepalive
138              
139             $wself->clear_connection if $should_clear_connection;
140              
141             if ((my $isa = blessed $_[0] || q{}) =~ /^AnyEvent::RabbitMQ/) {
142             # we put our sentry value in place later
143             my $obj = shift;
144              
145             if ($method eq 'open_channel' and $event eq 'on_success') {
146             my $id = $obj->id;
147             $obj->{"_$wself\_guard"} ||= guard {
148             # channel was GC'd by AE::RMQ
149             AnyEvent::Fork::RPC::event(chd => $id);
150             };
151              
152             # needs to be done after parent registers channel
153             AE::postpone { _cb_hooks($obj) };
154             }
155              
156             if ($isa eq 'AnyEvent::RabbitMQ') {
157             # replace with our own handling
158             $obj->{_handle}
159             ->on_drain(sub { AnyEvent::Fork::RPC::event('cdw') });
160             }
161              
162             # this is our signal back to the parent as to what kind of object
163             # it was
164             unshift @_,
165             \[$isa, ($isa eq 'AnyEvent::RabbitMQ::Channel' ? $obj->id : ())];
166             }
167              
168             # these values don't pass muster with Storable
169             delete local @{ $_[0] }{ 'fh', 'on_error', 'on_drain' }
170             if $method eq 'connect'
171             and $event = 'on_failure'
172             and blessed $_[0];
173              
174             # tell the parent to run the users callback known by $sig
175             AnyEvent::Fork::RPC::event(cb => $sig, @_);
176             };
177             }
178              
179             =head1 AUTHOR
180              
181             William Cox
182              
183             =head1 COPYRIGHT
184              
185             Copyright (c) 2014, the above named author(s).
186              
187             =head1 LICENSE
188              
189             This library is free software; you can redistribute it and/or modify it under the same terms as Perl itself.
190              
191             =cut
192              
193             package # hide from PAUSE
194             AnyEvent::RabbitMQ::Fork::Worker::TieScalar;
195              
196             use strict;
197             use warnings;
198              
199             sub TIESCALAR { $_[2]->(); return bless [$_[1], $_[2]] => $_[0] }
200             sub FETCH { return $_[0][0] }
201             sub STORE { $_[0][1]->(); return $_[0][0] = $_[1] }
202             sub DESTROY { return @{ $_[0] } = () }
203              
204             1;