| line | stmt | bran | cond | sub | pod | time | code | 
| 1 |  |  |  |  |  |  | package AnyEvent::Stomper::Cluster; | 
| 2 |  |  |  |  |  |  |  | 
| 3 | 2 |  |  | 2 |  | 3421 | use 5.008000; | 
|  | 2 |  |  |  |  | 6 |  | 
| 4 | 2 |  |  | 2 |  | 8 | use strict; | 
|  | 2 |  |  |  |  | 2 |  | 
|  | 2 |  |  |  |  | 32 |  | 
| 5 | 2 |  |  | 2 |  | 5 | use warnings; | 
|  | 2 |  |  |  |  | 2 |  | 
|  | 2 |  |  |  |  | 74 |  | 
| 6 | 2 |  |  | 2 |  | 6 | use base qw( Exporter ); | 
|  | 2 |  |  |  |  | 2 |  | 
|  | 2 |  |  |  |  | 186 |  | 
| 7 |  |  |  |  |  |  |  | 
| 8 |  |  |  |  |  |  | our $VERSION = '0.33_01'; | 
| 9 |  |  |  |  |  |  |  | 
| 10 | 2 |  |  | 2 |  | 8 | use AnyEvent::Stomper; | 
|  | 2 |  |  |  |  | 3 |  | 
|  | 2 |  |  |  |  | 50 |  | 
| 11 | 2 |  |  | 2 |  | 8 | use AnyEvent::Stomper::Error; | 
|  | 2 |  |  |  |  | 2 |  | 
|  | 2 |  |  |  |  | 43 |  | 
| 12 |  |  |  |  |  |  |  | 
| 13 | 2 |  |  | 2 |  | 6 | use Scalar::Util qw( weaken ); | 
|  | 2 |  |  |  |  | 3 |  | 
|  | 2 |  |  |  |  | 78 |  | 
| 14 | 2 |  |  | 2 |  | 6 | use Carp qw( croak ); | 
|  | 2 |  |  |  |  | 2 |  | 
|  | 2 |  |  |  |  | 152 |  | 
| 15 |  |  |  |  |  |  |  | 
| 16 |  |  |  |  |  |  | our %ERROR_CODES; | 
| 17 |  |  |  |  |  |  |  | 
| 18 |  |  |  |  |  |  | BEGIN { | 
| 19 | 2 |  |  | 2 |  | 9 | %ERROR_CODES = %AnyEvent::Stomper::Error::ERROR_CODES; | 
| 20 | 2 |  |  |  |  | 6 | our @EXPORT_OK   = keys %ERROR_CODES; | 
| 21 | 2 |  |  |  |  | 40 | our %EXPORT_TAGS = ( err_codes => \@EXPORT_OK ); | 
| 22 |  |  |  |  |  |  | } | 
| 23 |  |  |  |  |  |  |  | 
| 24 | 2 |  |  | 2 |  | 48 | use constant \%ERROR_CODES; | 
|  | 2 |  |  |  |  | 2 |  | 
|  | 2 |  |  |  |  | 645 |  | 
| 25 |  |  |  |  |  |  |  | 
| 26 |  |  |  |  |  |  | my %ACK_CMDS = ( | 
| 27 |  |  |  |  |  |  | ACK  => 1, | 
| 28 |  |  |  |  |  |  | NACK => 1, | 
| 29 |  |  |  |  |  |  | ); | 
| 30 |  |  |  |  |  |  |  | 
| 31 |  |  |  |  |  |  | my %CAN_REPEAT = ( | 
| 32 |  |  |  |  |  |  | SEND      => 1, | 
| 33 |  |  |  |  |  |  | SUBSCRIBE => 1, | 
| 34 |  |  |  |  |  |  | BEGIN     => 1, | 
| 35 |  |  |  |  |  |  | ); | 
| 36 |  |  |  |  |  |  |  | 
| 37 |  |  |  |  |  |  |  | 
| 38 |  |  |  |  |  |  | sub new { | 
| 39 | 4 |  |  | 4 | 1 | 4765 | my $class  = shift; | 
| 40 | 4 |  |  |  |  | 8 | my %params = @_; | 
| 41 |  |  |  |  |  |  |  | 
| 42 | 4 |  |  |  |  | 7 | my $self = bless {}, $class; | 
| 43 |  |  |  |  |  |  |  | 
| 44 | 4 | 100 |  |  |  | 12 | unless ( defined $params{nodes} ) { | 
| 45 | 1 |  |  |  |  | 131 | croak 'Nodes not specified'; | 
| 46 |  |  |  |  |  |  | } | 
| 47 | 3 | 100 |  |  |  | 9 | unless ( ref( $params{nodes} ) eq 'ARRAY' ) { | 
| 48 | 1 |  |  |  |  | 108 | croak 'Nodes must be specified as array reference'; | 
| 49 |  |  |  |  |  |  | } | 
| 50 | 2 | 100 |  |  |  | 4 | unless ( @{ $params{nodes} } ) { | 
|  | 2 |  |  |  |  | 6 |  | 
| 51 | 1 |  |  |  |  | 171 | croak 'Specified empty list of nodes'; | 
| 52 |  |  |  |  |  |  | } | 
| 53 |  |  |  |  |  |  |  | 
| 54 | 1 |  |  |  |  | 4 | $self->{nodes}              = $params{nodes}; | 
| 55 | 1 |  |  |  |  | 2 | $self->{on_node_connect}    = $params{on_node_connect}; | 
| 56 | 1 |  |  |  |  | 1 | $self->{on_node_disconnect} = $params{on_node_disconnect}; | 
| 57 | 1 |  |  |  |  | 2 | $self->{on_node_error}      = $params{on_node_error}; | 
| 58 | 1 |  |  |  |  | 4 | $self->on_error( $params{on_error} ); | 
| 59 |  |  |  |  |  |  |  | 
| 60 | 1 |  |  |  |  | 2 | my %node_params; | 
| 61 | 1 |  |  |  |  | 2 | foreach my $name ( qw( login passcode vhost heartbeat connection_timeout | 
| 62 |  |  |  |  |  |  | reconnect_interval handle_params default_headers command_headers ) ) | 
| 63 |  |  |  |  |  |  | { | 
| 64 | 9 | 50 |  |  |  | 13 | next unless defined $params{$name}; | 
| 65 | 0 |  |  |  |  | 0 | $node_params{$name} = $params{$name}; | 
| 66 |  |  |  |  |  |  | } | 
| 67 | 1 |  |  |  |  | 2 | $self->{_node_params} = \%node_params; | 
| 68 |  |  |  |  |  |  |  | 
| 69 | 1 |  |  |  |  | 4 | $self->_reset_internals; | 
| 70 | 1 |  |  |  |  | 3 | $self->_init; | 
| 71 |  |  |  |  |  |  |  | 
| 72 | 1 |  |  |  |  | 3 | return $self; | 
| 73 |  |  |  |  |  |  | } | 
| 74 |  |  |  |  |  |  |  | 
| 75 |  |  |  |  |  |  | sub execute { | 
| 76 | 0 |  |  | 0 | 1 | 0 | my $self     = shift; | 
| 77 | 0 |  |  |  |  | 0 | my $cmd_name = shift; | 
| 78 |  |  |  |  |  |  |  | 
| 79 | 0 |  |  |  |  | 0 | my $cmd = $self->_prepare( $cmd_name, [@_] ); | 
| 80 | 0 |  |  |  |  | 0 | $self->_execute($cmd); | 
| 81 |  |  |  |  |  |  |  | 
| 82 | 0 |  |  |  |  | 0 | return; | 
| 83 |  |  |  |  |  |  | } | 
| 84 |  |  |  |  |  |  |  | 
| 85 |  |  |  |  |  |  | # Generate methods | 
| 86 |  |  |  |  |  |  | { | 
| 87 | 2 |  |  | 2 |  | 9 | no strict qw( refs ); | 
|  | 2 |  |  |  |  | 2 |  | 
|  | 2 |  |  |  |  | 1941 |  | 
| 88 |  |  |  |  |  |  |  | 
| 89 |  |  |  |  |  |  | foreach my $name ( qw( send subscribe unsubscribe ack nack begin commit | 
| 90 |  |  |  |  |  |  | abort disconnect ) ) | 
| 91 |  |  |  |  |  |  | { | 
| 92 |  |  |  |  |  |  | *{$name} = sub { | 
| 93 | 0 |  |  | 0 |  | 0 | my $self = shift; | 
| 94 |  |  |  |  |  |  |  | 
| 95 | 0 |  |  |  |  | 0 | my $cmd = $self->_prepare( $name, [@_] ); | 
| 96 | 0 |  |  |  |  | 0 | $self->_execute($cmd); | 
| 97 |  |  |  |  |  |  |  | 
| 98 | 0 |  |  |  |  | 0 | return; | 
| 99 |  |  |  |  |  |  | } | 
| 100 |  |  |  |  |  |  | } | 
| 101 |  |  |  |  |  |  | } | 
| 102 |  |  |  |  |  |  |  | 
| 103 |  |  |  |  |  |  | sub nodes { | 
| 104 | 1 |  |  | 1 | 1 | 3655 | my $self = shift; | 
| 105 | 1 |  |  |  |  | 2 | return values %{ $self->{_nodes_pool} }; | 
|  | 1 |  |  |  |  | 4 |  | 
| 106 |  |  |  |  |  |  | } | 
| 107 |  |  |  |  |  |  |  | 
| 108 |  |  |  |  |  |  | sub on_error { | 
| 109 | 1 |  |  | 1 | 1 | 1 | my $self = shift; | 
| 110 |  |  |  |  |  |  |  | 
| 111 | 1 | 50 |  |  |  | 3 | if (@_) { | 
| 112 | 1 |  |  |  |  | 1 | my $on_error = shift; | 
| 113 |  |  |  |  |  |  |  | 
| 114 | 1 | 50 |  |  |  | 3 | if ( defined $on_error ) { | 
| 115 | 0 |  |  |  |  | 0 | $self->{on_error} = $on_error; | 
| 116 |  |  |  |  |  |  | } | 
| 117 |  |  |  |  |  |  | else { | 
| 118 |  |  |  |  |  |  | $self->{on_error} = sub { | 
| 119 | 0 |  |  | 0 |  | 0 | my $err = shift; | 
| 120 | 0 |  |  |  |  | 0 | warn $err->message . "\n"; | 
| 121 | 1 |  |  |  |  | 5 | }; | 
| 122 |  |  |  |  |  |  | } | 
| 123 |  |  |  |  |  |  | } | 
| 124 |  |  |  |  |  |  |  | 
| 125 | 1 |  |  |  |  | 1 | return $self->{on_error}; | 
| 126 |  |  |  |  |  |  | } | 
| 127 |  |  |  |  |  |  |  | 
| 128 |  |  |  |  |  |  | sub force_disconnect { | 
| 129 | 0 |  |  | 0 | 1 | 0 | my $self = shift; | 
| 130 |  |  |  |  |  |  |  | 
| 131 | 0 |  |  |  |  | 0 | foreach my $node ( $self->nodes ) { | 
| 132 | 0 |  |  |  |  | 0 | $node->force_disconnect; | 
| 133 |  |  |  |  |  |  | } | 
| 134 | 0 |  |  |  |  | 0 | $self->_reset_internals; | 
| 135 |  |  |  |  |  |  |  | 
| 136 | 0 |  |  |  |  | 0 | return; | 
| 137 |  |  |  |  |  |  | } | 
| 138 |  |  |  |  |  |  |  | 
| 139 |  |  |  |  |  |  | sub _init { | 
| 140 | 1 |  |  | 1 |  | 1 | my $self = shift; | 
| 141 |  |  |  |  |  |  |  | 
| 142 | 1 |  |  |  |  | 1 | my $nodes_pool = $self->{_nodes_pool}; | 
| 143 |  |  |  |  |  |  |  | 
| 144 | 1 |  |  |  |  | 2 | foreach my $node_params ( @{ $self->{nodes} } ) { | 
|  | 1 |  |  |  |  | 2 |  | 
| 145 | 3 |  |  |  |  | 7 | my $hostport = "$node_params->{host}:$node_params->{port}"; | 
| 146 |  |  |  |  |  |  |  | 
| 147 | 3 | 50 |  |  |  | 6 | unless ( defined $nodes_pool->{$hostport} ) { | 
| 148 |  |  |  |  |  |  | $nodes_pool->{$hostport} | 
| 149 | 3 |  |  |  |  | 5 | = $self->_new_node( $node_params->{host}, $node_params->{port} ); | 
| 150 |  |  |  |  |  |  | } | 
| 151 |  |  |  |  |  |  | } | 
| 152 |  |  |  |  |  |  |  | 
| 153 | 1 |  |  |  |  | 2 | $self->{_nodes}       = [ keys %{ $self->{_nodes_pool} } ]; | 
|  | 1 |  |  |  |  | 3 |  | 
| 154 | 1 |  |  |  |  | 4 | $self->{_active_node} = $self->_next_node; | 
| 155 |  |  |  |  |  |  |  | 
| 156 | 1 |  |  |  |  | 1 | return; | 
| 157 |  |  |  |  |  |  | } | 
| 158 |  |  |  |  |  |  |  | 
| 159 |  |  |  |  |  |  | sub _new_node { | 
| 160 | 3 |  |  | 3 |  | 3 | my $self = shift; | 
| 161 | 3 |  |  |  |  | 3 | my $host = shift; | 
| 162 | 3 |  |  |  |  | 3 | my $port = shift; | 
| 163 |  |  |  |  |  |  |  | 
| 164 |  |  |  |  |  |  | return AnyEvent::Stomper->new( | 
| 165 | 3 |  |  |  |  | 2 | %{ $self->{_node_params} }, | 
|  | 3 |  |  |  |  | 8 |  | 
| 166 |  |  |  |  |  |  | host          => $host, | 
| 167 |  |  |  |  |  |  | port          => $port, | 
| 168 |  |  |  |  |  |  | lazy          => 1, | 
| 169 |  |  |  |  |  |  | on_connect    => $self->_create_on_node_connect( $host, $port ), | 
| 170 |  |  |  |  |  |  | on_disconnect => $self->_create_on_node_disconnect( $host, $port ), | 
| 171 |  |  |  |  |  |  | on_error      => $self->_create_on_node_error( $host, $port ), | 
| 172 |  |  |  |  |  |  | ); | 
| 173 |  |  |  |  |  |  | } | 
| 174 |  |  |  |  |  |  |  | 
| 175 |  |  |  |  |  |  | sub _create_on_node_connect { | 
| 176 | 3 |  |  | 3 |  | 2 | my $self = shift; | 
| 177 | 3 |  |  |  |  | 3 | my $host = shift; | 
| 178 | 3 |  |  |  |  | 4 | my $port = shift; | 
| 179 |  |  |  |  |  |  |  | 
| 180 | 3 |  |  |  |  | 5 | weaken($self); | 
| 181 |  |  |  |  |  |  |  | 
| 182 |  |  |  |  |  |  | return sub { | 
| 183 | 0 | 0 |  | 0 |  | 0 | if ( defined $self->{on_node_connect} ) { | 
| 184 | 0 |  |  |  |  | 0 | $self->{on_node_connect}->( $host, $port ); | 
| 185 |  |  |  |  |  |  | } | 
| 186 | 3 |  |  |  |  | 11 | }; | 
| 187 |  |  |  |  |  |  | } | 
| 188 |  |  |  |  |  |  |  | 
| 189 |  |  |  |  |  |  | sub _create_on_node_disconnect { | 
| 190 | 3 |  |  | 3 |  | 2 | my $self = shift; | 
| 191 | 3 |  |  |  |  | 3 | my $host = shift; | 
| 192 | 3 |  |  |  |  | 2 | my $port = shift; | 
| 193 |  |  |  |  |  |  |  | 
| 194 | 3 |  |  |  |  | 4 | weaken($self); | 
| 195 |  |  |  |  |  |  |  | 
| 196 |  |  |  |  |  |  | return sub { | 
| 197 | 0 | 0 |  | 0 |  | 0 | if ( defined $self->{on_node_disconnect} ) { | 
| 198 | 0 |  |  |  |  | 0 | $self->{on_node_disconnect}->( $host, $port ); | 
| 199 |  |  |  |  |  |  | } | 
| 200 | 3 |  |  |  |  | 8 | }; | 
| 201 |  |  |  |  |  |  | } | 
| 202 |  |  |  |  |  |  |  | 
| 203 |  |  |  |  |  |  | sub _create_on_node_error { | 
| 204 | 3 |  |  | 3 |  | 3 | my $self = shift; | 
| 205 | 3 |  |  |  |  | 3 | my $host = shift; | 
| 206 | 3 |  |  |  |  | 3 | my $port = shift; | 
| 207 |  |  |  |  |  |  |  | 
| 208 | 3 |  |  |  |  | 5 | weaken($self); | 
| 209 |  |  |  |  |  |  |  | 
| 210 |  |  |  |  |  |  | return sub { | 
| 211 | 0 |  |  | 0 |  | 0 | my $err = shift; | 
| 212 |  |  |  |  |  |  |  | 
| 213 | 0 |  |  |  |  | 0 | my $err_code = $err->code; | 
| 214 |  |  |  |  |  |  |  | 
| 215 | 0 | 0 | 0 |  |  | 0 | if ( $err_code != E_OPRN_ERROR | 
| 216 |  |  |  |  |  |  | && $err_code != E_CONN_CLOSED_BY_CLIENT ) | 
| 217 |  |  |  |  |  |  | { | 
| 218 | 0 |  |  |  |  | 0 | $self->{_active_node} = $self->_next_node; | 
| 219 |  |  |  |  |  |  | } | 
| 220 |  |  |  |  |  |  |  | 
| 221 | 0 | 0 |  |  |  | 0 | if ( defined $self->{on_node_error} ) { | 
| 222 | 0 |  |  |  |  | 0 | $self->{on_node_error}->( $err, $host, $port ); | 
| 223 |  |  |  |  |  |  | } | 
| 224 | 3 |  |  |  |  | 10 | }; | 
| 225 |  |  |  |  |  |  | } | 
| 226 |  |  |  |  |  |  |  | 
| 227 |  |  |  |  |  |  | sub _prepare { | 
| 228 | 0 |  |  | 0 |  | 0 | my $self     = shift; | 
| 229 | 0 |  |  |  |  | 0 | my $cmd_name = uc(shift); | 
| 230 | 0 |  |  |  |  | 0 | my $args     = shift; | 
| 231 |  |  |  |  |  |  |  | 
| 232 | 0 |  |  |  |  | 0 | my %params; | 
| 233 |  |  |  |  |  |  |  | 
| 234 | 0 | 0 | 0 |  |  | 0 | if ( ref( $args->[-1] ) eq 'CODE' | 
| 235 | 0 |  |  |  |  | 0 | && scalar @{$args} % 2 > 0 ) | 
| 236 |  |  |  |  |  |  | { | 
| 237 | 0 | 0 |  |  |  | 0 | if ( $cmd_name eq 'SUBSCRIBE' ) { | 
| 238 | 0 |  |  |  |  | 0 | $params{on_message} = pop @{$args}; | 
|  | 0 |  |  |  |  | 0 |  | 
| 239 |  |  |  |  |  |  | } | 
| 240 |  |  |  |  |  |  | else { | 
| 241 | 0 |  |  |  |  | 0 | $params{on_receipt} = pop @{$args}; | 
|  | 0 |  |  |  |  | 0 |  | 
| 242 |  |  |  |  |  |  | } | 
| 243 |  |  |  |  |  |  | } | 
| 244 |  |  |  |  |  |  |  | 
| 245 | 0 |  |  |  |  | 0 | my %headers = @{$args}; | 
|  | 0 |  |  |  |  | 0 |  | 
| 246 |  |  |  |  |  |  |  | 
| 247 | 0 |  |  |  |  | 0 | foreach my $name ( qw( body on_receipt on_message on_node_error ) ) { | 
| 248 | 0 | 0 |  |  |  | 0 | if ( defined $headers{$name} ) { | 
| 249 | 0 |  |  |  |  | 0 | $params{$name} = delete $headers{$name}; | 
| 250 |  |  |  |  |  |  | } | 
| 251 |  |  |  |  |  |  | } | 
| 252 | 0 | 0 |  |  |  | 0 | if ( exists $ACK_CMDS{$cmd_name} ) { | 
| 253 | 0 |  |  |  |  | 0 | $params{message} = delete $headers{message}; | 
| 254 |  |  |  |  |  |  | } | 
| 255 |  |  |  |  |  |  |  | 
| 256 | 0 |  |  |  |  | 0 | my $cmd = { | 
| 257 |  |  |  |  |  |  | name    => $cmd_name, | 
| 258 |  |  |  |  |  |  | headers => \%headers, | 
| 259 |  |  |  |  |  |  | %params, | 
| 260 |  |  |  |  |  |  | }; | 
| 261 |  |  |  |  |  |  |  | 
| 262 | 0 | 0 |  |  |  | 0 | unless ( defined $cmd->{on_receipt} ) { | 
| 263 | 0 |  |  |  |  | 0 | weaken($self); | 
| 264 |  |  |  |  |  |  |  | 
| 265 |  |  |  |  |  |  | $cmd->{on_receipt} = sub { | 
| 266 | 0 |  |  | 0 |  | 0 | my $receipt = shift; | 
| 267 | 0 |  |  |  |  | 0 | my $err     = shift; | 
| 268 |  |  |  |  |  |  |  | 
| 269 | 0 | 0 |  |  |  | 0 | if ( defined $err ) { | 
| 270 | 0 |  |  |  |  | 0 | $self->{on_error}->($err); | 
| 271 | 0 |  |  |  |  | 0 | return; | 
| 272 |  |  |  |  |  |  | } | 
| 273 | 0 |  |  |  |  | 0 | }; | 
| 274 |  |  |  |  |  |  | } | 
| 275 |  |  |  |  |  |  |  | 
| 276 | 0 |  |  |  |  | 0 | return $cmd; | 
| 277 |  |  |  |  |  |  | } | 
| 278 |  |  |  |  |  |  |  | 
| 279 |  |  |  |  |  |  | sub _execute { | 
| 280 | 0 |  |  | 0 |  | 0 | my $self      = shift; | 
| 281 | 0 |  |  |  |  | 0 | my $cmd       = shift; | 
| 282 | 0 |  | 0 |  |  | 0 | my $fails_cnt = shift || 0; | 
| 283 |  |  |  |  |  |  |  | 
| 284 | 0 |  |  |  |  | 0 | my $hostport = $self->{_active_node}; | 
| 285 | 0 |  |  |  |  | 0 | my $node     = $self->{_nodes_pool}{$hostport}; | 
| 286 |  |  |  |  |  |  |  | 
| 287 | 0 |  |  |  |  | 0 | weaken($self); | 
| 288 |  |  |  |  |  |  |  | 
| 289 | 0 |  |  |  |  | 0 | $node->execute( $cmd->{name}, %{ $cmd->{headers} }, | 
| 290 |  |  |  |  |  |  | body => $cmd->{body}, | 
| 291 |  |  |  |  |  |  |  | 
| 292 |  |  |  |  |  |  | on_receipt => sub { | 
| 293 | 0 |  |  | 0 |  | 0 | my $receipt = shift; | 
| 294 | 0 |  |  |  |  | 0 | my $err     = shift; | 
| 295 |  |  |  |  |  |  |  | 
| 296 | 0 | 0 |  |  |  | 0 | if ( defined $err ) { | 
| 297 | 0 |  |  |  |  | 0 | my $err_code = $err->code; | 
| 298 |  |  |  |  |  |  |  | 
| 299 | 0 |  | 0 |  |  | 0 | my $on_node_error = $cmd->{on_node_error} || $self->{on_node_error}; | 
| 300 | 0 | 0 |  |  |  | 0 | if ( defined $on_node_error ) { | 
| 301 | 0 |  |  |  |  | 0 | my $node = $self->{_nodes_pool}{$hostport}; | 
| 302 | 0 |  |  |  |  | 0 | $on_node_error->( $err, $node->host, $node->port ); | 
| 303 |  |  |  |  |  |  | } | 
| 304 |  |  |  |  |  |  |  | 
| 305 | 0 | 0 | 0 |  |  | 0 | if ( $CAN_REPEAT{ $cmd->{name} } | 
|  |  |  | 0 |  |  |  |  | 
|  |  |  | 0 |  |  |  |  | 
| 306 |  |  |  |  |  |  | && $err_code != E_OPRN_ERROR | 
| 307 |  |  |  |  |  |  | && $err_code != E_CONN_CLOSED_BY_CLIENT | 
| 308 | 0 |  |  |  |  | 0 | && ++$fails_cnt < scalar @{ $self->{_nodes} } ) | 
| 309 |  |  |  |  |  |  | { | 
| 310 | 0 |  |  |  |  | 0 | $self->_execute( $cmd, $fails_cnt ); | 
| 311 | 0 |  |  |  |  | 0 | return; | 
| 312 |  |  |  |  |  |  | } | 
| 313 |  |  |  |  |  |  |  | 
| 314 | 0 |  |  |  |  | 0 | $cmd->{on_receipt}->( $receipt, $err ); | 
| 315 |  |  |  |  |  |  |  | 
| 316 | 0 |  |  |  |  | 0 | return; | 
| 317 |  |  |  |  |  |  | } | 
| 318 |  |  |  |  |  |  |  | 
| 319 | 0 |  |  |  |  | 0 | $cmd->{on_receipt}->($receipt); | 
| 320 |  |  |  |  |  |  | }, | 
| 321 |  |  |  |  |  |  |  | 
| 322 |  |  |  |  |  |  | defined $cmd->{message} | 
| 323 |  |  |  |  |  |  | ? ( message => $cmd->{message} ) | 
| 324 |  |  |  |  |  |  | : (), | 
| 325 |  |  |  |  |  |  |  | 
| 326 |  |  |  |  |  |  | defined $cmd->{on_message} | 
| 327 |  |  |  |  |  |  | ? ( on_message => $cmd->{on_message} ) | 
| 328 | 0 | 0 |  |  |  | 0 | : (), | 
|  |  | 0 |  |  |  |  |  | 
| 329 |  |  |  |  |  |  | ); | 
| 330 |  |  |  |  |  |  |  | 
| 331 | 0 |  |  |  |  | 0 | return; | 
| 332 |  |  |  |  |  |  | } | 
| 333 |  |  |  |  |  |  |  | 
| 334 |  |  |  |  |  |  | sub _next_node { | 
| 335 | 1 |  |  | 1 |  | 1 | my $self = shift; | 
| 336 |  |  |  |  |  |  |  | 
| 337 | 1 | 50 | 0 |  |  | 2 | unless ( defined $self->{_node_index} ) { | 
| 338 | 1 |  |  |  |  | 1 | $self->{_node_index} = int( rand( scalar @{ $self->{_nodes} } ) ); | 
|  | 1 |  |  |  |  | 32 |  | 
| 339 |  |  |  |  |  |  | } | 
| 340 |  |  |  |  |  |  | elsif ( $self->{_node_index} == scalar @{ $self->{_nodes} } ) { | 
| 341 |  |  |  |  |  |  | $self->{_node_index} = 0; | 
| 342 |  |  |  |  |  |  | } | 
| 343 |  |  |  |  |  |  |  | 
| 344 | 1 |  |  |  |  | 4 | return $self->{_nodes}[ $self->{_node_index}++ ]; | 
| 345 |  |  |  |  |  |  | } | 
| 346 |  |  |  |  |  |  |  | 
| 347 |  |  |  |  |  |  | sub _reset_internals { | 
| 348 | 1 |  |  | 1 |  | 2 | my $self = shift; | 
| 349 |  |  |  |  |  |  |  | 
| 350 | 1 |  |  |  |  | 2 | $self->{_nodes_pool}  = {}; | 
| 351 | 1 |  |  |  |  | 3 | $self->{_nodes}       = undef; | 
| 352 | 1 |  |  |  |  | 1 | $self->{_node_index}  = undef; | 
| 353 | 1 |  |  |  |  | 3 | $self->{_active_node} = undef; | 
| 354 |  |  |  |  |  |  |  | 
| 355 | 1 |  |  |  |  | 2 | return; | 
| 356 |  |  |  |  |  |  | } | 
| 357 |  |  |  |  |  |  |  | 
| 358 |  |  |  |  |  |  | 1; | 
| 359 |  |  |  |  |  |  | __END__ |