File Coverage

blib/lib/POE/Component/MessageQueue/Storage/Remote.pm
Criterion Covered Total %
statement 51 61 83.6
branch 5 12 41.6
condition 1 2 50.0
subroutine 20 23 86.9
pod 0 4 0.0
total 77 102 75.4


line stmt bran cond sub pod time code
1             #
2             # Copyright 2008 Paul Driver <frodwith@gmail.com>
3             #
4             # This program is free software: you can redistribute it and/or modify
5             # it under the terms of the GNU General Public License as published by
6             # the Free Software Foundation, either version 2 of the License, or
7             # (at your option) any later version.
8             #
9             # This program is distributed in the hope that it will be useful,
10             # but WITHOUT ANY WARRANTY; without even the implied warranty of
11             # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12             # GNU General Public License for more details.
13             #
14             # You should have received a copy of the GNU General Public License
15             # along with this program. If not, see <http://www.gnu.org/licenses/>.
16             #
17              
18             package POE::Component::MessageQueue::Storage::Remote;
19 1     1   6714 use Moose;
  1         3  
  1         9  
20 1     1   8582 use Data::UUID;
  1         5  
  1         103  
21 1     1   13 use POE;
  1         5  
  1         12  
22 1     1   1464 use POE::Component::Client::TCP;
  1         3268  
  1         64  
23 1     1   11 use POE::Component::MessageQueue::Storage;
  1         4  
  1         1153  
24              
25             has servers => (
26             is => 'ro',
27             isa => 'ArrayRef[HashRef]',
28             required => 1,
29             );
30              
31             has session_id => (
32             is => 'rw',
33             isa => 'Int',
34             init_arg => undef,
35             );
36              
37             has idmaker => (
38             is => 'ro',
39             isa => 'Data::UUID',
40             init_arg => undef,
41             default => sub { Data::UUID->new() },
42             );
43              
44             sub add_server_method
45             {
46 10     10 0 471 my ($class, $method) = @_;
47             $class->meta->add_method($method, sub {
48 1022     1022   2142688 my $self = shift;
        1022      
        1022      
        1022      
        1022      
        1022      
        1022      
        1022      
49 1022         46626 $poe_kernel->post($self->session_id, remote_call => {
50             method => $method,
51             args => [@_],
52             });
53 10         27 });
54             }
55              
56             my %methods = map {$_ => 1}
57             POE::Component::MessageQueue::Storage->meta->get_required_method_list;
58              
59             delete $methods{storage_shutdown};
60              
61             __PACKAGE__->add_server_method($_) foreach (keys %methods);
62              
63             sub storage_shutdown
64             {
65 1     1 0 848 my ($self, $callback) = @_;
66 1         44 $poe_kernel->post($self->session_id, 'shutdown');
67 1         117 goto $callback;
68             }
69              
70             with qw(POE::Component::MessageQueue::Storage);
71              
72             sub BUILD
73             {
74 1     1 0 1345 my ($self, $args) = @_;
75              
76 1         38 my $servers = $self->servers;
77 1         5 my $si = 1;
78 1         4 my $total_fail = 0;
79             my $retry = sub {
80 0 0   0   0 if ($si >= @$servers)
81             {
82 0 0       0 if (++$total_fail > 2)
83             {
84 0         0 $self->log(emergency =>
85             "Tried to connect to all servers $total_fail times without success."
86             );
87             }
88 0         0 $si = 0;
89             }
90 0         0 my $info = $servers->[$si++];
91 0         0 $poe_kernel->delay(connect => 2 => $info->{host} => $info->{port});
92 1         14 };
93              
94 1         2 my ($host, $port) = @{$servers->[0]}{'host', 'port'};
  1         6  
95              
96             $self->session_id(POE::Component::Client::TCP->new(
97             RemoteAddress => $host,
98             RemotePort => $port,
99             ConnectTimeout => 3,
100             Filter => POE::Filter::Reference->new,
101             ObjectStates => [ $self => ['remote_call'] ],
102              
103             Connected => sub {
104 1     1   1605 my $heap = $_[HEAP];
105             $heap->{server}->put($_)
106 1         3 foreach map { $_->{request} } (values %{ $heap->{calls} });
  1         11  
  1         6  
107 1         182 $total_fail = 0;
108             },
109              
110             ConnectError => sub {
111 0     0   0 $self->log(error => "Could not connect to $host:$port ($total_fail)");
112 0         0 goto $retry;
113             },
114              
115             Disconnected => sub {
116 1 50   1   1087 goto $retry unless $_[HEAP]->{shutdown};
117             },
118              
119             ServerInput => sub {
120 1022     1022   2729768 my ($heap, $request) = @_[HEAP, ARG0];
121 1022         2928 my $id = $request->{callback};
122 1022   50     3989 my $args = $request->{args} || [];
123 1022         3639 my $call = delete $heap->{calls}->{$id};
124 1022 50       4230 if (my $code = $call->{callback})
125             {
126 1022         4387 $code->(@$args);
127             }
128             },
129              
130             ServerError => sub {
131 0     0   0 $self->log(error => "Remote error on ". $_[ARG0] . ": " . $_[ARG2]);
132 0         0 goto $retry;
133             },
134              
135 1         19 ));
136             }
137              
138             sub remote_call
139             {
140 1022     1022 0 209226 my ($self, $heap, $request) = @_[OBJECT, HEAP, ARG0];
141              
142 1022         42302 my $id = $self->idmaker->create_b64();
143 1022         5376 my $call = $heap->{calls}->{$id} = { request => $request };
144              
145 1022         2735 my $args = $request->{args};
146 1022         2643 my $last = $args->[-1];
147 1022 50       4427 if (ref $last eq 'CODE')
148             {
149 1022         2682 $call->{callback} = $last;
150 1022         2673 $args->[-1] = $id;
151             }
152              
153 1022 100       6227 $heap->{server}->put($request) if $heap->{connected};
154             }
155              
156             1;
157              
158             __END__
159              
160             =pod
161              
162             =head1 NAME
163              
164             POE::Component::MessageQueue::Storage::Remote -- Access a remote storage
165             engine via a TCP socket
166              
167             =head1 DESCRIPTION
168              
169             With this module, you can talk to a storage engine running under
170             L<POE::Component::MessageQueue::Storage::Remote::Server> transparently. You
171             can treat this like a normal local store once it's set up, and it can
172             optionally fail over to other stores.
173              
174             =head1 CONSTRUCTOR PARAMETERS
175              
176             =over 2
177              
178             =item servers
179              
180             An arrayref of hashrefs of the form C<< {host => 'hostname', port => port} >>.
181             Remote will try these servers in a round robin fashion whenever it fails to
182             connect or gets disconnected. Passing in just one server is an effective way
183             to say "keep connecting to this server until it's up and reconnect to it if
184             you get disconnected."
185              
186             =back
187              
188             =head1 SEE ALSO
189              
190             L<POE::Component::MessageQueue::Storage::Remote::Server>
191              
192             =cut
193