File Coverage

blib/lib/MR/IProto/Cluster.pm
Criterion Covered Total %
statement 21 84 25.0
branch 0 26 0.0
condition 0 3 0.0
subroutine 7 16 43.7
pod 2 2 100.0
total 30 131 22.9


line stmt bran cond sub pod time code
1             package MR::IProto::Cluster;
2              
3             =head1 NAME
4              
5             MR::IProto::Cluster - cluster of servers
6              
7             =head1 DESCRIPTION
8              
9             This class is used to implement balancing between several servers.
10              
11             =cut
12              
13 1     1   8 use Mouse;
  1         2  
  1         12  
14 1     1   442 use Mouse::Util::TypeConstraints;
  1         3  
  1         8  
15 1     1   865 use MR::IProto::Cluster::Server;
  1         3  
  1         38  
16 1     1   1002 use String::CRC32 qw(crc32);
  1         13694  
  1         136  
17              
18             =head1 EXPORTED CONSTANTS
19              
20             =over
21              
22             =item RR
23              
24             Round robin algorithm
25              
26             =item HASH
27              
28             Hashing algorithm using CRC32
29              
30             =item KETAMA
31              
32             Ketama algorithm
33              
34             =back
35              
36             =cut
37              
38 1     1   16 use Exporter 'import';
  1         2  
  1         362  
39             our @EXPORT_OK = qw( RR HASH KETAMA );
40              
41             coerce 'MR::IProto::Cluster'
42             => from 'Str'
43             => via { __PACKAGE__->new( servers => $_ ) };
44              
45             subtype 'MR::IProto::Cluster::Servers'
46             => as 'ArrayRef[MR::IProto::Cluster::Server]'
47             => where { scalar @_ };
48             coerce 'MR::IProto::Cluster::Servers'
49             => from 'Str'
50             => via {
51             my $type = find_type_constraint('MR::IProto::Cluster::Server');
52             [ map $type->coerce($_), split /,/, $_ ];
53             };
54              
55             use constant {
56 1         2404 RR => 1,
57             HASH => 2,
58             KETAMA => 3,
59 1     1   8 };
  1         3  
