File Coverage

blib/lib/Queue/Q/ClaimFIFO/Redis.pm
Criterion Covered Total %
statement 33 100 33.0
branch 0 18 0.0
condition 0 10 0.0
subroutine 11 21 52.3
pod 8 10 80.0
total 52 159 32.7


line stmt bran cond sub pod time code
1             package Queue::Q::ClaimFIFO::Redis;
2 3     3   88551 use strict;
  3         6  
  3         76  
3 3     3   15 use warnings;
  3         6  
  3         81  
4 3     3   15 use Carp qw(croak);
  3         6  
  3         141  
5              
6 3     3   15 use Scalar::Util qw(blessed);
  3         10  
  3         235  
7 3     3   25 use Redis;
  3         6  
  3         77  
8 3     3   2061 use Redis::ScriptCache;
  3         7012  
  3         86  
9              
10 3     3   1077 use Queue::Q::ClaimFIFO;
  3         6  
  3         79  
11 3     3   661 use parent 'Queue::Q::ClaimFIFO';
  3         286  
  3         19  
12              
13             use Class::XSAccessor {
14 3         20 getters => [qw(server port queue_name db _redis_conn _script_cache)],
15 3     3   196 };
  3         5  
16              
17 3     3   989 use constant CLAIMED_SUFFIX => '_claimed';
  3         4  
  3         217  
18 3     3   15 use constant STORAGE_SUFFIX => '_storage';
  3         5  
  3         3660  
