line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
1
|
|
|
|
|
|
|
package IPC::PubSub::Cache; |
2
|
2
|
|
|
2
|
|
12
|
use strict; |
|
2
|
|
|
|
|
2
|
|
|
2
|
|
|
|
|
60
|
|
3
|
2
|
|
|
2
|
|
9
|
use warnings; |
|
2
|
|
|
|
|
2
|
|
|
2
|
|
|
|
|
52
|
|
4
|
2
|
|
|
2
|
|
9
|
use File::Spec; |
|
2
|
|
|
|
|
3
|
|
|
2
|
|
|
|
|
45
|
|
5
|
2
|
|
|
2
|
|
1953
|
use Time::HiRes (); |
|
2
|
|
|
|
|
4388
|
|
|
2
|
|
|
|
|
561
|
|
6
|
|
|
|
|
|
|
|
7
|
|
|
|
|
|
|
#method fetch (Str *@keys --> List of Pair) { ... } |
8
|
|
|
|
|
|
|
#method store (Str $key, Str $val, Num $time, Num $expiry) { ... } |
9
|
|
|
|
|
|
|
|
10
|
|
|
|
|
|
|
#method add_publisher (Str $chan, Str $pub) { ... } |
11
|
|
|
|
|
|
|
#method remove_publisher (Str $chan, Str $pub) { ... } |
12
|
|
|
|
|
|
|
|
13
|
|
|
|
|
|
|
#method get_index (Str $chan, Str $pub --> Int) { ... } |
14
|
|
|
|
|
|
|
#method set_index (Str $chan, Str $pub, Int $index) { ... } |
15
|
|
|
|
|
|
|
|
16
|
|
|
|
|
|
|
#method publisher_indices (Str $chan --> Hash of Int) { ... } |
17
|
|
|
|
|
|
|
|
18
|
|
|
|
|
|
|
sub fetch_data { |
19
|
8
|
|
|
8
|
0
|
12
|
my $self = shift; |
20
|
8
|
|
|
|
|
12
|
my $key = shift; |
21
|
8
|
|
100
|
|
|
40
|
return (($self->fetch("data:$key"))[0] || [])->[-1]; |
22
|
|
|
|
|
|
|
} |
23
|
|
|
|
|
|
|
|
24
|
|
|
|
|
|
|
sub store_data { |
25
|
6
|
|
|
6
|
0
|
11
|
my $self = shift; |
26
|
6
|
|
|
|
|
8
|
my $key = shift; |
27
|
6
|
|
|
|
|
10
|
my $val = shift; |
28
|
6
|
|
|
|
|
28
|
$self->store("data:$key" => $val, -1, 0); |
29
|
|
|
|
|
|
|
} |
30
|
|
|
|
|
|
|
|
31
|
|
|
|
|
|
|
sub modify { |
32
|
12
|
|
|
12
|
0
|
21
|
my $self = shift; |
33
|
12
|
|
|
|
|
20
|
my $key = shift; |
34
|
12
|
100
|
|
|
|
53
|
return $self->fetch_data($key) unless @_; |
35
|
|
|
|
|
|
|
|
36
|
6
|
|
|
|
|
11
|
my $with = shift; |
37
|
|
|
|
|
|
|
|
38
|
6
|
100
|
|
|
|
19
|
if (ref($with) eq 'CODE') { |
39
|
2
|
|
|
|
|
19
|
$self->lock("data:$key"); |
40
|
2
|
|
|
|
|
7
|
local $_ = $self->fetch_data($key); |
41
|
2
|
|
|
|
|
895
|
my $rv = $with->(); |
42
|
2
|
|
|
|
|
18
|
$self->store_data($key => $_); |
43
|
2
|
|
|
|
|
1344
|
$self->unlock("data:$key"); |
44
|
2
|
|
|
|
|
12
|
return $rv; |
45
|
|
|
|
|
|
|
} |
46
|
|
|
|
|
|
|
else { |
47
|
4
|
|
|
|
|
24
|
$self->store_data($key => $with); |
48
|
4
|
|
|
|
|
3289
|
return $with; |
49
|
|
|
|
|
|
|
} |
50
|
|
|
|
|
|
|
} |
51
|
|
|
|
|
|
|
|
52
|
|
|
|
|
|
|
sub get { |
53
|
12
|
|
|
12
|
0
|
29
|
my ($self, $chan, $orig, $curr) = @_; |
54
|
|
|
|
|
|
|
|
55
|
2
|
|
|
2
|
|
14
|
no warnings 'uninitialized'; |
|
2
|
|
|
|
|
4
|
|
|
2
|
|
|
|
|
487
|
|
56
|
8
|
|
|
|
|
2392
|
sort { $a->[0] <=> $b->[0] } $self->fetch( |
|
8
|
|
|
|
|
15
|
|
57
|
|
|
|
|
|
|
map { |
58
|
12
|
|
|
|
|
45
|
my $pub = $_; |
59
|
8
|
|
|
|
|
21
|
my $index = $curr->{$pub}; |
60
|
10
|
|
|
|
|
133
|
map { |
61
|
8
|
|
|
|
|
50
|
"chan:$chan-$pub$_" |
62
|
|
|
|
|
|
|
} (($orig->{$pub}+1) .. $index); |
63
|
|
|
|
|
|
|
} keys(%$curr) |
64
|
|
|
|
|
|
|
); |
65
|
|
|
|
|
|
|
} |
66
|
|
|
|
|
|
|
|
67
|
|
|
|
|
|
|
sub put { |
68
|
16
|
|
|
16
|
0
|
47
|
my ($self, $chan, $pub, $index, $msg, $expiry) = @_; |
69
|
16
|
|
|
|
|
147
|
$self->store("chan:$chan-$pub$index", $msg, Time::HiRes::time(), $expiry); |
70
|
16
|
|
|
|
|
23478
|
$self->set_index($chan, $pub, $index); |
71
|
|
|
|
|
|
|
} |
72
|
|
|
|
|
|
|
|
73
|
|
|
|
|
|
|
|
74
|
2
|
|
|
2
|
|
12
|
use constant LOCK => File::Spec->catdir(File::Spec->tmpdir, 'IPC-PubSub-lock-'); |
|
2
|
|
|
|
|
11
|
|
|
2
|
|
|
|
|
618
|
|
75
|
|
|
|
|
|
|
|
76
|
|
|
|
|
|
|
my %locks; |
77
|
|
|
|
|
|
|
sub lock { |
78
|
2
|
|
|
2
|
0
|
4
|
my ($self, $chan) = @_; |
79
|
2
|
|
|
|
|
7
|
for my $i (1..10) { |
80
|
2
|
50
|
|
|
|
310
|
return if mkdir((LOCK . unpack("H*", $chan)), 0777); |
81
|
0
|
|
|
|
|
0
|
Time::HiRes::usleep(rand(250000)+250000); |
82
|
|
|
|
|
|
|
} |
83
|
|
|
|
|
|
|
} |
84
|
|
|
|
|
|
|
|
85
|
0
|
|
|
0
|
0
|
0
|
sub disconnect { |
86
|
|
|
|
|
|
|
} |
87
|
|
|
|
|
|
|
|
88
|
|
|
|
|
|
|
END { |
89
|
2
|
|
|
2
|
|
274
|
rmdir(LOCK . unpack("H*", $_)) for keys %locks; |
90
|
|
|
|
|
|
|
} |
91
|
|
|
|
|
|
|
|
92
|
|
|
|
|
|
|
sub unlock { |
93
|
2
|
|
|
2
|
0
|
6
|
my ($self, $chan) = @_; |
94
|
2
|
|
|
|
|
318
|
rmdir(LOCK . unpack("H*", $chan)); |
95
|
2
|
|
|
|
|
6
|
delete $locks{$chan}; |
96
|
|
|
|
|
|
|
} |
97
|
|
|
|
|
|
|
|
98
|
|
|
|
|
|
|
1; |