File Coverage

blib/lib/Redis/DistLock.pm
Criterion Covered Total %
statement 63 84 75.0
branch 20 38 52.6
condition 11 31 35.4
subroutine 13 15 86.6
pod 3 5 60.0
total 110 173 63.5


line stmt bran cond sub pod time code
1             package Redis::DistLock;
2              
3 4     4   4226 use strict;
  4         8  
  4         161  
4 4     4   24 use warnings;
  4         7  
  4         209  
5              
6             our $VERSION = '0.07';
7              
8 4     4   1434 use Digest::SHA qw( sha1_hex );
  4         6639  
  4         345  
9 4     4   4731 use MIME::Base64 qw( encode_base64 );
  4         4894  
  4         285  
10 4     4   4167 use Redis;
  4         289612  
  4         166  
11 4     4   4381 use Time::HiRes qw( time );
  4         12822  
  4         23  
12              
13             sub VERSION_CHECK() { 1 }
14             sub RETRY_COUNT() { 3 }
15             sub RETRY_DELAY() { 0.2 }
16             sub DRIFT_FACTOR() { 0.01 }
17             sub RELEASE_SCRIPT() { '
18             if redis.call( "get", KEYS[1] ) == ARGV[1] then
19             return redis.call( "del", KEYS[1] )
20             else
21             return 0
22             end
23             ' }
24 17     17 0 104 sub RELEASE_SHA1() { sha1_hex( RELEASE_SCRIPT ) }
25             sub EXTEND_SCRIPT() { '
26             if redis.call( "set", KEYS[1], ARGV[1], "XX", "PX", ARGV[2] ) then
27             return "OK"
28             else
29             return redis.call( "set", KEYS[1], ARGV[1], "NX", "PX", ARGV[2] )
30             end
31             ' }
32 17     17 0 141 sub EXTEND_SHA1() { sha1_hex( EXTEND_SCRIPT ) }
33              
34             sub DESTROY {
35 11     11   2335 my $self = shift;
36              
37             # only has locks when auto release is enabled
38 11 50       19 return if @{ $self->{locks} || [] } == 0;
  11 50       457  
39              
40 0         0 $self->release( $_ )
41 0         0 for @{ $self->{locks} };
42             }
43              
44             sub new {
45 12     12 1 5439 my $class = shift;
46 0         0 my %args = @_ == 1 && ref( $_[0] )
47 12 50 33     81 ? %{ $_[0] }
48             : @_
49             ;
50              
51 12 100       38 my $version_check = exists( $args{version_check} )
52             ? $args{version_check}
53             : VERSION_CHECK
54             ;
55              
56             my $logger = exists( $args{logger} )
57             ? $args{logger}
58 0     0   0 : sub { warn @_ }
59 12 100       52 ;
60              
61 12         20 my $quorum = int( @{ $args{servers} } / 2 + 1 );
  12         40  
62 12         21 my @servers;
63              
64 12         16 for my $server ( @{ $args{servers} } ) {
  12         26  
65             # connect might fail
66             my $redis = ref( $server )
67             ? $server
68 18 50       44 : eval { Redis->new( server => $server, encoding => undef ) }
  0         0  
69             ;
70 18 50       57 unless ( $redis ) {
71 0         0 $logger->( $@ );
72 0         0 next;
73             }
74 18         31 push( @servers, $redis );
75              
76 18 100       67 if ( $version_check ) {
77 15         71 my $info = $redis->info();
78              
79 15 50 33     469 die( "FATAL: cannot find the right redis version (needs at least 2.6.12 -- $1, $2, $3)" )
      33        
      33        
      33        
80             unless $info &&
81             $info->{redis_version} &&
82             $info->{redis_version} =~ m!\A ([0-9]+) \. ([0-9]+) \. ([0-9]+) \z!x &&
83             (
84             ( $1 > 2 ) ||
85             ( $1 == 2 && $2 > 6 ) ||
86             ( $1 == 2 && $2 == 6 && $3 >= 12 )
87             )
88             ;
89             }
90              
91             # load script on all servers
92 17         67 my $sha1 = $redis->script_load( RELEASE_SCRIPT );
93              
94             # ensure the script is everywhere the same
95 17 50       208 if ( $sha1 ne RELEASE_SHA1 ) {
96 0         0 die( "FATAL: script load results in different checksum!" );
97             }
98              
99 17         55 $sha1 = $redis->script_load( EXTEND_SCRIPT );
100              
101 17 50       130 if ( $sha1 ne EXTEND_SHA1 ) {
102 0         0 die( "FATAL: failed to load extend script: sha1 mismatch!" );
103             }
104             }
105              
106 11 50       31 if ( @servers < $quorum ) {
107 0         0 die( "FATAL: could not establish enough connections (" . int( @servers ) . " < $quorum)" );
108             }
109              
110             my $self = bless( {
111             servers => \@servers,
112             quorum => $quorum,
113             retry_count => $args{retry_count} || RETRY_COUNT,
114             retry_delay => $args{retry_delay} || RETRY_DELAY,
115             locks => [],
116 3     3   15 logger => $logger || sub {},
117 11   50     211 auto_release => $args{auto_release} || 0,
      50        
      100        
      50        
118             }, $class );
119              
120 11         49 return $self;
121             }
122              
123             sub _get_random_id {
124 2     2   109 encode_base64( join( "", map chr( int( rand() * 256 ) ), 1 .. 24 ), "" );
125             }
126              
127             sub lock {
128 2     2 1 8 my ( $self, $resource, $ttl, $value, $extend ) = @_;
129 2         8 my $retry_count = $self->{retry_count};
130              
131 2 50       9 $value = _get_random_id()
132             unless defined( $value );
133              
134 2         14 while ( $retry_count-- > 0 ) {
135 6         41 my $start = time();
136 6         11 my $ok = 0;
137              
138 6         14 for my $redis ( @{ $self->{servers} } ) {
  6         29  
139             # count successful locks, response only needs to be true
140 6 50       17 $ok += eval {
141 6 50       65 ! $extend
142             ? $redis->set( $resource, $value, "NX", "PX", $ttl * 1000 )
143             : $redis->evalsha( EXTEND_SHA1, 1, $resource, $value, $ttl * 1000 )
144             } ? 1 : 0;
145              
146 6 50       143 $self->{logger}->( $@ )
147             if $@;
148             }
149              
150 6         26 my $drift = $ttl * DRIFT_FACTOR + 0.002;
151 6         35 my $validity = $ttl - ( time() - $start ) - $drift;
152              
153 6 50 33     48 if ( $ok >= $self->{quorum} && $validity > 0 ) {
154 0         0 my $lock = {
155             validity => $validity,
156             resource => $resource,
157             value => $value,
158             };
159              
160             # track lock on demand only
161 0 0       0 push( @{ $self->{locks} }, $lock )
  0         0  
162             if $self->{auto_release};
163              
164 0         0 return $lock;
165             }
166              
167 6         472940 select( undef, undef, undef, rand( $self->{retry_delay} ) );
168             }
169              
170 2         69 return undef;
171             }
172              
173             sub release {
174 0     0 1   my $self = shift;
175 0           my ( $resource, $value ) = @_ == 1 && ref( $_[0] )
176 0 0 0       ? @{ $_[0] }{ qw{ resource value } }
177             : @_
178             ;
179              
180             defined or $_ = ""
181 0   0       for $resource, $value;
182              
183 0           for my $redis ( @{ $self->{servers} } ) {
  0            
184 0           $redis->evalsha( RELEASE_SHA1, 1, $resource, $value );
185             }
186             }
187              
188             1;
189              
190             __END__