19              
20             # in: queue_name, itemkey, value
21             # out: nothing
22             our $EnqueueScript = qq#
23             redis.call('lpush', KEYS[1], ARGV[1])
24             redis.call('hset', KEYS[1] .. "${\STORAGE_SUFFIX()}", ARGV[1], ARGV[2])
25             #;
26              
27             # in: queue_name, time
28             # out: itemkey, value
29             our $ClaimScript = qq#
30             local itemkey = redis.call('rpop', KEYS[1])
31             if not itemkey then
32             return {nil, nil}
33             end
34             local data = redis.call('hget', KEYS[1] .. "${\STORAGE_SUFFIX()}", itemkey)
35             redis.call('zadd', KEYS[1] .. "${\CLAIMED_SUFFIX()}", ARGV[1], itemkey)
36             return {itemkey, data}
37             #;
38              
39             # in: queue_name, itemkey
40             # out: nothing
41             our $FinishScript = qq#
42             redis.call('hdel', KEYS[1] .. "${\STORAGE_SUFFIX()}", ARGV[1])
43             redis.call('zrem', KEYS[1] .. "${\CLAIMED_SUFFIX()}", ARGV[1])
44             #;
45              
46             sub new {
47 0     0 0   my ($class, %params) = @_;
48 0           for (qw(server port queue_name)) {
49             croak("Need '$_' parameter")
50 0 0         if not defined $params{$_};
51             }
52              
53             my $self = bless({
54 0           (map {$_ => $params{$_}} qw(server port queue_name) ),
55 0   0       db => $params{db} || 0,
56             _redis_conn => undef,
57             _script_ok => 0, # not yet known if lua script available
58             } => $class);
59              
60             $self->{_redis_conn} = Redis->new(
61 0 0         %{$params{redis_options} || {}},
  0            
62             encoding => undef, # force undef for binary data
63             server => join(":", $self->server, $self->port),
64             );
65             $self->{_script_cache}
66 0           = Redis::ScriptCache->new(redis_conn => $self->_redis_conn);
67             $self->{_script_cache}->register_script(
68 0           'enqueue_script',
69             $EnqueueScript,
70             );
71             $self->{_script_cache}->register_script(
72 0           'claim_script',
73             $ClaimScript,
74             );
75             $self->{_script_cache}->register_script(
76 0           'finish_script',
77             $FinishScript,
78             );
79              
80 0 0         $self->_redis_conn->select($self->db) if $self->db;
81              
82 0           return $self;
83             }
84              
85              
86             sub enqueue_item {
87 0     0 1   my $self = shift;
88 0 0         croak("Need exactly one item to enqeue")
89             if not @_ == 1;
90              
91 0           my $item = shift;
92 0 0 0       if (blessed($item) and $item->isa("Queue::Q::ClaimFIFO::Item")) {
93 0           croak("Don't pass a Queue::Q::ClaimFIFO::Item object to enqueue_item: "
94             . "Your data structure will be wrapped in one");
95             }
96 0           $item = Queue::Q::ClaimFIFO::Item->new(item_data => $item);
97              
98 0           $self->_script_cache->run_script(
99             'enqueue_script',
100             [1, $self->queue_name, $item->_key, $item->_serialized_data],
101             );
102              
103 0           return $item;
104             }
105              
106             sub enqueue_items {
107 0     0 1   my $self = shift;
108 0 0         return if not @_;
109              
110 0           my @items;
111 0           foreach my $item (@_) {
112 0 0 0       if (blessed($item) and $item->isa("Queue::Q::ClaimFIFO::Item")) {
113 0           croak("Don't pass a Queue::Q::ClaimFIFO::Item object to enqueue_items: "
114             . "Your data structure will be wrapped in one");
115             }
116 0           push @items, Queue::Q::ClaimFIFO::Item->new(item_data => $item);
117             }
118              
119             # FIXME, move loop onto the server or pipeline if possible!
120 0           my $qn = $self->queue_name;
121 0           for (0..$#items) {
122 0           my $key = $items[$_]->_key;
123 0           my $data = $items[$_]->_serialized_data;
124              
125 0           $self->_script_cache->run_script(
126             'enqueue_script',
127             [1, $qn, $key, $data],
128             );
129             }
130              
131 0           return @items;
132             }
133              
134             sub claim_item {
135 0     0 1   my $self = shift;
136              
137 0           my ($key, $serialized_data) = $self->_script_cache->run_script(
138             'claim_script',
139             [1, $self->queue_name, time()],
140             );
141 0 0         return undef if not defined $key;
142              
143 0           my $item = Queue::Q::ClaimFIFO::Item->new(
144             _serialized_data => $serialized_data,
145             _key => $key,
146             );
147 0           $item->{item_data} = $item->_deserialize_data($serialized_data);
148              
149 0           return $item;
150             }
151              
152             sub claim_items {
153 0     0 1   my $self = shift;
154 0   0       my $n = shift || 1;
155              
156 0           my @items;
157 0           for (1..$n) {
158             # TODO Lua script for multiple items!
159 0           my ($key, $serialized_data) = $self->_script_cache->run_script(
160             'claim_script',
161             [1, $self->queue_name, time()],
162             );
163 0 0         last if not defined $key;
164              
165 0           my $item = Queue::Q::ClaimFIFO::Item->new(
166             _serialized_data => $serialized_data,
167             _key => $key,
168             );
169 0           $item->{item_data} = $item->_deserialize_data($serialized_data);
170 0           push @items, $item;
171             }
172              
173 0           return @items;
174             }
175              
176             sub mark_item_as_done {
177 0     0 1   my ($self, $item) = @_;
178              
179 0           my $key = $item->_key;
180 0           $self->_script_cache->run_script(
181             'finish_script',
182             [1, $self->queue_name, $key],
183             );
184             }
185              
186             sub mark_items_as_done {
187 0     0 1   my ($self) = shift;
188              
189 0           foreach (@_) {
190             # TODO Lua script for multiple items!
191 0           my $key = $_->_key;
192 0           $self->_script_cache->run_script(
193             'finish_script',
194             [1, $self->queue_name, $key],
195             );
196             }
197             }
198              
199             sub flush_queue {
200 0     0 1   my $self = shift;
201 0           $self->_redis_conn->del($self->queue_name);
202 0           $self->_redis_conn->del($self->queue_name . CLAIMED_SUFFIX);
203 0           $self->_redis_conn->del($self->queue_name . STORAGE_SUFFIX);
204             }
205              
206             sub queue_length {
207 0     0 1   my $self = shift;
208 0           my ($len) = $self->_redis_conn->llen($self->queue_name);
209 0           return $len;
210             }
211              
212             sub claimed_count {
213 0     0 0   my $self = shift;
214 0           my ($len) = $self->_redis_conn->zcard($self->queue_name . CLAIMED_SUFFIX);
215 0           return $len;
216             }
217              
218             1;