File Coverage

blib/lib/AnyEvent/RabbitMQ/Fork/Worker.pm
Criterion Covered Total %
statement 24 89 26.9
branch 0 26 0.0
condition 0 28 0.0
subroutine 8 20 40.0
pod 0 2 0.0
total 32 165 19.3


line stmt bran cond sub pod time code
1             package AnyEvent::RabbitMQ::Fork::Worker;
2             $AnyEvent::RabbitMQ::Fork::Worker::VERSION = '0.6';
3             =head1 NAME
4              
5             AnyEvent::RabbitMQ::Fork::Worker - Fork side magic
6              
7             =head1 DESCRIPTION
8              
9             No user serviceable parts inside. Venture at your own risk.
10              
11             =cut
12              
13 1     1   1832 use Moo;
  1         4  
  1         8  
14 1     1   346 use Types::Standard qw(InstanceOf Bool);
  1         2  
  1         9  
15 1     1   787 use Guard;
  1         3  
  1         67  
16 1     1   6 use Scalar::Util qw(weaken blessed);
  1         2  
  1         61  
17              
18 1     1   7 use namespace::clean;
  1         2  
  1         10  
19              
20 1     1   937 use AnyEvent::RabbitMQ 1.18;
  1         48739  
  1         1039  
21              
22             has verbose => (is => 'rw', isa => Bool, default => 0);
23              
24             has connection => (
25             is => 'lazy',
26             isa => InstanceOf['AnyEvent::RabbitMQ'],
27             clearer => 1,
28             handles => ['channels'],
29             );
30              
31             sub _build_connection {
32 0     0     my $self = shift;
33              
34 0           my $conn = AnyEvent::RabbitMQ->new(verbose => $self->verbose);
35              
36 0           _cb_hooks($conn);
37              
38 0           return $conn;
39             }
40              
41             ### RPC Interface ###
42              
43             my $instance;
44              
45             sub init {
46 0     0 0   my $class = shift;
47 0           $instance = $class->new(@_);
48 0           return;
49             }
50              
51             sub run {
52 0     0 0   my ($done, $method, $ch_id, @args, %args) = @_;
53              
54 0           weaken(my $self = $instance);
55              
56 0 0         unless (@args % 2) {
57 0           %args = @args;
58 0           @args = ();
59 0           foreach my $event (grep { /^on_/ } keys %args) {
  0            
60             # callback signature provided by parent process
61 0           my $sig = delete $args{$event};
62              
63             # our callback to be used by AE::RMQ
64 0           $args{$event} = $self->_generate_callback($method, $event, $sig);
65             }
66             }
67              
68 0           my @error;
69 0 0 0       if (defined $ch_id and my $ch = $self->channels->{ $ch_id }) {
    0 0        
70 0 0         $ch->$method(@args ? @args : %args);
71             } elsif (defined $ch_id and $ch_id == 0) {
72 0 0         if ($method eq 'DEMOLISH') {
73 0           $self->clear_connection;
74             } else {
75 0 0         $self->connection->$method(@args ? @args : %args);
76             }
77             } else {
78 0   0       $ch_id ||= '';
79 0           push @error, "Unknown channel: '$ch_id'";
80             }
81              
82 0           return $done->(@error);
83             }
84              
85             my %cb_hooks = (
86             channel => {
87             _state => 'is_open',
88             _is_active => 'is_active',
89             _is_confirm => 'is_confirm',
90             },
91             connection => {
92             _state => 'is_open',
93             _login_user => 'login_user',
94             _server_properties => 'server_properties',
95             }
96             );
97             sub _cb_hooks {
98 0     0     weaken(my $obj = shift);
99              
100             my ($type, $hooks)
101             = $obj->isa('AnyEvent::RabbitMQ')
102             ? ('connection', $cb_hooks{connection})
103 0 0         : ($obj->id, $cb_hooks{channel});
104              
105 0           foreach my $prop (keys %$hooks) {
106 0           my $method = $hooks->{$prop};
107             ## no critic (Miscellanea::ProhibitTies)
108             tie $obj->{$prop}, 'AnyEvent::RabbitMQ::Fork::Worker::TieScalar',
109             $obj->{$prop}, sub {
110 0     0     AnyEvent::Fork::RPC::event(
111             i => { $type => { $method => $obj->$method } });
112 0           };
113             }
114              
115 0           return;
116             }
117              
118             sub _generate_callback {
119 0     0     my ($self, $method, $event, $sig) = @_;
120              
121 0           my $is_conn = $sig->[-1] eq 'AnyEvent::RabbitMQ::Fork';
122              
123 0   0       my $should_clear_connection
124             = ($is_conn and ($method eq 'close' or ($method eq 'connect' and $event eq 'on_close')));
125              
126 0   0       my $open_channel_success = ($method eq 'open_channel' and $event eq 'on_success');
127              
128             my $guard = guard {
129             # inform parent process that this callback is no longer needed
130 0     0     AnyEvent::Fork::RPC::event(cbd => @$sig);
131 0           };
132              
133             # our callback to be used by AE::RMQ
134 0           weaken(my $wself = $self);
135             return sub {
136 0     0     $guard if 0; # keepalive
137              
138 0 0         $wself->clear_connection if $should_clear_connection;
139              
140 0   0       my $blessed = blessed($_[0]) || 'UNIVERSAL';
141 0 0 0       if ($blessed->isa('AnyEvent::RabbitMQ') or $blessed->isa('AnyEvent::RabbitMQ::Channel')) {
142             # we put our sentry value in place later
143 0           my $obj = shift;
144             # this is our signal back to the parent as to what kind of object it was
145 0 0         unshift @_, \[$blessed, ($obj->isa('AnyEvent::RabbitMQ::Channel') ? $obj->id : ())];
146              
147 0 0         if ($open_channel_success) {
148 0           my $id = $obj->id;
149             $obj->{"_$wself\_guard"} ||= guard {
150             # channel was GC'd by AE::RMQ
151 0           AnyEvent::Fork::RPC::event(chd => $id);
152 0   0       };
153              
154             # needs to be done after parent registers channel
155 0           AE::postpone { _cb_hooks($obj) };
  0            
156             }
157              
158 0 0         if ($obj->isa('AnyEvent::RabbitMQ')) {
159             # replace with our own handling
160 0           $obj->{_handle}->on_drain(sub { AnyEvent::Fork::RPC::event('cdw') });
  0            
161             }
162             }
163              
164             # these values don't pass muster with Storable
165 0 0 0       delete local @{ $_[0] }{ 'fh', 'on_error', 'on_drain' }
  0   0        
166             #if $blessed->isa('AnyEvent::Handle');
167             if $method eq 'connect' and $event eq 'on_failure' and $blessed->isa('AnyEvent::Handle');
168              
169             # tell the parent to run the users callback known by $sig
170 0           AnyEvent::Fork::RPC::event(cb => $sig, @_);
171 0           };
172             }
173              
174             =head1 AUTHOR
175              
176             William Cox
177              
178             =head1 COPYRIGHT
179              
180             Copyright (c) 2014, the above named author(s).
181              
182             =head1 LICENSE
183              
184             This library is free software; you can redistribute it and/or modify it under the same terms as Perl itself.
185              
186             =cut
187              
188             package # hide from PAUSE
189             AnyEvent::RabbitMQ::Fork::Worker::TieScalar;
190              
191 1     1   21 use strict;
  1         2  
  1         29  
192 1     1   5 use warnings;
  1         3  
  1         195  
193              
194 0     0     sub TIESCALAR { $_[2]->(); return bless [$_[1], $_[2]] => $_[0] }
  0            
195 0     0     sub FETCH { return $_[0][0] }
196 0     0     sub STORE { $_[0][1]->(); return $_[0][0] = $_[1] }
  0            
197 0     0     sub DESTROY { return @{ $_[0] } = () }
  0            
198              
199             1;