File Coverage

blib/lib/Cache/Memcached/Turnstile.pm
Criterion Covered Total %
statement 51 191 26.7
branch 0 56 0.0
condition 0 13 0.0
subroutine 17 23 73.9
pod 2 2 100.0
total 70 285 24.5


line stmt bran cond sub pod time code
1             package Cache::Memcached::Turnstile;
2 1     1   24214 use 5.10.1;
  1         5  
  1         47  
3 1     1   7 use strict;
  1         1  
  1         34  
4 1     1   5 use warnings;
  1         6  
  1         46  
5              
6             our $VERSION = '0.01';
7              
8 1     1   6 use Exporter 'import';
  1         1  
  1         88  
9             our @EXPORT_OK = qw(cache_get_or_compute multi_cache_get_or_compute);
10             our %EXPORT_TAGS = ('all' => \@EXPORT_OK);
11              
12 1     1   921 use POSIX ();
  1         7752  
  1         38  
13 1     1   1955 use Time::HiRes ();
  1         2017  
  1         31  
14 1     1   1267 use Data::Dumper qw(Dumper);
  1         11527  
  1         101  
15 1     1   2286 use Constant::FromGlobal qw(DEBUG_DUMPS);
  1         11238  
  1         7  
16              
17             # Default value for the compute_time parameter.
18 1     1   176 use constant THUNDER_TIMEOUT => 2;
  1         2  
  1         45  
19              
20             # Structure of a value: [being-reprocessed-flag, real expiration timestamp, value]
21 1     1   4 use constant PROC_FLAG_IDX => 0;
  1         2  
  1         43  
22 1     1   4 use constant TIMEOUT_IDX => 1;
  1         2  
  1         41  
23 1     1   5 use constant VALUE_IDX => 2;
  1         2  
  1         44  
24              
25             # Flag names for being-processed-flag
26 1     1   6 use constant NOT_BEING_PROCESSED => 0;
  1         2  
  1         40  
27 1     1   13 use constant BEING_PROCESSED => 1;
  1         2  
  1         801  
