File Coverage

blib/lib/Directory/Queue/Simple.pm
Criterion Covered Total %
statement 128 161 79.5
branch 29 70 41.4
condition 3 21 14.2
subroutine 23 27 85.1
pod 13 13 100.0
total 196 292 67.1


line stmt bran cond sub pod time code
1             #+##############################################################################
2             # #
3             # File: Directory/Queue/Simple.pm #
4             # #
5             # Description: object oriented interface to a simple directory based queue #
6             # #
7             #-##############################################################################
8              
9             #
10             # module definition
11             #
12              
13             package Directory::Queue::Simple;
14 4     4   1230 use strict;
  4         11  
  4         133  
15 4     4   19 use warnings;
  4         6  
  4         299  
16             our $VERSION = "2.2";
17             our $REVISION = sprintf("%d.%02d", q$Revision: 1.19 $ =~ /(\d+)\.(\d+)/);
18              
19             #
20             # used modules
21             #
22              
23 4     4   391 use Directory::Queue qw(_create _name _touch SYSBUFSIZE /Regexp/ /special/);
  4         7  
  4         20  
24 4     4   1145 use No::Worries::Die qw(dief);
  4         9  
  4         28  
25 4     4   1338 use No::Worries::File qw(file_read file_write);
  4         22647  
  4         23  
26 4     4   439 use No::Worries::Stat qw(ST_MTIME);
  4         7  
  4         23  
27 4     4   1411 use No::Worries::Warn qw(warnf);
  4         1748  
  4         21  
28 4     4   323 use POSIX qw(:errno_h);
  4         6  
  4         17  
29              
30             #
31             # inheritance
32             #
33              
34             our(@ISA) = qw(Directory::Queue);
35              
36             #
37             # constants
38             #
39              
40             # suffix indicating a temporary element
41 4     4   1275 use constant TEMPORARY_SUFFIX => ".tmp";
  4         5  
  4         244  
42              
43             # suffix indicating a locked element
44 4     4   21 use constant LOCKED_SUFFIX => ".lck";
  4         7  
  4         7102  
