File Coverage

blib/lib/AnyEvent/Stomper/Cluster.pm
Criterion Covered Total %
statement 105 185 56.7
branch 11 46 23.9
condition 0 23 0.0
subroutine 21 32 65.6
pod 5 5 100.0
total 142 291 48.8


line stmt bran cond sub pod time code
1             package AnyEvent::Stomper::Cluster;
2              
3 2     2   3421 use 5.008000;
  2         6  
4 2     2   8 use strict;
  2         2  
  2         32  
5 2     2   5 use warnings;
  2         2  
  2         74  
6 2     2   6 use base qw( Exporter );
  2         2  
  2         186  
7              
8             our $VERSION = '0.33_01';
9              
10 2     2   8 use AnyEvent::Stomper;
  2         3  
  2         50  
11 2     2   8 use AnyEvent::Stomper::Error;
  2         2  
  2         43  
12              
13 2     2   6 use Scalar::Util qw( weaken );
  2         3  
  2         78  
14 2     2   6 use Carp qw( croak );
  2         2  
  2         152  
15              
16             our %ERROR_CODES;
17              
18             BEGIN {
19 2     2   9 %ERROR_CODES = %AnyEvent::Stomper::Error::ERROR_CODES;
20 2         6 our @EXPORT_OK = keys %ERROR_CODES;
21 2         40 our %EXPORT_TAGS = ( err_codes => \@EXPORT_OK );
22             }
23              
24 2     2   48 use constant \%ERROR_CODES;
  2         2  
  2         645  