28              
29             sub cache_get_or_compute {
30 0     0 1   my ($memd, %args) = @_;
31              
32             # named parameters: key, expiration, compute_cb, compute_time, wait
33              
34             # FIXME the local thing and recursion is a nasty hack.
35 0 0         if (!ref($args{wait})) {
36 0   0       my $wait_time = $args{wait} || $args{compute_time} || 0.1; # 100ms default
37             $args{wait} = sub {
38 0     0     my ($memd, $args) = @_;
39 0           Time::HiRes::sleep($wait_time);
40             # retry once only
41 0           return cache_get_or_compute($memd, %$args, "wait" => sub {return()});
  0            
42 0           };
43             }
44              
45             # Needs to be after the {wait} defaults handling since
46             # it refers to {compute_time} and wants the original value.
47 0   0       $args{compute_time} ||= THUNDER_TIMEOUT;
48              
49             # memcached says: timeouts >= 30days are timestamps. Yuck.
50             # Transform to relative value for sanity for now.
51 0   0       my $expiration = $args{expiration} // 0; # 0 == permanent
52 0 0         $args{expiration} = $expiration = $expiration - time()
53             if $expiration > 30*24*60*60;
54              
55 0           my $val_array = $memd->get($args{key});
56 0 0         if ($val_array) {
57 0 0         if ($val_array->[TIMEOUT_IDX] > time()) {
58             # Data not timed out yet.
59              
60 0 0         if (@$val_array >= 3) {
61             # All is well, cache hit.
62 0           return $val_array->[VALUE_IDX];
63             }
64             else {
65             # Not timed out, no data available, but there's an entry.
66             # Must be being processed for the first time.
67 0           return $args{wait}->($memd, \%args);
68             }
69              
70 0           die "Assert: Shouldn't be reached!";
71             }
72              
73             # Here, we know for sure that the data's timed out!
74              
75 0 0         if ($val_array->[PROC_FLAG_IDX]) {
76             # Data timed out. Somebody working on it already!
77 0           return $args{wait}->($memd, \%args);
78             }
79             else {
80             # Nobody working on it. And data is timed out. Requires re-computation and
81             # re-setting the value to include a flag to indicate it's being worked on.
82              
83             # Re-get using gets to get the CAS value.
84 0           my $cas_val = $memd->gets($args{key});
85 0 0         if (not defined $cas_val) {
    0          
86             # Must have been deleted/evicted in the meantime.
87             # *Attempt* to become the one to fill the cache.
88 0           return _try_to_compute($memd, \%args);
89             }
90             elsif ($cas_val->[1][PROC_FLAG_IDX]) {
91             # Somebody else is now working on it.
92 0           return $args{wait}->($memd, \%args);
93             }
94             else {
95 0           my $placeholder = [BEING_PROCESSED, 0];
96 0           $cas_val->[1] = $placeholder;
97 0 0         if ($memd->cas($args{key}, @$cas_val, POSIX::ceil($args{compute_time}))) {
98             # We inserted our placeholder. That means WE need to do the work.
99 0           return _compute_and_set($memd, \%args);
100             }
101             else {
102             # Somebody else is now working on it.
103 0           return $args{wait}->($memd, \%args);
104             }
105              
106 0           die "Assert: Shouldn't be reached!";
107             }
108 0           die "Assert: Shouldn't be reached!";
109             }
110 0           die "Assert: Shouldn't be reached!";
111             } # end if got data back from memcached
112             else {
113             # No data in memcached, so try to compute it ourselves.
114 0           return _try_to_compute($memd, \%args);
115             }
116              
117 0           die "Assert: Shouldn't be reached!";
118             }
119              
120             # Without further ado and checks and stuff, go ahead
121             # and compute the value from scratch and unconditionally
122             # write it to memcached.
123             # One could consider whether it makes sense to do another
124             # "do we need to update things" check after the computation,
125             # but this is only going to extend the validity of the data,
126             # and that's actually the correct thing to do.
127             sub _compute_and_set {
128 0     0     my ($memd, $args) = @_;
129              
130 0           my $real_value = $args->{compute_cb}->($memd, $args);
131              
132 0           my $expiration_at = time() + $args->{expiration};
133 0           $memd->set(
134             $args->{key},
135             [NOT_BEING_PROCESSED, $expiration_at, $real_value],
136             $expiration_at + POSIX::ceil($args->{compute_time})
137             );
138              
139 0           return $real_value;
140             }
141              
142             # Attempt to add a placeholder that says we're in charge of
143             # the computation. If that succeeds, compute. If that fails,
144             # enter fallback logic.
145             sub _try_to_compute {
146 0     0     my ($memd, $args) = @_;
147              
148 0           my $placeholder = [BEING_PROCESSED, 0];
149             # Immediately set that we're the first to generate it
150 0 0         if (not $memd->add($args->{key}, $placeholder, POSIX::ceil($args->{compute_time}))) {
151             # Somebody else is now working on it.
152 0           return $args->{wait}->($memd, $args);
153             }
154             else {
155             # We inserted our placeholder. That means WE need to do the work.
156 0           return _compute_and_set($memd, $args);
157             }
158              
159 0           die "Assert: Shouldn't be reached!";
160             }
161              
162              
163              
164              
165              
166              
167             # Indices for the keys sub-arrays
168 1     1   6 use constant KEYS_KEY_IDX => 0;
  1         1  
  1         50  
169 1     1   5 use constant KEYS_EXPIRE_IDX => 1;
  1         1  
  1         39  
170 1     1   848 use Clone ();
  1         3616  
  1         1587  