45              
46             #
47             # object constructor
48             #
49              
50             sub new : method {
51 9     9 1 1488 my($class, %option) = @_;
52 9         13 my($self);
53              
54             # default object
55 9         49 $self = __PACKAGE__->SUPER::_new(%option);
56 9         22 foreach my $name (qw(path maxlock maxtemp rndhex umask)) {
57 45         64 delete($option{$name});
58             }
59             # check granularity
60 9 50       23 if (defined($option{granularity})) {
61             dief("invalid granularity: %s", $option{granularity})
62 0 0       0 unless $option{granularity} =~ /^\d+$/;
63 0         0 $self->{granularity} = delete($option{granularity});
64             } else {
65 9         38 $self->{granularity} = 60; # default
66             }
67             # check unexpected options
68 9         21 foreach my $name (keys(%option)) {
69 0         0 dief("unexpected option: %s", $name);
70             }
71             # so far so good...
72 9         30 return($self);
73             }
74              
75             #
76             # helpers for the add methods
77             #
78              
79             sub _add_dir ($) {
80 16     16   18 my($self) = @_;
81 16         18 my($time);
82              
83 16         21 $time = time();
84 16 50       45 $time -= $time % $self->{granularity} if $self->{granularity};
85 16         47 return(sprintf("%08x", $time));
86             }
87              
88             sub _add_data ($$) {
89 16     16   21 my($self, $dataref) = @_;
90 16         20 my($dir, $name, $path, $fh);
91              
92 16         33 $dir = _add_dir($self);
93 16         27 while (1) {
94 18         40 $name = _name($self->{rndhex});
95 18         44 $path = $self->{path}."/".$dir."/".$name . TEMPORARY_SUFFIX;
96 18         52 $fh = _create($path, $self->{umask});
97 18 100       51 last if $fh;
98             _special_mkdir($self->{path}."/".$dir, $self->{umask})
99 2 50       16 if $! == ENOENT;
100             }
101 16         55 file_write($path, handle => $fh, data => $dataref);
102 16         1369 return($dir, $path);
103             }
104              
105             sub _add_path ($$$) {
106 16     16   31 my($self, $tmp, $dir) = @_;
107 16         25 my($name, $new);
108              
109 16         19 while (1) {
110 16         39 $name = _name($self->{rndhex});
111 16         39 $new = $self->{path}."/".$dir."/".$name;
112             # N.B. we use link() + unlink() to make sure $new is never overwritten
113 16 50       436 if (link($tmp, $new)) {
114 16 50       350 unlink($tmp) or dief("cannot unlink(%s): %s", $tmp, $!);
115 16         122 return($dir."/".$name);
116             }
117 0 0       0 dief("cannot link(%s, %s): %s", $tmp, $new, $!) unless $! == EEXIST;
118             }
119             }
120              
121             #
122             # add a new element to the queue and return its name
123             #
124              
125             sub add : method {
126 16     16 1 530 my($self, $data) = @_;
127 16         25 my($dir, $path);
128              
129 16         36 ($dir, $path) = _add_data($self, \$data);
130 16         56 return(_add_path($self, $path, $dir));
131             }
132              
133             sub add_ref : method {
134 0     0 1 0 my($self, $dataref) = @_;
135 0         0 my($dir, $path);
136              
137 0         0 ($dir, $path) = _add_data($self, $dataref);
138 0         0 return(_add_path($self, $path, $dir));
139             }
140              
141             sub add_path : method {
142 0     0 1 0 my($self, $path) = @_;
143 0         0 my($dir);
144              
145 0         0 $dir = _add_dir($self);
146 0         0 _special_mkdir($self->{path}."/".$dir, $self->{umask});
147 0         0 return(_add_path($self, $path, $dir));
148             }
149              
150             #
151             # get a locked element
152             #
153              
154             sub get : method {
155 1     1 1 4 my($self, $name) = @_;
156              
157 1         5 return(file_read($self->{path}."/".$name . LOCKED_SUFFIX));
158             }
159              
160             sub get_ref : method {
161 1     1 1 607 my($self, $name) = @_;
162 1         1 my($data);
163              
164 1         10 return(file_read($self->{path}."/".$name . LOCKED_SUFFIX, data => \$data));
165             }
166              
167             sub get_path : method {
168 0     0 1 0 my($self, $name) = @_;
169              
170 0         0 return($self->{path}."/".$name . LOCKED_SUFFIX);
171             }
172              
173             #
174             # lock an element:
175             # - return true on success
176             # - return false in case the element could not be locked (in permissive mode)
177             #
178              
179             sub lock : method { ## no critic 'ProhibitBuiltinHomonyms'
180 14     14 1 282 my($self, $name, $permissive) = @_;
181 14         19 my($path, $lock, $time, $ignored);
182              
183 14 50       31 $permissive = 1 unless defined($permissive);
184 14         25 $path = $self->{path}."/".$name;
185 14         18 $lock = $path . LOCKED_SUFFIX;
186 14 50       434 unless (link($path, $lock)) {
187 0 0 0     0 return(0) if $permissive and ($! == EEXIST or $! == ENOENT);
      0        
188 0         0 dief("cannot link(%s, %s): %s", $path, $lock, $!);
189             }
190             # we also touch the element to indicate the lock time
191 14         35 $time = time();
192 14 50       151 unless (utime($time, $time, $path)) {
193 0 0 0     0 if ($permissive and $! == ENOENT) {
194             # RACE: the element file does not exist anymore
195             # (this can happen if an other process locked & removed the element
196             # while our link() was in progress... yes, this can happen!
197             # we do our best and ignore what unlink() returns)
198 0         0 $ignored = unlink($lock);
199 0         0 return(0);
200             }
201             # otherwise this is unexpected...
202 0         0 dief("cannot utime(%d, %d, %s): %s", $time, $time, $path, $!);
203             }
204             # so far so good
205 14         49 return(1);
206             }
207              
208             #
209             # unlock an element:
210             # - return true on success
211             # - return false in case the element could not be unlocked (in permissive mode)
212             #
213              
214             sub unlock : method {
215 1     1 1 614 my($self, $name, $permissive) = @_;
216 1         2 my($path, $lock);
217              
218 1 50       4 $permissive = 0 unless defined($permissive);
219 1         3 $path = $self->{path}."/".$name;
220 1         3 $lock = $path . LOCKED_SUFFIX;
221 1 50       32 return(1) if unlink($lock);
222 0 0 0     0 return(0) if $permissive and $! == ENOENT;
223 0         0 dief("cannot unlink(%s): %s", $lock, $!);
224             }
225              
226             #
227             # touch an element to indicate that it is still being used
228             #
229              
230             sub touch : method {
231 0     0 1 0 my($self, $element) = @_;
232              
233 0         0 _touch($self->{"path"}."/".$element);
234             }
235              
236             #
237             # remove a locked element from the queue
238             #
239              
240             sub remove : method {
241 14     14 1 54 my($self, $name) = @_;
242 14         19 my($path, $lock);
243              
244 14         28 $path = $self->{path}."/".$name;
245 14         23 $lock = $path . LOCKED_SUFFIX;
246 14 50       301 unlink($path) or dief("cannot unlink(%s): %s", $path, $!);
247 14 100       511 unlink($lock) or dief("cannot unlink(%s): %s", $lock, $!);
248             }
249              
250             #
251             # return the number of elements in the queue, locked or not (but not temporary)
252             #
253              
254             sub count : method {
255 8     8 1 2635 my($self) = @_;
256 8         13 my($count, @list);
257              
258 8         11 $count = 0;
259             # get the list of directories
260 8         20 foreach my $name (_special_getdir($self->{path}, "strict")) {
261 8 50       90 push(@list, $1) if $name =~ /^($_DirectoryRegexp)$/o; # untaint
262             }
263             # count the elements inside
264 8         17 foreach my $name (@list) {
265             $count += grep(/^(?:$_ElementRegexp)$/o,
266 8         29 _special_getdir($self->{path}."/".$name));
267             }
268             # that's all
269 8         41 return($count);
270             }
271              
272             #
273             # purge an intermediate directory
274             #
275              
276             sub _purge_dir ($$$) {
277 1     1   3 my($dir, $oldtemp, $oldlock) = @_;
278 1         2 my($path, @stat);
279              
280 1         3 foreach my $name (grep(/\./, _special_getdir($dir))) {
281 1         5 $path = $dir."/".$name;
282 1         12 @stat = stat($path);
283 1 50       4 unless (@stat) {
284 0 0       0 dief("cannot stat(%s): %s", $path, $!) unless $! == ENOENT;
285 0         0 next;
286             }
287 1 50 33     6 next if substr($name, -4) eq TEMPORARY_SUFFIX
288             and $stat[ST_MTIME] >= $oldtemp;
289 1 50 33     6 next if substr($name, -4) eq LOCKED_SUFFIX
290             and $stat[ST_MTIME] >= $oldlock;
291 1         5 warnf("removing too old volatile file: %s", $path);
292 1 50       86 next if unlink($path);
293 0 0       0 dief("cannot unlink(%s): %s", $path, $!) unless $! == ENOENT;
294             }
295             }
296              
297             #
298             # purge the queue
299             #
300              
301             sub purge : method {
302 1     1 1 29 my($self, %option) = @_;
303 1         3 my(@list, $path, $oldtemp, $oldlock);
304              
305             # check options
306 1 50       5 $option{maxtemp} = $self->{maxtemp} unless defined($option{maxtemp});
307 1 50       4 $option{maxlock} = $self->{maxtemp} unless defined($option{maxlock});
308 1         4 foreach my $name (keys(%option)) {
309 2 50       11 dief("unexpected option: %s", $name)
310             unless $name =~ /^(maxtemp|maxlock)$/;
311             dief("invalid %s: %s", $name, $option{$name})
312 2 50       8 unless $option{$name} =~ /^\d+$/;
313             }
314             # get the list of intermediate directories
315 1         2 @list = ();
316 1         4 foreach my $name (_special_getdir($self->{path}, "strict")) {
317 1 50       31 push(@list, $1) if $name =~ /^($_DirectoryRegexp)$/o; # untaint
318             }
319             # remove the old temporary or locked elements
320 1         3 $oldtemp = $oldlock = 0;
321 1 50       14 $oldtemp = time() - $option{maxtemp} if $option{maxtemp};
322 1 50       3 $oldlock = time() - $option{maxlock} if $option{maxlock};
323 1 50 33     4 if ($oldtemp or $oldlock) {
324 1         3 foreach my $name (@list) {
325 1         4 _purge_dir($self->{path}."/".$name, $oldtemp, $oldlock);
326             }
327             }
328             # try to purge all but the last intermediate directory
329 1 50       11 if (@list > 1) {
330 0           @list = sort(@list);
331 0           pop(@list);
332 0           foreach my $name (@list) {
333 0           $path = $self->{path}."/".$name;
334 0 0         _special_rmdir($path) unless _special_getdir($path);
335             }
336             }
337             }
338              
339             1;
340              
341             __END__