File Coverage

blib/lib/AnyEvent/RabbitMQ/Fork/Worker.pm
Criterion Covered Total %
statement 16 18 88.8
branch n/a
condition n/a
subroutine 6 6 100.0
pod n/a
total 22 24 91.6


line stmt bran cond sub pod time code
1             package AnyEvent::RabbitMQ::Fork::Worker;
2             $AnyEvent::RabbitMQ::Fork::Worker::VERSION = '0.4';
3             =head1 NAME
4              
5             AnyEvent::RabbitMQ::Fork::Worker - Fork side magic
6              
7             =head1 DESCRIPTION
8              
9             No user serviceable parts inside. Venture at your own risk.
10              
11             =cut
12              
13 1     1   1539 use Moo;
  1         3  
  1         7  
14 1     1   401 use Types::Standard qw(InstanceOf Bool);
  1         2  
  1         11  
15 1     1   1711 use Guard;
  1         764  
  1         76  
16 1     1   9 use Scalar::Util qw(weaken blessed);
  1         2  
  1         58  
17              
18 1     1   6 use namespace::clean;
  1         3  
  1         9  
19              
20 1     1   774 use AnyEvent::RabbitMQ;
  0            
  0            
21              
22             has verbose => (is => 'rw', isa => Bool, default => 0);
23              
24             has connection => (
25             is => 'lazy',
26             isa => InstanceOf['AnyEvent::RabbitMQ'],
27             clearer => 1,
28             handles => ['channels'],
29             );
30              
31             sub _build_connection {
32             my $self = shift;
33              
34             my $conn = AnyEvent::RabbitMQ->new(verbose => $self->verbose);
35              
36             _cb_hooks($conn);
37              
38             return $conn;
39             }
40              
41             ### RPC Interface ###
42              
43             my $instance;
44              
45             sub init {
46             my $class = shift;
47             $instance = $class->new(@_);
48             return;
49             }
50              
51             sub run {
52             my ($done, $method, $ch_id, @args, %args) = @_;
53              
54             weaken(my $self = $instance);
55              
56             unless (@args % 2) {
57             %args = @args;
58             @args = ();
59             foreach my $event (grep { /^on_/ } keys %args) {
60             # callback signature provided by parent process
61             my $sig = delete $args{$event};
62              
63             my $guard = guard {
64             # inform parent process that this callback is no longer needed
65             AnyEvent::Fork::RPC::event(cbd => @$sig);
66             };
67              
68             # our callback to be used by AE::RMQ
69             $args{$event} = sub {
70             $guard if 0; # keepalive
71              
72             $self->clear_connection
73             if $sig->[-1] eq 'AnyEvent::RabbitMQ'
74             and ($method eq 'close'
75             or ($method eq 'connect' and $event eq 'on_close'));
76              
77             if ((my $isa = blessed $_[0] || q{}) =~ /^AnyEvent::RabbitMQ/) {
78             my $obj = shift;
79             if ($method eq 'open_channel' and $event eq 'on_success') {
80             my $id = $obj->id; # $ch_id == 0 in this scope
81             $obj->{"_$self\_guard"} ||= guard {
82             AnyEvent::Fork::RPC::event(chd => $id);
83             };
84              
85             # needs to be done parent registers channel
86             AE::postpone { _cb_hooks($obj) };
87             }
88              
89             if ($isa eq 'AnyEvent::RabbitMQ') {
90             # replace with our own handling
91             $obj->{_handle}->on_drain(
92             sub {
93             AnyEvent::Fork::RPC::event('cdw');
94             }
95             );
96             }
97              
98             unshift @_,
99             \[
100             $isa,
101             ($isa eq 'AnyEvent::RabbitMQ::Channel' ? $obj->id : ())
102             ];
103             }
104              
105             # these values don't pass muster with Storable
106             delete local @{ $_[0] }{ 'fh', 'on_error', 'on_drain' }
107             if $method eq 'connect'
108             and $event = 'on_failure'
109             and blessed $_[0];
110              
111             AnyEvent::Fork::RPC::event(cb => $sig, @_);
112             };
113             }
114             }
115              
116             if (defined $ch_id and my $ch = $self->channels->{ $ch_id }) {
117             $ch->$method(@args ? @args : %args);
118              
119             $done->();
120             } elsif (defined $ch_id and $ch_id == 0) {
121             if ($method eq 'DEMOLISH') {
122             $self->clear_connection;
123             } else {
124             $self->connection->$method(@args ? @args : %args);
125             }
126              
127             $done->();
128             } else {
129             $ch_id ||= '';
130             $done->("Unknown channel: '$ch_id'");
131             }
132              
133             return;
134             }
135              
136             my %cb_hooks = (
137             channel => {
138             _state => 'is_open',
139             _is_active => 'is_active',
140             _is_confirm => 'is_confirm',
141             },
142             connection => {
143             _state => 'is_open',
144             _login_user => 'login_user',
145             _server_properties => 'server_properties',
146             }
147             );
148             sub _cb_hooks {
149             weaken(my $obj = shift);
150              
151             my ($type, $hooks)
152             = $obj->isa('AnyEvent::RabbitMQ')
153             ? ('connection', $cb_hooks{connection})
154             : ($obj->id, $cb_hooks{channel});
155              
156             foreach my $prop (keys %$hooks) {
157             my $method = $hooks->{$prop};
158             ## no critic (Miscellanea::ProhibitTies)
159             tie $obj->{$prop}, 'AnyEvent::RabbitMQ::Fork::Worker::TieScalar',
160             $obj->{$prop}, sub {
161             AnyEvent::Fork::RPC::event(
162             i => { $type => { $method => $obj->$method } });
163             };
164             }
165              
166             return;
167             }
168              
169             =head1 AUTHOR
170              
171             William Cox
172              
173             =head1 COPYRIGHT
174              
175             Copyright (c) 2014, the above named author(s).
176              
177             =head1 LICENSE
178              
179             This library is free software; you can redistribute it and/or modify it under the same terms as Perl itself.
180              
181             =cut
182              
183             package # hide from PAUSE
184             AnyEvent::RabbitMQ::Fork::Worker::TieScalar;
185              
186             use strict;
187             use warnings;
188              
189             sub TIESCALAR { $_[2]->(); return bless [$_[1], $_[2]] => $_[0] }
190             sub FETCH { return $_[0][0] }
191             sub STORE { $_[0][1]->(); return $_[0][0] = $_[1] }
192             sub DESTROY { return @{ $_[0] } = () }
193              
194             1;