25              
26             my %ACK_CMDS = (
27             ACK => 1,
28             NACK => 1,
29             );
30              
31             my %CAN_REPEAT = (
32             SEND => 1,
33             SUBSCRIBE => 1,
34             BEGIN => 1,
35             );
36              
37              
38             sub new {
39 4     4 1 4765 my $class = shift;
40 4         8 my %params = @_;
41              
42 4         7 my $self = bless {}, $class;
43              
44 4 100       12 unless ( defined $params{nodes} ) {
45 1         131 croak 'Nodes not specified';
46             }
47 3 100       9 unless ( ref( $params{nodes} ) eq 'ARRAY' ) {
48 1         108 croak 'Nodes must be specified as array reference';
49             }
50 2 100       4 unless ( @{ $params{nodes} } ) {
  2         6  
51 1         171 croak 'Specified empty list of nodes';
52             }
53              
54 1         4 $self->{nodes} = $params{nodes};
55 1         2 $self->{on_node_connect} = $params{on_node_connect};
56 1         1 $self->{on_node_disconnect} = $params{on_node_disconnect};
57 1         2 $self->{on_node_error} = $params{on_node_error};
58 1         4 $self->on_error( $params{on_error} );
59              
60 1         2 my %node_params;
61 1         2 foreach my $name ( qw( login passcode vhost heartbeat connection_timeout
62             reconnect_interval handle_params default_headers command_headers ) )
63             {
64 9 50       13 next unless defined $params{$name};
65 0         0 $node_params{$name} = $params{$name};
66             }
67 1         2 $self->{_node_params} = \%node_params;
68              
69 1         4 $self->_reset_internals;
70 1         3 $self->_init;
71              
72 1         3 return $self;
73             }
74              
75             sub execute {
76 0     0 1 0 my $self = shift;
77 0         0 my $cmd_name = shift;
78              
79 0         0 my $cmd = $self->_prepare( $cmd_name, [@_] );
80 0         0 $self->_execute($cmd);
81              
82 0         0 return;
83             }
84              
85             # Generate methods
86             {
87 2     2   9 no strict qw( refs );
  2         2  
  2         1941  
88              
89             foreach my $name ( qw( send subscribe unsubscribe ack nack begin commit
90             abort disconnect ) )
91             {
92             *{$name} = sub {
93 0     0   0 my $self = shift;
94              
95 0         0 my $cmd = $self->_prepare( $name, [@_] );
96 0         0 $self->_execute($cmd);
97              
98 0         0 return;
99             }
100             }
101             }
102              
103             sub nodes {
104 1     1 1 3655 my $self = shift;
105 1         2 return values %{ $self->{_nodes_pool} };
  1         4  
106             }
107              
108             sub on_error {
109 1     1 1 1 my $self = shift;
110              
111 1 50       3 if (@_) {
112 1         1 my $on_error = shift;
113              
114 1 50       3 if ( defined $on_error ) {
115 0         0 $self->{on_error} = $on_error;
116             }
117             else {
118             $self->{on_error} = sub {
119 0     0   0 my $err = shift;
120 0         0 warn $err->message . "\n";
121 1         5 };
122             }
123             }
124              
125 1         1 return $self->{on_error};
126             }
127              
128             sub force_disconnect {
129 0     0 1 0 my $self = shift;
130              
131 0         0 foreach my $node ( $self->nodes ) {
132 0         0 $node->force_disconnect;
133             }
134 0         0 $self->_reset_internals;
135              
136 0         0 return;
137             }
138              
139             sub _init {
140 1     1   1 my $self = shift;
141              
142 1         1 my $nodes_pool = $self->{_nodes_pool};
143              
144 1         2 foreach my $node_params ( @{ $self->{nodes} } ) {
  1         2  
145 3         7 my $hostport = "$node_params->{host}:$node_params->{port}";
146              
147 3 50       6 unless ( defined $nodes_pool->{$hostport} ) {
148             $nodes_pool->{$hostport}
149 3         5 = $self->_new_node( $node_params->{host}, $node_params->{port} );
150             }
151             }
152              
153 1         2 $self->{_nodes} = [ keys %{ $self->{_nodes_pool} } ];
  1         3  
154 1         4 $self->{_active_node} = $self->_next_node;
155              
156 1         1 return;
157             }
158              
159             sub _new_node {
160 3     3   3 my $self = shift;
161 3         3 my $host = shift;
162 3         3 my $port = shift;
163              
164             return AnyEvent::Stomper->new(
165 3         2 %{ $self->{_node_params} },
  3         8  
166             host => $host,
167             port => $port,
168             lazy => 1,
169             on_connect => $self->_create_on_node_connect( $host, $port ),
170             on_disconnect => $self->_create_on_node_disconnect( $host, $port ),
171             on_error => $self->_create_on_node_error( $host, $port ),
172             );
173             }
174              
175             sub _create_on_node_connect {
176 3     3   2 my $self = shift;
177 3         3 my $host = shift;
178 3         4 my $port = shift;
179              
180 3         5 weaken($self);
181              
182             return sub {
183 0 0   0   0 if ( defined $self->{on_node_connect} ) {
184 0         0 $self->{on_node_connect}->( $host, $port );
185             }
186 3         11 };
187             }
188              
189             sub _create_on_node_disconnect {
190 3     3   2 my $self = shift;
191 3         3 my $host = shift;
192 3         2 my $port = shift;
193              
194 3         4 weaken($self);
195              
196             return sub {
197 0 0   0   0 if ( defined $self->{on_node_disconnect} ) {
198 0         0 $self->{on_node_disconnect}->( $host, $port );
199             }
200 3         8 };
201             }
202              
203             sub _create_on_node_error {
204 3     3   3 my $self = shift;
205 3         3 my $host = shift;
206 3         3 my $port = shift;
207              
208 3         5 weaken($self);
209              
210             return sub {
211 0     0   0 my $err = shift;
212              
213 0         0 my $err_code = $err->code;
214              
215 0 0 0     0 if ( $err_code != E_OPRN_ERROR
216             && $err_code != E_CONN_CLOSED_BY_CLIENT )
217             {
218 0         0 $self->{_active_node} = $self->_next_node;
219             }
220              
221 0 0       0 if ( defined $self->{on_node_error} ) {
222 0         0 $self->{on_node_error}->( $err, $host, $port );
223             }
224 3         10 };
225             }
226              
227             sub _prepare {
228 0     0   0 my $self = shift;
229 0         0 my $cmd_name = uc(shift);
230 0         0 my $args = shift;
231              
232 0         0 my %params;
233              
234 0 0 0     0 if ( ref( $args->[-1] ) eq 'CODE'
235 0         0 && scalar @{$args} % 2 > 0 )
236             {
237 0 0       0 if ( $cmd_name eq 'SUBSCRIBE' ) {
238 0         0 $params{on_message} = pop @{$args};
  0         0  
239             }
240             else {
241 0         0 $params{on_receipt} = pop @{$args};
  0         0  
242             }
243             }
244              
245 0         0 my %headers = @{$args};
  0         0  
246              
247 0         0 foreach my $name ( qw( body on_receipt on_message on_node_error ) ) {
248 0 0       0 if ( defined $headers{$name} ) {
249 0         0 $params{$name} = delete $headers{$name};
250             }
251             }
252 0 0       0 if ( exists $ACK_CMDS{$cmd_name} ) {
253 0         0 $params{message} = delete $headers{message};
254             }
255              
256 0         0 my $cmd = {
257             name => $cmd_name,
258             headers => \%headers,
259             %params,
260             };
261              
262 0 0       0 unless ( defined $cmd->{on_receipt} ) {
263 0         0 weaken($self);
264              
265             $cmd->{on_receipt} = sub {
266 0     0   0 my $receipt = shift;
267 0         0 my $err = shift;
268              
269 0 0       0 if ( defined $err ) {
270 0         0 $self->{on_error}->($err);
271 0         0 return;
272             }
273 0         0 };
274             }
275              
276 0         0 return $cmd;
277             }
278              
279             sub _execute {
280 0     0   0 my $self = shift;
281 0         0 my $cmd = shift;
282 0   0     0 my $fails_cnt = shift || 0;
283              
284 0         0 my $hostport = $self->{_active_node};
285 0         0 my $node = $self->{_nodes_pool}{$hostport};
286              
287 0         0 weaken($self);
288              
289 0         0 $node->execute( $cmd->{name}, %{ $cmd->{headers} },
290             body => $cmd->{body},
291              
292             on_receipt => sub {
293 0     0   0 my $receipt = shift;
294 0         0 my $err = shift;
295              
296 0 0       0 if ( defined $err ) {
297 0         0 my $err_code = $err->code;
298              
299 0   0     0 my $on_node_error = $cmd->{on_node_error} || $self->{on_node_error};
300 0 0       0 if ( defined $on_node_error ) {
301 0         0 my $node = $self->{_nodes_pool}{$hostport};
302 0         0 $on_node_error->( $err, $node->host, $node->port );
303             }
304              
305 0 0 0     0 if ( $CAN_REPEAT{ $cmd->{name} }
      0        
      0        
306             && $err_code != E_OPRN_ERROR
307             && $err_code != E_CONN_CLOSED_BY_CLIENT
308 0         0 && ++$fails_cnt < scalar @{ $self->{_nodes} } )
309             {
310 0         0 $self->_execute( $cmd, $fails_cnt );
311 0         0 return;
312             }
313              
314 0         0 $cmd->{on_receipt}->( $receipt, $err );
315              
316 0         0 return;
317             }
318              
319 0         0 $cmd->{on_receipt}->($receipt);
320             },
321              
322             defined $cmd->{message}
323             ? ( message => $cmd->{message} )
324             : (),
325              
326             defined $cmd->{on_message}
327             ? ( on_message => $cmd->{on_message} )
328 0 0       0 : (),
    0          
329             );
330              
331 0         0 return;
332             }
333              
334             sub _next_node {
335 1     1   1 my $self = shift;
336              
337 1 50 0     2 unless ( defined $self->{_node_index} ) {
338 1         1 $self->{_node_index} = int( rand( scalar @{ $self->{_nodes} } ) );
  1         32  
339             }
340             elsif ( $self->{_node_index} == scalar @{ $self->{_nodes} } ) {
341             $self->{_node_index} = 0;
342             }
343              
344 1         4 return $self->{_nodes}[ $self->{_node_index}++ ];
345             }
346              
347             sub _reset_internals {
348 1     1   2 my $self = shift;
349              
350 1         2 $self->{_nodes_pool} = {};
351 1         3 $self->{_nodes} = undef;
352 1         1 $self->{_node_index} = undef;
353 1         3 $self->{_active_node} = undef;
354              
355 1         2 return;
356             }
357              
358             1;
359             __END__