171              
172             #cache_fetch(
173             # $memd,
174             # 'keys' => [
175             # ['key1', $expire1],
176             # ['key2', $expire2],...
177             # ],
178             # compute_time => 2,
179             # compute_cb => sub {my ($memd, $args, $keys) = @_; },
180             # 'wait' => sub {my ($memd, $args, $keys) = @_; },
181             #);
182              
183             sub multi_cache_get_or_compute {
184 0     0 1   my ($memd, %args) = @_;
185              
186             # named parameters: keys => [[key, expiration],[key,expiration]...], compute_cb, compute_time, wait
187              
188 0 0         if (not ref($args{keys}) eq 'ARRAY') {
189 0           Carp::croak("Need 'keys' parameter to be of the form [ [key, expiration], [key, expiration], ... ]");
190             }
191              
192             # FIXME the local thing and recursion is a nasty hack.
193 0 0         if (!ref($args{wait})) {
194 0   0       my $wait_time = $args{wait} || $args{compute_time} || 0.1; # 100ms default
195             $args{wait} = sub {
196 0     0     my ($memd, $args) = @_;
197 0           Time::HiRes::sleep($wait_time);
198             # retry once only
199 0           return multi_cache_get_or_compute($memd, %$args, "wait" => sub {return()});
  0            
200 0           };
201             }
202              
203             # Needs to be after the {wait} defaults handling since
204             # it refers to {compute_time} and wants the original value.
205 0   0       $args{compute_time} ||= THUNDER_TIMEOUT;
206              
207 0           for (@{$args{keys}}) {
  0            
208 0 0         if ($_->[KEYS_EXPIRE_IDX]> 30*24*60*60) {
209 0           $args{keys} = Clone::clone($args{keys}); # avoid action at a distance
210 0           last;
211             }
212             }
213             # memcached says: timeouts >= 30days are timestamps. Yuck.
214             # Transform to relative value for sanity for now.
215 0           my %all_key_expirations;
216 0           foreach my $k_ary (@{$args{keys}}) {
  0            
217 0           my $expiration = $k_ary->[KEYS_EXPIRE_IDX];
218 0 0         $k_ary->[KEYS_EXPIRE_IDX] = $expiration = $expiration - time()
219             if $expiration > 30*24*60*60;
220              
221 0           $all_key_expirations{$k_ary->[KEYS_KEY_IDX]} = $expiration;
222             }
223              
224 0           my %output_hash;
225              
226 0           my @consider_keys = keys %all_key_expirations;
227 0           my $value_hash = $memd->get_multi(@consider_keys);
228              
229 0           my @keys_to_attempt; # keys to *attempt* to get a lock for
230             my @keys_to_wait_for; # keys to simply wait for (or user-specific logic)
231 0           my @keys_to_cas_update; # keys to do a cas dance on
232 0           my @keys_to_compute; # keys to simply compute (where we already have a lock)
233 0           KEY_LOOP: foreach my $key (@consider_keys) {
234 0           my $val_array = $value_hash->{$key};
235              
236             # Simply not found - attempt to compute
237 0 0         if (not $val_array) {
238 0           push @keys_to_attempt, $key;
239 0           next KEY_LOOP;
240             }
241              
242 0 0         if ($val_array->[TIMEOUT_IDX] > time()) {
243             # Data not timed out yet.
244 0 0         if (@$val_array >= 3) {
245             # All is well, cache hit.
246 0           $output_hash{$key} = $val_array->[VALUE_IDX];
247 0           next KEY_LOOP;
248             }
249             else {
250             # Not timed out, no data available, but there's an entry.
251             # Must be being processed for the first time.
252 0           push @keys_to_wait_for, $key;
253 0           next KEY_LOOP;
254             }
255              
256 0           die "Assert: Shouldn't be reached!";
257             }
258              
259             # Here, we know for sure that the data for this key has timed out!
260              
261 0 0         if ($val_array->[PROC_FLAG_IDX]) {
262             # Data timed out. Somebody working on it already!
263 0           push @keys_to_wait_for, $key;
264             }
265             else {
266             # Nobody working on it. And data is timed out. Requires re-computation
267             # and re-setting the value to include a flag to indicate
268             # it's being worked on.
269              
270 0           push @keys_to_cas_update, $key;
271             }
272             } # end while having undecided keys
273 0           if (DEBUG_DUMPS) {
274             warn "Key distribution after first run through:\n";
275             warn "Cache hit: " . Dumper(\%output_hash);
276             warn "Attempt: " . Dumper(\@keys_to_attempt);
277             warn "Compute: " . Dumper(\@keys_to_compute);
278             warn "CAS update: " . Dumper(\@keys_to_cas_update);
279             warn "Wait: " . Dumper(\@keys_to_wait_for);
280             }
281              
282             # First, do a CAS get/update on those keys that need it
283             # since it can feed the other key sets.
284              
285             # Re-get using gets to get the CAS value.
286 0 0         if (@keys_to_cas_update) {
287 0           my $cas_val_hash = $memd->gets_multi(@keys_to_cas_update);
288 0           foreach my $key (keys %$cas_val_hash) {
289 0           my $cas_val = $cas_val_hash->{$key};
290              
291 0 0 0       if (not defined $cas_val) {
  0 0          
    0          
292             # Must have been deleted/evicted in the meantime.
293             # *Attempt* to become the one to fill the cache.
294 0           push @keys_to_attempt, $key;
295             }
296             elsif (@{ $cas_val->[1] } >= 3
297             and $cas_val->[1][TIMEOUT_IDX] > time())
298             {
299             # Somebody managed to set a valid value in the meantime
300             # => All is well, cache hit.
301 0           $output_hash{$key} = $cas_val->[VALUE_IDX];
302 0           delete $cas_val_hash->{$key};
303             }
304             elsif ($cas_val->[1][PROC_FLAG_IDX]) {
305             # Somebody else is now working on it.
306 0           push @keys_to_wait_for, $key;
307 0           delete $cas_val_hash->{$key};
308             }
309             }
310              
311             # All keys in $cas_val_hash can now be marked as "being processed"
312 0 0         if (keys %$cas_val_hash) {
313 0           my $exp_time = POSIX::ceil($args{compute_time});
314 0           my @cas_args = map
315             # key, CAS, value, expiration
316             [ $_,
317             $cas_val_hash->{$_}[0],
318             [BEING_PROCESSED, 0],
319             $exp_time
320             ],
321             keys %$cas_val_hash;
322 0           my @statuses = $memd->cas_multi(@cas_args);
323 0           foreach my $i (0..$#statuses) {
324 0           my $key = $cas_args[$i][0];
325 0 0         if ($statuses[$i]) {
326             # We inserted our placeholder. That means WE need to do the work.
327 0           push @keys_to_compute, $key;
328             }
329             else {
330             # Somebody else is now working on it.
331 0           push @keys_to_wait_for, $key;
332             }
333             }
334             } # end "if have keys that need locking with CAS"
335              
336 0           @keys_to_cas_update = (); # cleanup
337             } # end "if have keys to give the cas treatment"
338 0           if (DEBUG_DUMPS) {
339             warn "Key distribution after CAS:\n";
340             warn "Output: " . Dumper(\%output_hash);
341             warn "Attempt: " . Dumper(\@keys_to_attempt);
342             warn "Compute: " . Dumper(\@keys_to_compute);
343             warn "Wait: " . Dumper(\@keys_to_wait_for);
344             }
345              
346             # Then attempt to get a placeholder for the keys that need computing
347 0 0         if (@keys_to_attempt) {
348 0           my $exp_time = POSIX::ceil($args{compute_time});
349 0           my @add_args = map
350             # key, value, expiration
351             [ $_,
352             [BEING_PROCESSED, 0],
353             $exp_time
354             ],
355             @keys_to_attempt;
356              
357 0           my @statuses = $memd->add_multi(@add_args);
358 0           foreach my $i (0..$#statuses) {
359 0           my $key = $add_args[$i][0];
360 0 0         if ($statuses[$i]) {
361             # We inserted our placeholder. That means WE need to do the work.
362 0           push @keys_to_compute, $key;
363             }
364             else {
365             # Somebody else is now working on it.
366 0           push @keys_to_wait_for, $key;
367             }
368             }
369              
370 0           @keys_to_attempt = (); # cleanup
371             } # end "if have keys to get a lock for"
372 0           if (DEBUG_DUMPS) {
373             warn "Key distribution after lock attempt:\n";
374             warn "Cache hit: " . Dumper(\%output_hash);
375             warn "Compute: " . Dumper(\@keys_to_compute);
376             warn "Wait: " . Dumper(\@keys_to_wait_for);
377             }
378            
379              
380             # Then do the actual computations where necessary
381 0 0         if (@keys_to_compute) {
382 0           my @values = $args{compute_cb}->($memd, \%args, \@keys_to_compute);
383              
384 0           my @set_args;
385 0           my $now = time();
386 0           my $compute_time = POSIX::ceil($args{compute_time});
387 0           foreach my $i (0..$#keys_to_compute) {
388 0           my $key = $keys_to_compute[$i];
389 0           my $expire_at = time() + $all_key_expirations{$key};
390 0           push @set_args, [
391             $key,
392             [NOT_BEING_PROCESSED, $expire_at, $values[$i]],
393             $expire_at + $compute_time
394             ];
395              
396 0           $output_hash{$key} = $values[$i];
397             }
398              
399 0           $memd->set_multi(@set_args);
400              
401 0           @keys_to_compute = (); # cleanup
402             }
403 0           if (DEBUG_DUMPS) {
404             warn "Key distribution after compute:\n";
405             warn "Cache hit: " . Dumper(\%output_hash);
406             warn "Wait: " . Dumper(\@keys_to_wait_for);
407             }
408              
409             # Then perform the waiting actions as necessary
410             # TODO: It may make sense to do things like somehow include
411             # the time it already took to do the previous processing
412             # in order not to pessimize more than necessary.
413 0 0         if (@keys_to_wait_for) {
414 0           my $h = $args{wait}->($memd, \%args, \@keys_to_wait_for);
415 0           $output_hash{$_} = $h->{$_} for keys %$h; # merge output
416              
417 0           @keys_to_wait_for = (); # cleanup
418             }
419              
420 0           return \%output_hash;
421             }
422              
423             1;
424             __END__