60              
61             enum 'MR::IProto::Balance' => (
62             RR,
63             HASH,
64             KETAMA,
65             );
66             coerce 'MR::IProto::Balance'
67             => from 'Str',
68             => via {
69             $_ eq 'hash-crc32' ? HASH
70             : $_ eq 'ketama' ? KETAMA
71             : RR;
72             };
73              
74             =head1 ATTRIBUTES
75              
76             =over
77              
78             =item balance
79              
80             Balancing algorithms.
81             Possible values are constants: RR, HASH, KETAMA.
82             Or their string analogs: 'round-robin', 'hash-crc32', 'ketama'.
83              
84             =cut
85              
86             has balance => (
87             is => 'ro',
88             isa => 'MR::IProto::Balance',
89             default => RR,
90             coerce => 1,
91             );
92              
93             =item servers
94              
95             ArrayRef of L.
96              
97             =cut
98              
99             has servers => (
100             is => 'ro',
101             isa => 'MR::IProto::Cluster::Servers',
102             required => 1,
103             coerce => 1,
104             );
105              
106             =back
107              
108             =cut
109              
110             has _one => (
111             is => 'ro',
112             isa => 'Maybe[MR::IProto::Cluster::Server]',
113             lazy_build => 1,
114             );
115              
116             has _ketama => (
117             is => 'ro',
118             isa => 'ArrayRef[ArrayRef]',
119             lazy_build => 1,
120             );
121              
122             has _rr_servers => (
123             is => 'rw',
124             isa => 'MR::IProto::Cluster::Servers',
125             lazy_build => 1,
126             );
127              
128             has _hash_servers => (
129             is => 'rw',
130             isa => 'MR::IProto::Cluster::Servers',
131             lazy_build => 1,
132             );
133              
134             =head1 PUBLIC METHODS
135              
136             =over
137              
138             =item server( $key? )
139              
140             Get server from balancing using C<$key>.
141              
142             =cut
143              
144             sub server {
145 0     0 1   my ($self, $key) = @_;
146 0           my $one = $self->_one;
147 0 0         return $one if defined $one;
148 0 0         my $method = $self->balance == RR ? '_balance_rr'
    0          
149             : $self->balance == KETAMA ? '_balance_ketama'
150             : '_balance_hash';
151 0           return $self->$method($key);
152             }
153              
154             =item timeout( $new? )
155              
156             Used to set C<$new> timeout value to all servers.
157             If argument is skipped and timeout is equal for all servers then returns
158             it value, if timeout is different then returns undef.
159              
160             =cut
161              
162             sub timeout {
163 0     0 1   my $self = shift;
164 0 0         if(@_) {
165 0           my $timeout = shift;
166 0           $_->timeout($timeout) foreach @{$self->servers};
  0            
167 0           return $timeout;
168             }
169             else {
170 0           my $timeout;
171 0           foreach my $t ( map $_->timeout, @{$self->servers} ) {
  0            
172 0 0 0       return if defined $timeout && $timeout != $t;
173 0 0         $timeout = $t unless defined $timeout;
174             }
175 0           return $timeout;
176             }
177             }
178              
179             =back
180              
181             =cut
182              
183             sub _build__one {
184 0     0     my ($self) = @_;
185 0 0         return @{$self->servers} == 1 ? $self->servers->[0] : undef;
  0            
186             }
187              
188             sub _build__ketama {
189 0     0     my $self = shift;
190 0           my @ketama;
191 0           foreach my $server (@{$self->servers}) {
  0            
192 0           for my $i (0..10) {
193 0           push @ketama, [crc32($server->host.$server->port.$i), $server];
194             }
195             }
196 0           return [ sort { $a->[0] cmp $b->[0] } @ketama ];
  0            
197             }
198              
199             sub _build__rr_servers {
200 0     0     my ($self) = @_;
201 0           return [ @{ $self->servers } ];
  0            
202             }
203              
204             sub _build__hash_servers {
205 0     0     my ($self) = @_;
206 0           return [ map { my @s; my $s = $_; push @s, $s for ( 1 .. $s->weight ); @s } @{ $self->servers } ];
  0            
  0            
  0            
  0            
  0            
207             }
208              
209             sub _balance_rr {
210 0     0     my ($self) = @_;
211 0 0         $self->_clear_rr_servers() if @{$self->_rr_servers} == 0;
  0            
212 0           return splice(@{$self->_rr_servers}, int(rand(@{$self->_rr_servers})), 1);
  0            
  0            
213             }
214              
215             sub _balance_hash {
216 0     0     my ($self, $key) = @_;
217 0           my ($hash_, $hash, $server);
218              
219 0 0         die "Cannot balance hash without key" unless defined $key;
220 0           $hash = $hash_ = crc32($key) >> 16;
221 0           for (0..19) {
222 0           $server = $self->_hash_servers->[$hash % @{$self->_hash_servers}];
  0            
223 0 0         return $server if $server->active;
224 0           $hash += crc32($_, $hash_) >> 16;
225             }
226              
227 0           return $self->_hash_servers->[rand @{$self->_hash_servers}]; #last resort
  0            
228             }
229              
230             sub _balance_ketama {
231 0     0     my ($self, $key) = @_;
232              
233 0 0         die "Cannot balance ketama without key" unless defined $key;
234 0           my $idx = crc32($key);
235              
236 0           foreach (@{$self->_ketama}) {
  0            
237 0 0         next unless ($_);
238 0 0         return $_->[1] if ($_->[0] >= $idx);
239             }
240              
241 0           return $self->_ketama->[0]->[1];
242             }
243              
244             =head1 SEE ALSO
245              
246             L, L.
247              
248             =cut
249              
250 1     1   8 no Mouse;
  1         2  
  1         14  
251             __PACKAGE__->meta->make_immutable();
252              
253             1;