File Coverage

blib/lib/IPC/ConcurrencyLimit/Lock/Redis.pm
Criterion Covered Total %
statement 22 24 91.6
branch n/a
condition n/a
subroutine 8 8 100.0
pod n/a
total 30 32 93.7


line stmt bran cond sub pod time code
1             package IPC::ConcurrencyLimit::Lock::Redis;
2 1     1   24499 use 5.008001;
  1         3  
  1         44  
3 1     1   7 use strict;
  1         2  
  1         50  
4 1     1   6 use warnings;
  1         6  
  1         57  
5              
6             our $VERSION = '0.01';
7              
8 1     1   6 use Carp qw(croak);
  1         2  
  1         82  
9 1     1   1089 use Redis;
  1         158282  
  1         35  
10 1     1   947 use Redis::ScriptCache;
  1         6424  
  1         33  
11 1     1   8 use Digest::SHA1 ();
  1         2  
  1         16  
12 1     1   448 use Data::UUID::MT ();
  0            
  0            
13             use Time::HiRes ();
14              
15             our $UUIDGenerator = Data::UUID::MT->new(version => "4s");
16              
17             use IPC::ConcurrencyLimit::Lock;
18             our @ISA = qw(IPC::ConcurrencyLimit::Lock);
19              
20             use Class::XSAccessor
21             getters => [qw(
22             redis_conn
23             max_procs
24             key_name
25             proc_info
26             script_cache
27             uuid
28             )];
29              
30             our $LuaScript_GetLock = q{
31             local key = KEYS[1]
32             local max_procs = ARGV[1]
33             local proc_info = ARGV[2]
34             local i
35              
36             for i = 1, max_procs, 1 do
37             local x = redis.call('hexists', key, i)
38             if x == 0 then
39             redis.call('hset', key, i, proc_info)
40             return i
41             end
42             end
43              
44             return 0
45             };
46             our $LuaScriptHash_GetLock = Digest::SHA1::sha1_hex($LuaScript_GetLock);
47              
48             # FIXME should this also check the uuid/etc like ClearOldLock?
49             our $LuaScript_ReleaseLock = q{
50             local key = KEYS[1]
51             local lockno = ARGV[1]
52             redis.call('hdel', key, lockno)
53             return 1
54             };
55             our $LuaScriptHash_ReleaseLock = Digest::SHA1::sha1_hex($LuaScript_ReleaseLock);
56              
57              
58             # FIXME use this for heartbeat: Generate new data with update time
59             our $LuaScript_UpdateUUID = q{
60             local key = KEYS[1]
61             local lockno = ARGV[1]
62             local olddata = ARGV[2]
63             local newdata = ARGV[3]
64             if redis.call('hget', key, lockno) == olddata then
65             redis.call('hset', key, lockno, newdata)
66             return 1
67             end
68             return 0
69             };
70             our $LuaScriptHash_UpdateUUID = Digest::SHA1::sha1_hex($LuaScript_UpdateUUID);
71              
72             our $LuaScript_ClearOldLock = q{
73             local key = KEYS[1]
74             local id = ARGV[1]
75             local proc_info = ARGV[2]
76              
77             local cleared = 0
78              
79             local x = redis.call('hget', key, id)
80             if x == proc_info then
81             redis.call('hdel', key, id)
82             cleared = 1
83             end
84              
85             return cleared
86             };
87             our $LuaScriptHash_ClearOldLock = Digest::SHA1::sha1_hex($LuaScript_ClearOldLock);
88              
89              
90             sub new {
91             my $class = shift;
92             my $opt = shift;
93              
94             my $max_procs = $opt->{max_procs}
95             or croak("Need a 'max_procs' parameter");
96             my $redis_conn = $opt->{redis_conn}
97             or croak("Need a 'redis_conn' parameter");
98             my $key_name = $opt->{key_name}
99             or croak("Need a 'key_name' parameter");
100              
101             my $sc = Redis::ScriptCache->new(redis_conn => $redis_conn);
102             $sc->register_script(\$LuaScript_GetLock, $LuaScriptHash_GetLock);
103             $sc->register_script(\$LuaScript_ReleaseLock, $LuaScriptHash_ReleaseLock);
104             $sc->register_script(\$LuaScript_UpdateUUID, $LuaScriptHash_UpdateUUID);
105              
106             my $proc_info = $opt->{proc_info};
107             $proc_info = '' if not defined $proc_info;
108             my $uuid = $UUIDGenerator->create;
109             my $self = bless {
110             max_procs => $max_procs,
111             redis_conn => $redis_conn,
112             key_name => $key_name,
113             id => undef,
114             script_cache => $sc,
115             proc_info => $proc_info,
116             uuid => $uuid,
117             } => $class;
118              
119             $self->_get_lock($key_name, $max_procs, $sc, $uuid . "-" . $proc_info)
120             or return undef;
121              
122             return $self;
123             }
124              
125             sub _get_lock {
126             my ($self, $key, $max_procs, $script_cache, $uuid_proc_info) = @_;
127              
128             my ($rv) = $script_cache->run_script(
129             $LuaScriptHash_GetLock, [1, $key, $max_procs, $uuid_proc_info]
130             );
131              
132             if (defined $rv and $rv > 0) {
133             $self->{id} = $rv;
134             return 1;
135             }
136              
137             return();
138             }
139              
140             sub _release_lock {
141             my $self = shift;
142             my $id = $self->id;
143             return if not $id;
144              
145             $self->script_cache->run_script(
146             $LuaScriptHash_ReleaseLock, [1, $self->key_name, $id]
147             );
148              
149             $self->{id} = undef;
150             }
151              
152             sub _updated_uuid {
153             my ($self) = @_;
154             my $old_uuid = $self->uuid;
155             my $new_uuid = $UUIDGenerator->create;
156             substr($new_uuid, 8, 8) = substr($old_uuid, 8, 8);
157             vec($new_uuid, 15, 4) = vec($old_uuid, 15, 4);
158             return $new_uuid;
159             }
160              
161             sub heartbeat {
162             my $self = shift;
163             my $conn = $self->redis_conn;
164             return() if not $conn;
165              
166             my $proc_info = $self->proc_info;
167             my $new_uuid = $self->_updated_uuid;
168             my $olddata = $self->uuid . "-" . $proc_info;
169             my $newdata = $new_uuid . "-" . $proc_info;
170              
171             my $ok;
172             eval {
173             $ok = $self->script_cache->run_script(
174             $LuaScriptHash_UpdateUUID,
175             [ 1, $self->key_name, $self->id, $olddata, $newdata ]
176             );
177             1
178             } or return(); # server gone away?
179              
180             if (not $ok) {
181             return(); # lock was acquired by somebody else?
182             }
183              
184             $self->{uuid} = $new_uuid;
185             return 1; # probably all fine
186             }
187              
188             sub DESTROY {
189             local $@;
190             my $self = shift;
191             $self->_release_lock();
192             }
193              
194              
195             # This is so ugly because we compile slightly different code depending on whether
196             # we're running on a perl that can do big-endian-forced-quads or not.
197             # FIXME Will work on 64bit perls only. Implementation for 32bit integers welcome.
198             # FIXME is this worth it or should it just do a run-time perl version check like heartbeat()?
199             eval(<<'PRE' . ($] ge '5.010' ? <<'NEW_PERL' : <<'OLD_PERL') . <<'POST')
200             sub clear_old_locks {
201             my ($class, $redis_conn, $key_name, $cutoff) = @_;
202              
203             my %hash = $redis_conn->hgetall($key_name);
204             return if not keys(%hash);
205             my $ncleared = 0;
206             foreach my $lockid (keys %hash) {
207             PRE
208             my ($quad) = unpack("Q>", $hash{$lockid});
209             $quad -= $quad % 16; # 60 bit only
210             NEW_PERL
211             my ($x, $y) = unpack("N2", $hash{$lockid});
212             $y -= $y % 16; # 60 bit only
213             my $quad = $x*2**32 + $y;
214             OLD_PERL
215             if ($quad/1e7 < $cutoff) {
216             $ncleared += $redis_conn->eval($LuaScript_ClearOldLock, 1, $key_name, $lockid, $hash{$lockid});
217             }
218             }
219             return $ncleared;
220             }
221             1
222             POST
223             or do {
224             my $err = $@ || 'Zombie error';
225             die "Failed to compile clear_old_locks code: $err";
226             };
227              
228             1;
229              
230             __END__