line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
1
|
|
|
|
|
|
|
package Queue::Dir; |
2
|
|
|
|
|
|
|
# $Id: Dir.pm,v 1.13 2003/03/09 16:18:48 lem Exp $ |
3
|
|
|
|
|
|
|
|
4
|
|
|
|
|
|
|
require 5.005_62; |
5
|
|
|
|
|
|
|
|
6
|
11
|
|
|
11
|
|
21718
|
use strict; |
|
11
|
|
|
|
|
22
|
|
|
11
|
|
|
|
|
605
|
|
7
|
11
|
|
|
11
|
|
12956
|
use IO::Dir; |
|
11
|
|
|
|
|
355022
|
|
|
11
|
|
|
|
|
571
|
|
8
|
11
|
|
|
11
|
|
100
|
use IO::File; |
|
11
|
|
|
|
|
24
|
|
|
11
|
|
|
|
|
1538
|
|
9
|
11
|
|
|
11
|
|
56
|
use warnings; |
|
11
|
|
|
|
|
19
|
|
|
11
|
|
|
|
|
276
|
|
10
|
11
|
|
|
11
|
|
10856
|
use Sys::Hostname; |
|
11
|
|
|
|
|
15171
|
|
|
11
|
|
|
|
|
800
|
|
11
|
11
|
|
|
11
|
|
71
|
use Fcntl qw(:flock); |
|
11
|
|
|
|
|
24
|
|
|
11
|
|
|
|
|
1390
|
|
12
|
11
|
|
|
11
|
|
10651
|
use Params::Validate qw(:all); |
|
11
|
|
|
|
|
145393
|
|
|
11
|
|
|
|
|
3206
|
|
13
|
|
|
|
|
|
|
|
14
|
|
|
|
|
|
|
our $Debug = 0; |
15
|
|
|
|
|
|
|
our $hires = 'gettimeofday'; |
16
|
|
|
|
|
|
|
|
17
|
11
|
|
|
11
|
|
11731
|
eval "use Time::HiRes qw(gettimeofday);"; |
|
11
|
|
|
|
|
24436
|
|
|
11
|
|
|
|
|
53
|
|
18
|
|
|
|
|
|
|
|
19
|
|
|
|
|
|
|
if ($@) { $hires = 'time' } |
20
|
|
|
|
|
|
|
|
21
|
11
|
|
|
11
|
|
99
|
use vars qw($a $b); |
|
11
|
|
|
|
|
19
|
|
|
11
|
|
|
|
|
8098
|
|
22
|
|
|
|
|
|
|
|
23
|
|
|
|
|
|
|
our $VERSION = 0.01; |
24
|
|
|
|
|
|
|
|
25
|
|
|
|
|
|
|
=pod |
26
|
|
|
|
|
|
|
|
27
|
|
|
|
|
|
|
=head1 NAME |
28
|
|
|
|
|
|
|
|
29
|
|
|
|
|
|
|
Queue::Dir - Manage queue directories where each object is a file |
30
|
|
|
|
|
|
|
|
31
|
|
|
|
|
|
|
=head1 SYNOPSIS |
32
|
|
|
|
|
|
|
|
33
|
|
|
|
|
|
|
use Queue::Dir; |
34
|
|
|
|
|
|
|
|
35
|
|
|
|
|
|
|
my $q = new Queue::File ( |
36
|
|
|
|
|
|
|
-id => $my_process_id, |
37
|
|
|
|
|
|
|
-paths => [ '/var/path/to/queue1', ... ], |
38
|
|
|
|
|
|
|
-promiscuous => 1, |
39
|
|
|
|
|
|
|
-sort => 'sortsub', |
40
|
|
|
|
|
|
|
-filter => sub { ... }, |
41
|
|
|
|
|
|
|
-lockdir => 'lock', |
42
|
|
|
|
|
|
|
-lockmax => 300, |
43
|
|
|
|
|
|
|
); |
44
|
|
|
|
|
|
|
|
45
|
|
|
|
|
|
|
my ($fh, $qid) = $q->store($oid); |
46
|
|
|
|
|
|
|
|
47
|
|
|
|
|
|
|
my $qid = $q->next(); |
48
|
|
|
|
|
|
|
|
49
|
|
|
|
|
|
|
my $fh = $q->visit($mode, $qid); |
50
|
|
|
|
|
|
|
|
51
|
|
|
|
|
|
|
my $status = $q->done($qid); |
52
|
|
|
|
|
|
|
|
53
|
|
|
|
|
|
|
my $name = $q->name($qid); |
54
|
|
|
|
|
|
|
|
55
|
|
|
|
|
|
|
=head1 DESCRIPTION |
56
|
|
|
|
|
|
|
|
57
|
|
|
|
|
|
|
C allows the manipulation of objects placed in a |
58
|
|
|
|
|
|
|
queue. The queue is implemented as a directory where each object is |
59
|
|
|
|
|
|
|
stored as a file. |
60
|
|
|
|
|
|
|
|
61
|
|
|
|
|
|
|
=head2 METHODS |
62
|
|
|
|
|
|
|
|
63
|
|
|
|
|
|
|
The following methods are defined: |
64
|
|
|
|
|
|
|
|
65
|
|
|
|
|
|
|
=over 4 |
66
|
|
|
|
|
|
|
|
67
|
|
|
|
|
|
|
=item C |
68
|
|
|
|
|
|
|
|
69
|
|
|
|
|
|
|
B<-id> assigns a unique process-id to this queue object. Defaults to |
70
|
|
|
|
|
|
|
something built from the serialization of the object + C<$$> or |
71
|
|
|
|
|
|
|
something similar. |
72
|
|
|
|
|
|
|
|
73
|
|
|
|
|
|
|
B<-paths> specifies a list of paths to use as storage points for the |
74
|
|
|
|
|
|
|
queue files. If more than one are supplied, round-robin will be used |
75
|
|
|
|
|
|
|
to store objects there. |
76
|
|
|
|
|
|
|
|
77
|
|
|
|
|
|
|
When B<-promiscuous> is true (the default), objects stored with any |
78
|
|
|
|
|
|
|
other C object are accessible. If set to false, only |
79
|
|
|
|
|
|
|
files whose id matches the value for B<-id> are visible. |
80
|
|
|
|
|
|
|
|
81
|
|
|
|
|
|
|
B<-sort> allows for the specification of a sorting function, used to |
82
|
|
|
|
|
|
|
decide the order in which the queue files will be used. The function |
83
|
|
|
|
|
|
|
is invoked in the same fashion as C, getting two variables |
84
|
|
|
|
|
|
|
(C<$a> and C<$b>) and returning -1, 0 or 1 depending on |
85
|
|
|
|
|
|
|
comparison. C<$a> and C<$b> are hash references whose first element is |
86
|
|
|
|
|
|
|
the queue id of the object and the second element is a the full |
87
|
|
|
|
|
|
|
pathname of such object. |
88
|
|
|
|
|
|
|
|
89
|
|
|
|
|
|
|
The C passed in the B<-filter> parameter can control |
90
|
|
|
|
|
|
|
which files in a given directory to consider as queue objects. By |
91
|
|
|
|
|
|
|
default, all files will be considered part of the queue. This function |
92
|
|
|
|
|
|
|
is called with a reference of the invoking object and the full |
93
|
|
|
|
|
|
|
pathname of each file. A true return value causes the given file to be |
94
|
|
|
|
|
|
|
included in the queue. Note that this is only called if |
95
|
|
|
|
|
|
|
B<-promiscuous> is set to a false value. |
96
|
|
|
|
|
|
|
|
97
|
|
|
|
|
|
|
B<-lockdir> and B<-lockmax> control an optional locking mechanism that |
98
|
|
|
|
|
|
|
reduces the chance of multiple collaborating instances of |
99
|
|
|
|
|
|
|
C objects, from picking the same object from the |
100
|
|
|
|
|
|
|
queue. B<-lockdir>, when present, defines the name of the directory |
101
|
|
|
|
|
|
|
(within each queue directory) to use for storing the lock files. The |
102
|
|
|
|
|
|
|
B<-lockmax> parameter, which defaults to 300 seconds, control for how |
103
|
|
|
|
|
|
|
long the locks are honored. |
104
|
|
|
|
|
|
|
|
105
|
|
|
|
|
|
|
Note that locking is disabled by default. |
106
|
|
|
|
|
|
|
|
107
|
|
|
|
|
|
|
=cut |
108
|
|
|
|
|
|
|
|
109
|
|
|
|
|
|
|
sub new |
110
|
|
|
|
|
|
|
{ |
111
|
23
|
|
|
23
|
1
|
6833
|
my $name = shift; |
112
|
23
|
|
33
|
|
|
133
|
my $class = ref($name) || $name; |
113
|
|
|
|
|
|
|
|
114
|
23
|
50
|
|
|
|
99
|
warn "Queue::Dir::new()\n" if $Debug; |
115
|
|
|
|
|
|
|
|
116
|
|
|
|
|
|
|
my %self = validate_with |
117
|
|
|
|
|
|
|
( |
118
|
|
|
|
|
|
|
params => \@_, |
119
|
|
|
|
|
|
|
ignore_case => 1, |
120
|
|
|
|
|
|
|
strip_leading => '-', |
121
|
|
|
|
|
|
|
spec => |
122
|
|
|
|
|
|
|
{ |
123
|
|
|
|
|
|
|
id => |
124
|
|
|
|
|
|
|
{ |
125
|
|
|
|
|
|
|
type => SCALAR, |
126
|
|
|
|
|
|
|
default => hostname . $$, |
127
|
|
|
|
|
|
|
}, |
128
|
|
|
|
|
|
|
paths => |
129
|
|
|
|
|
|
|
{ |
130
|
|
|
|
|
|
|
type => ARRAYREF, |
131
|
|
|
|
|
|
|
callbacks => |
132
|
|
|
|
|
|
|
{ |
133
|
23
|
|
|
23
|
|
1872
|
directory => sub { $_ = shift; @$_ == grep { -d } @$_; } |
|
23
|
|
|
|
|
69
|
|
|
24
|
|
|
|
|
1370
|
|
134
|
|
|
|
|
|
|
} |
135
|
|
|
|
|
|
|
}, |
136
|
|
|
|
|
|
|
promiscuous => |
137
|
|
|
|
|
|
|
{ |
138
|
|
|
|
|
|
|
type => SCALAR | BOOLEAN, |
139
|
|
|
|
|
|
|
default => 1, |
140
|
|
|
|
|
|
|
}, |
141
|
|
|
|
|
|
|
sort => |
142
|
|
|
|
|
|
|
{ |
143
|
|
|
|
|
|
|
type => SCALAR, |
144
|
|
|
|
|
|
|
default => 'Queue::Dir::_sort', |
145
|
|
|
|
|
|
|
}, |
146
|
|
|
|
|
|
|
lockdir => |
147
|
|
|
|
|
|
|
{ |
148
|
|
|
|
|
|
|
type => SCALAR, |
149
|
|
|
|
|
|
|
default => undef, |
150
|
|
|
|
|
|
|
}, |
151
|
|
|
|
|
|
|
lockmax => |
152
|
|
|
|
|
|
|
{ |
153
|
|
|
|
|
|
|
type => SCALAR, |
154
|
|
|
|
|
|
|
default => 300, |
155
|
|
|
|
|
|
|
callbacks => |
156
|
|
|
|
|
|
|
{ |
157
|
0
|
|
|
0
|
|
0
|
numeric => sub { shift =~ /^\d+$/ }, |
158
|
0
|
|
|
0
|
|
0
|
positive => sub { shift > 0 }, |
159
|
|
|
|
|
|
|
}, |
160
|
|
|
|
|
|
|
}, |
161
|
|
|
|
|
|
|
filter => |
162
|
|
|
|
|
|
|
{ |
163
|
|
|
|
|
|
|
type => CODEREF, |
164
|
|
|
|
|
|
|
default => sub |
165
|
|
|
|
|
|
|
{ |
166
|
0
|
|
|
0
|
|
0
|
my $self = shift; |
167
|
0
|
|
|
|
|
0
|
my $long = shift; |
168
|
|
|
|
|
|
|
|
169
|
0
|
0
|
|
|
|
0
|
return 0 unless $long; |
170
|
|
|
|
|
|
|
|
171
|
0
|
|
|
|
|
0
|
my ($path, $id) = (File::Spec->splitpath($long))[1,2]; |
172
|
|
|
|
|
|
|
|
173
|
0
|
|
|
|
|
0
|
for my $p (@{$self->{paths}}) |
|
0
|
|
|
|
|
0
|
|
174
|
|
|
|
|
|
|
{ |
175
|
0
|
0
|
0
|
|
|
0
|
if (substr($p->[0], $path, 0) == 0 |
|
|
|
0
|
|
|
|
|
176
|
|
|
|
|
|
|
and -f $p->[0] . '/' . $id |
177
|
|
|
|
|
|
|
and $id =~ m!^\d+\.\d+\.$self->{id}\.\d+$!) |
178
|
|
|
|
|
|
|
{ |
179
|
0
|
|
|
|
|
0
|
return 1; |
180
|
|
|
|
|
|
|
} |
181
|
|
|
|
|
|
|
} |
182
|
|
|
|
|
|
|
|
183
|
0
|
|
|
|
|
0
|
return 0; |
184
|
|
|
|
|
|
|
}, |
185
|
|
|
|
|
|
|
}, |
186
|
23
|
|
|
|
|
144
|
}); |
187
|
|
|
|
|
|
|
|
188
|
21
|
|
|
|
|
406
|
@{$self{paths}} = sort { $a cmp $b } @{$self{paths}}; |
|
21
|
|
|
|
|
68
|
|
|
1
|
|
|
|
|
3
|
|
|
21
|
|
|
|
|
74
|
|
189
|
|
|
|
|
|
|
|
190
|
21
|
|
|
|
|
52
|
$_ = [$_, new IO::Dir $_] for @{$self{paths}}; |
|
21
|
|
|
|
|
196
|
|
191
|
|
|
|
|
|
|
|
192
|
21
|
50
|
|
|
|
1970
|
if (grep { ! defined $_->[1] } @{$self{paths}}) { |
|
22
|
|
|
|
|
134
|
|
|
21
|
|
|
|
|
52
|
|
193
|
0
|
|
|
|
|
0
|
warn "One of the queue paths seems invalid\n"; |
194
|
0
|
|
|
|
|
0
|
return; |
195
|
|
|
|
|
|
|
} |
196
|
|
|
|
|
|
|
|
197
|
|
|
|
|
|
|
# Prime the object with an empty file |
198
|
|
|
|
|
|
|
# inventory. |
199
|
21
|
|
|
|
|
74
|
$self{_files} = []; |
200
|
|
|
|
|
|
|
|
201
|
|
|
|
|
|
|
# We store objects in round-robin. |
202
|
21
|
|
|
|
|
47
|
$self{_rr} = 0; |
203
|
21
|
|
|
|
|
52
|
$self{_current} = [0, 0]; |
204
|
|
|
|
|
|
|
|
205
|
21
|
|
|
|
|
55
|
my $self = bless \%self, $class; |
206
|
|
|
|
|
|
|
|
207
|
21
|
100
|
|
|
|
129
|
$self->_clean_locks if $self->{lockdir}; |
208
|
|
|
|
|
|
|
|
209
|
21
|
|
|
|
|
492
|
return $self->_refresh; |
210
|
|
|
|
|
|
|
} |
211
|
|
|
|
|
|
|
|
212
|
60
|
|
|
60
|
|
99
|
sub _sort { $a->[0] cmp $b->[0]; } |
213
|
11
|
|
|
11
|
|
69
|
sub _timestamp { no strict "refs"; return join '', &$hires; } |
|
11
|
|
|
28
|
|
26
|
|
|
11
|
|
|
|
|
22291
|
|
|
28
|
|
|
|
|
282
|
|
214
|
|
|
|
|
|
|
|
215
|
|
|
|
|
|
|
# Update the inventory of queue |
216
|
|
|
|
|
|
|
# objects, if required. |
217
|
|
|
|
|
|
|
sub _refresh |
218
|
|
|
|
|
|
|
{ |
219
|
64
|
|
|
64
|
|
297
|
my $self = shift; |
220
|
|
|
|
|
|
|
|
221
|
64
|
50
|
|
|
|
146
|
warn "Queue::Dir::_refresh()\n" if $Debug; |
222
|
|
|
|
|
|
|
|
223
|
|
|
|
|
|
|
# warn "_files ", scalar @{$self->{_files}}, " _current[0] ", |
224
|
|
|
|
|
|
|
# $self->{_current}->[0], "\n"; |
225
|
|
|
|
|
|
|
|
226
|
64
|
100
|
66
|
|
|
70
|
unless (@{$self->{_files}} or $self->{_current}->[0]) |
|
64
|
|
|
|
|
378
|
|
227
|
|
|
|
|
|
|
{ |
228
|
47
|
50
|
|
|
|
98
|
warn "Queue::Dir::_refresh() running\n" if $Debug; |
229
|
|
|
|
|
|
|
|
230
|
47
|
|
|
|
|
56
|
for my $p (@{$self->{paths}}) |
|
47
|
|
|
|
|
98
|
|
231
|
|
|
|
|
|
|
{ |
232
|
|
|
|
|
|
|
# warn "p\n"; |
233
|
48
|
|
|
|
|
222
|
$p->[1]->rewind; |
234
|
48
|
|
|
|
|
702
|
while (defined (my $f = $p->[1]->read)) |
235
|
|
|
|
|
|
|
{ |
236
|
173
|
100
|
100
|
|
|
2552
|
next if $f eq '.' or $f eq '..'; |
237
|
77
|
100
|
|
|
|
1067
|
next unless -f $p->[0] . '/' . $f; |
238
|
|
|
|
|
|
|
# warn "f\n"; |
239
|
46
|
|
|
|
|
216
|
my $t = [$f, $p->[0] . '/' . $f]; |
240
|
46
|
0
|
33
|
|
|
128
|
if (!$self->{promiscuous} |
241
|
|
|
|
|
|
|
and !$self->{filter}->($t->[1])) |
242
|
|
|
|
|
|
|
{ |
243
|
0
|
|
|
|
|
0
|
next; |
244
|
|
|
|
|
|
|
} |
245
|
46
|
|
|
|
|
52
|
push @{$self->{_files}}, $t; |
|
46
|
|
|
|
|
197
|
|
246
|
|
|
|
|
|
|
} |
247
|
|
|
|
|
|
|
} |
248
|
|
|
|
|
|
|
# XXX - I seem unable to specify the sort |
249
|
|
|
|
|
|
|
# function directly. |
250
|
47
|
|
|
|
|
515
|
my $sort = $self->{sort}; |
251
|
47
|
|
|
|
|
60
|
@{$self->{_files}} = sort $sort @{$self->{_files}}; |
|
47
|
|
|
|
|
111
|
|
|
47
|
|
|
|
|
191
|
|
252
|
|
|
|
|
|
|
# $self->{_current} = shift @{$self->{_files}} || [0,0]; |
253
|
|
|
|
|
|
|
} |
254
|
|
|
|
|
|
|
|
255
|
64
|
|
|
|
|
158
|
return $self; |
256
|
|
|
|
|
|
|
} |
257
|
|
|
|
|
|
|
|
258
|
|
|
|
|
|
|
# Give a $qid, fetch pathname |
259
|
|
|
|
|
|
|
sub _name |
260
|
|
|
|
|
|
|
{ |
261
|
76
|
|
|
76
|
|
91
|
my $self = shift; |
262
|
76
|
|
33
|
|
|
203
|
my $qid = shift || $self->{_current}->[0] || $self->next; |
263
|
|
|
|
|
|
|
|
264
|
|
|
|
|
|
|
# First, try to find this object in |
265
|
|
|
|
|
|
|
# out cached structures |
266
|
|
|
|
|
|
|
|
267
|
76
|
50
|
|
|
|
237
|
for my $t (($self->{_current}->[1] ? $self->{_current} : ()), |
|
76
|
|
|
|
|
176
|
|
268
|
|
|
|
|
|
|
@{$self->{_files}}) |
269
|
|
|
|
|
|
|
{ |
270
|
81
|
100
|
|
|
|
238
|
if ($qid eq $t->[0]) { return $t->[1]; } |
|
46
|
|
|
|
|
532
|
|
271
|
|
|
|
|
|
|
} |
272
|
|
|
|
|
|
|
|
273
|
|
|
|
|
|
|
# As a last resort, attempt to find |
274
|
|
|
|
|
|
|
# the objext in the fs |
275
|
|
|
|
|
|
|
|
276
|
30
|
|
|
|
|
39
|
for my $p (@{$self->{paths}}) |
|
30
|
|
|
|
|
59
|
|
277
|
|
|
|
|
|
|
{ |
278
|
42
|
|
|
|
|
186
|
$p->[1]->rewind; |
279
|
42
|
|
|
|
|
308
|
while (my $n = $p->[1]->read) |
280
|
|
|
|
|
|
|
{ |
281
|
131
|
100
|
|
|
|
1267
|
if ($n eq $qid) |
282
|
|
|
|
|
|
|
{ |
283
|
29
|
|
|
|
|
215
|
return $p->[0] . '/' . $n; |
284
|
|
|
|
|
|
|
} |
285
|
|
|
|
|
|
|
} |
286
|
|
|
|
|
|
|
} |
287
|
|
|
|
|
|
|
|
288
|
|
|
|
|
|
|
# Otherwise, we have to fail... |
289
|
|
|
|
|
|
|
|
290
|
1
|
|
|
|
|
12
|
return; |
291
|
|
|
|
|
|
|
} |
292
|
|
|
|
|
|
|
|
293
|
|
|
|
|
|
|
sub _clean_locks |
294
|
|
|
|
|
|
|
{ |
295
|
10
|
|
|
10
|
|
14
|
my $self = shift; |
296
|
|
|
|
|
|
|
|
297
|
10
|
50
|
|
|
|
97
|
return unless $self->{lockdir}; |
298
|
|
|
|
|
|
|
|
299
|
10
|
|
|
|
|
13
|
for my $p (@{$self->{paths}}) |
|
10
|
|
|
|
|
25
|
|
300
|
|
|
|
|
|
|
{ |
301
|
10
|
|
|
|
|
26
|
my $lock = $p->[0] . '/' . $self->{lockdir}; |
302
|
10
|
|
|
|
|
334
|
mkdir $lock; |
303
|
10
|
|
|
|
|
52
|
my $d = new IO::Dir $lock; |
304
|
10
|
|
|
|
|
554
|
while (my $f = $d->read) |
305
|
|
|
|
|
|
|
{ |
306
|
21
|
100
|
100
|
|
|
429
|
next if $f eq '.' or $f eq '..'; |
307
|
1
|
|
|
|
|
3
|
my $name = $lock . '/' . $f; |
308
|
1
|
50
|
|
|
|
86
|
if ((stat($name))[9] + $self->{lockmax} < time) |
309
|
|
|
|
|
|
|
{ |
310
|
0
|
|
|
|
|
0
|
unlink $name; |
311
|
|
|
|
|
|
|
} |
312
|
|
|
|
|
|
|
} |
313
|
|
|
|
|
|
|
} |
314
|
|
|
|
|
|
|
|
315
|
|
|
|
|
|
|
} |
316
|
|
|
|
|
|
|
|
317
|
|
|
|
|
|
|
# The test below might seem redundant, but |
318
|
|
|
|
|
|
|
# it's an attempt to improve in a lot of |
319
|
|
|
|
|
|
|
# broken NFS locking implementations. |
320
|
|
|
|
|
|
|
|
321
|
|
|
|
|
|
|
sub _lock |
322
|
|
|
|
|
|
|
{ |
323
|
47
|
|
|
47
|
|
72
|
my $self = shift; |
324
|
47
|
|
|
|
|
67
|
my $qid = shift; |
325
|
|
|
|
|
|
|
|
326
|
47
|
|
|
|
|
182
|
$self->{lockfh} = new IO::File; |
327
|
|
|
|
|
|
|
|
328
|
47
|
50
|
|
|
|
1308
|
warn "_lock $qid\n" if $Debug; |
329
|
|
|
|
|
|
|
|
330
|
47
|
100
|
|
|
|
186
|
return 1 unless $self->{lockdir}; |
331
|
|
|
|
|
|
|
|
332
|
15
|
|
|
|
|
91
|
$self->{lockfile} = $self->{paths}->[(split(/\./, $qid))[1]]->[0]; |
333
|
|
|
|
|
|
|
|
334
|
15
|
50
|
|
|
|
57
|
return unless $self->{lockfile}; |
335
|
|
|
|
|
|
|
|
336
|
15
|
|
|
|
|
309
|
$self->{_key} = $self->{id} . '-' . $$ . '-' . int(rand(10000)); |
337
|
15
|
|
|
|
|
52
|
$self->{lockfile} .= '/' . $self->{lockdir} . '/' . $qid; |
338
|
|
|
|
|
|
|
|
339
|
15
|
50
|
|
|
|
46
|
warn "_lock lockfile is $self->{lockfile}\n" if $Debug; |
340
|
|
|
|
|
|
|
|
341
|
15
|
100
|
|
|
|
282
|
if (-f $self->{lockfile}) |
342
|
|
|
|
|
|
|
{ |
343
|
3
|
50
|
|
|
|
30
|
if ((stat(_))[9] + $self->{lockmax} < time) |
344
|
|
|
|
|
|
|
{ |
345
|
0
|
0
|
|
|
|
0
|
warn "_lock forcing unlink (stale) lockfile\n" if $Debug; |
346
|
0
|
|
|
|
|
0
|
unlink $self->{lockfile}; |
347
|
|
|
|
|
|
|
} |
348
|
|
|
|
|
|
|
else |
349
|
|
|
|
|
|
|
{ |
350
|
3
|
50
|
|
|
|
10
|
warn "_lock failing due to previous lock\n" if $Debug; |
351
|
3
|
|
|
|
|
25
|
return; |
352
|
|
|
|
|
|
|
} |
353
|
|
|
|
|
|
|
} |
354
|
|
|
|
|
|
|
# Store our key in the lock file |
355
|
|
|
|
|
|
|
|
356
|
12
|
50
|
|
|
|
55
|
$self->{lockfh}->open($self->{lockfile}, O_RDWR | O_CREAT) or return; |
357
|
12
|
|
|
|
|
1013
|
$self->{lockfh}->autoflush(1); |
358
|
|
|
|
|
|
|
|
359
|
12
|
50
|
|
|
|
583
|
unless (flock $self->{lockfh}, LOCK_EX | LOCK_NB) |
360
|
|
|
|
|
|
|
{ |
361
|
0
|
|
|
|
|
0
|
$self->{lockfh}->close; |
362
|
0
|
|
|
|
|
0
|
$self->{lockfh} = undef; |
363
|
0
|
|
|
|
|
0
|
unlink $self->{lockfile}; |
364
|
0
|
|
|
|
|
0
|
$self->{lockfile} = undef; |
365
|
0
|
|
|
|
|
0
|
return; |
366
|
|
|
|
|
|
|
} |
367
|
12
|
|
|
|
|
69
|
$self->{lockfh}->print($self->{_key}); |
368
|
|
|
|
|
|
|
|
369
|
12
|
50
|
|
|
|
6501
|
warn "_lock key $self->{_key} stored\n" if $Debug; |
370
|
|
|
|
|
|
|
|
371
|
|
|
|
|
|
|
# Verify that the key is indeed in the |
372
|
|
|
|
|
|
|
# lock file |
373
|
|
|
|
|
|
|
|
374
|
12
|
|
|
|
|
77
|
$self->{lockfh}->seek(0, 0); |
375
|
12
|
|
|
|
|
536
|
chomp(my $rkey = $self->{lockfh}->getline); |
376
|
|
|
|
|
|
|
|
377
|
12
|
50
|
|
|
|
543
|
warn "_lock key $rkey recovered\n" if $Debug; |
378
|
|
|
|
|
|
|
|
379
|
12
|
50
|
|
|
|
61
|
unless ($rkey eq $self->{_key}) |
380
|
|
|
|
|
|
|
{ |
381
|
0
|
|
|
|
|
0
|
$self->{lockfh}->close; |
382
|
0
|
|
|
|
|
0
|
$self->{lockfh} = undef; |
383
|
0
|
|
|
|
|
0
|
unlink $self->{lockfile}; |
384
|
0
|
|
|
|
|
0
|
$self->{lockfile} = undef; |
385
|
0
|
|
|
|
|
0
|
return; |
386
|
|
|
|
|
|
|
} |
387
|
|
|
|
|
|
|
|
388
|
12
|
50
|
|
|
|
27
|
warn "_lock key matched\n" if $Debug; |
389
|
|
|
|
|
|
|
|
390
|
|
|
|
|
|
|
# If all this passed, the lock is ours |
391
|
12
|
|
|
|
|
51
|
return 1; |
392
|
|
|
|
|
|
|
} |
393
|
|
|
|
|
|
|
|
394
|
|
|
|
|
|
|
=pod |
395
|
|
|
|
|
|
|
|
396
|
|
|
|
|
|
|
=item Cstore();> |
397
|
|
|
|
|
|
|
|
398
|
|
|
|
|
|
|
Store a file in the queue. Returns an array whose first element is an |
399
|
|
|
|
|
|
|
C object for writing to the file. The second element is |
400
|
|
|
|
|
|
|
the identifier of the object in the queue. |
401
|
|
|
|
|
|
|
|
402
|
|
|
|
|
|
|
If you created the C object with locking enabled, you must |
403
|
|
|
|
|
|
|
call C<-Eunlock> after closing the file handle. |
404
|
|
|
|
|
|
|
|
405
|
|
|
|
|
|
|
=cut |
406
|
|
|
|
|
|
|
|
407
|
|
|
|
|
|
|
sub store |
408
|
|
|
|
|
|
|
{ |
409
|
28
|
|
|
28
|
1
|
10770
|
my $self = shift; |
410
|
28
|
|
|
|
|
158
|
my $fh = new IO::File; |
411
|
28
|
|
|
|
|
1233
|
my $queue = $self->{paths}->[$self->{_rr}]; |
412
|
28
|
|
|
|
|
64
|
my $qid = _timestamp . '.' . $self->{_rr} . '.' . $self->{id}; |
413
|
28
|
|
|
|
|
56
|
my $counter = 0; |
414
|
28
|
|
|
|
|
33
|
my $pname; |
415
|
|
|
|
|
|
|
|
416
|
28
|
50
|
|
|
|
88
|
warn "Queue::Dir::store() qid=$qid\n" if $Debug; |
417
|
|
|
|
|
|
|
|
418
|
28
|
|
|
|
|
54
|
$self->{_rr} ++; |
419
|
28
|
|
|
|
|
38
|
$self->{_rr} %= @{$self->{paths}}; |
|
28
|
|
|
|
|
68
|
|
420
|
|
|
|
|
|
|
|
421
|
28
|
|
|
|
|
614
|
while (-f ($pname = $queue->[0] . '/' . $qid . '.' . $counter)) |
422
|
|
|
|
|
|
|
{ |
423
|
0
|
|
|
|
|
0
|
++ $counter; |
424
|
|
|
|
|
|
|
} |
425
|
|
|
|
|
|
|
|
426
|
28
|
|
|
|
|
60
|
$qid .= '.' . $counter; |
427
|
|
|
|
|
|
|
|
428
|
28
|
50
|
|
|
|
121
|
$fh->open($pname, "w") or return; |
429
|
28
|
|
|
|
|
3082
|
$self->{_current} = [$qid, $pname]; |
430
|
|
|
|
|
|
|
|
431
|
28
|
|
|
|
|
105
|
$self->_lock($qid); |
432
|
|
|
|
|
|
|
|
433
|
28
|
|
|
|
|
101
|
return ($fh, $qid); |
434
|
|
|
|
|
|
|
} |
435
|
|
|
|
|
|
|
|
436
|
|
|
|
|
|
|
=pod |
437
|
|
|
|
|
|
|
|
438
|
|
|
|
|
|
|
=item Cnext();> |
439
|
|
|
|
|
|
|
|
440
|
|
|
|
|
|
|
Returns the queue identifier of the next file to be processed. When |
441
|
|
|
|
|
|
|
the queue is empty, returns undef. |
442
|
|
|
|
|
|
|
|
443
|
|
|
|
|
|
|
Note that if multiple consumers are working on the same queues in |
444
|
|
|
|
|
|
|
promiscuous mode, the file referenced by the returned id might be |
445
|
|
|
|
|
|
|
removed at any time so care must be used. |
446
|
|
|
|
|
|
|
|
447
|
|
|
|
|
|
|
Entries will be returned in an arbitrary order. |
448
|
|
|
|
|
|
|
|
449
|
|
|
|
|
|
|
=cut |
450
|
|
|
|
|
|
|
|
451
|
|
|
|
|
|
|
sub next |
452
|
|
|
|
|
|
|
{ |
453
|
46
|
|
|
46
|
1
|
8165
|
my $self = shift; |
454
|
|
|
|
|
|
|
|
455
|
46
|
100
|
|
|
|
56
|
$self->_refresh unless @{$self->{_files}}; |
|
46
|
|
|
|
|
169
|
|
456
|
|
|
|
|
|
|
|
457
|
46
|
|
100
|
|
|
56
|
$self->{_current} = shift @{$self->{_files}} || [0, 0]; |
458
|
|
|
|
|
|
|
|
459
|
46
|
50
|
|
|
|
132
|
warn "Queue::Dir::next() current=", $self->{_current}->[0], "\n" if $Debug; |
460
|
|
|
|
|
|
|
|
461
|
|
|
|
|
|
|
# warn "next: Current queue has\n"; |
462
|
|
|
|
|
|
|
# foreach (@{$self->{_files}}) |
463
|
|
|
|
|
|
|
# { |
464
|
|
|
|
|
|
|
# warn " $_->[1]\n"; |
465
|
|
|
|
|
|
|
# } |
466
|
|
|
|
|
|
|
|
467
|
46
|
100
|
|
|
|
114
|
unless ($self->{_current}->[0]) |
468
|
|
|
|
|
|
|
{ |
469
|
21
|
|
|
|
|
43
|
$self->_refresh; |
470
|
21
|
|
|
|
|
70
|
return; |
471
|
|
|
|
|
|
|
} |
472
|
|
|
|
|
|
|
|
473
|
25
|
|
|
|
|
94
|
return $self->{_current}->[0]; |
474
|
|
|
|
|
|
|
} |
475
|
|
|
|
|
|
|
|
476
|
|
|
|
|
|
|
=pod |
477
|
|
|
|
|
|
|
|
478
|
|
|
|
|
|
|
=item Cvisit($mode, $qid);> |
479
|
|
|
|
|
|
|
|
480
|
|
|
|
|
|
|
On success, returns an C object, opened according to the |
481
|
|
|
|
|
|
|
specified C<$mode> for the file with C<$qid>. If C<$mode> is not |
482
|
|
|
|
|
|
|
specified, it defaults to a read from the start of the file. If |
483
|
|
|
|
|
|
|
C<$qid> is not specified, it defaults to the next entry, as if |
484
|
|
|
|
|
|
|
C<-Enext()> were called. In order for the file to be eligible, |
485
|
|
|
|
|
|
|
either the C object is not created with locking enabled or |
486
|
|
|
|
|
|
|
the file in the queue is not locked. |
487
|
|
|
|
|
|
|
|
488
|
|
|
|
|
|
|
It can fail in a number of situations. The obvious one, is when the |
489
|
|
|
|
|
|
|
queue is empty. The second one, happens when the desired file is no |
490
|
|
|
|
|
|
|
longer in the queue, which can happen if multiple consumers are |
491
|
|
|
|
|
|
|
accessing the queue in promiscuous mode. |
492
|
|
|
|
|
|
|
|
493
|
|
|
|
|
|
|
To help disambiguate both scenarios, undef will be returned on an |
494
|
|
|
|
|
|
|
empty queue. A defined but false value will be returned when the |
495
|
|
|
|
|
|
|
desired file is missing but others remain in the queue. |
496
|
|
|
|
|
|
|
|
497
|
|
|
|
|
|
|
The object in the queue will be automatically locked if this option is |
498
|
|
|
|
|
|
|
enabled when C<-Enew> was called. In this case, you should call |
499
|
|
|
|
|
|
|
the C<-Eunlock> method. |
500
|
|
|
|
|
|
|
|
501
|
|
|
|
|
|
|
=cut |
502
|
|
|
|
|
|
|
|
503
|
|
|
|
|
|
|
sub visit |
504
|
|
|
|
|
|
|
{ |
505
|
20
|
|
|
20
|
1
|
3385
|
my $self = shift; |
506
|
20
|
|
100
|
|
|
101
|
my $mode = shift || "r"; |
507
|
20
|
|
100
|
|
|
139
|
my $qid = shift || $self->{_current}->[0] || $self->next; |
508
|
|
|
|
|
|
|
|
509
|
20
|
50
|
|
|
|
50
|
warn "Queue::Dir::visit() qid=$qid\n" if $Debug; |
510
|
|
|
|
|
|
|
|
511
|
20
|
100
|
|
|
|
57
|
return unless $qid; |
512
|
|
|
|
|
|
|
|
513
|
19
|
|
|
|
|
85
|
my $fh = new IO::File; |
514
|
19
|
|
|
|
|
492
|
my $name; |
515
|
|
|
|
|
|
|
|
516
|
19
|
|
33
|
|
|
58
|
until ($name = $self->_name($qid) |
|
|
|
66
|
|
|
|
|
|
|
|
66
|
|
|
|
|
517
|
|
|
|
|
|
|
and -f $name |
518
|
|
|
|
|
|
|
and $self->_lock($qid) |
519
|
|
|
|
|
|
|
and $fh->open($name, $mode)) |
520
|
|
|
|
|
|
|
{ |
521
|
3
|
50
|
|
|
|
16
|
unless ($qid = $self->next) |
522
|
|
|
|
|
|
|
{ |
523
|
3
|
50
|
|
|
|
6
|
if (@{$self->{_files}}) |
|
3
|
|
|
|
|
10
|
|
524
|
|
|
|
|
|
|
{ |
525
|
3
|
50
|
|
|
|
25
|
warn "Queue::Dir::visit() ret undef\n" if $Debug; |
526
|
3
|
|
|
|
|
16
|
return undef; |
527
|
|
|
|
|
|
|
} |
528
|
|
|
|
|
|
|
else |
529
|
|
|
|
|
|
|
{ |
530
|
0
|
0
|
|
|
|
0
|
warn "Queue::Dir::visit() ret 0\n" if $Debug; |
531
|
0
|
|
|
|
|
0
|
return 0; |
532
|
|
|
|
|
|
|
} |
533
|
|
|
|
|
|
|
} |
534
|
|
|
|
|
|
|
} |
535
|
|
|
|
|
|
|
|
536
|
16
|
|
|
|
|
921
|
return $fh; |
537
|
|
|
|
|
|
|
} |
538
|
|
|
|
|
|
|
|
539
|
|
|
|
|
|
|
=pod |
540
|
|
|
|
|
|
|
|
541
|
|
|
|
|
|
|
=item C<$q-Edone($qid);> |
542
|
|
|
|
|
|
|
|
543
|
|
|
|
|
|
|
Disposes the queue file whose C<$qid> matches the given identifier as |
544
|
|
|
|
|
|
|
well as its potential lock. If none is specified, defaults to the last |
545
|
|
|
|
|
|
|
one used in a C<-Evisit()>. |
546
|
|
|
|
|
|
|
|
547
|
|
|
|
|
|
|
It is a bad idea (or at least rough manners) to C the file |
548
|
|
|
|
|
|
|
without invoking C<-Edone>. Besides, C<-Edone> will do it for |
549
|
|
|
|
|
|
|
you. |
550
|
|
|
|
|
|
|
|
551
|
|
|
|
|
|
|
=cut |
552
|
|
|
|
|
|
|
|
553
|
|
|
|
|
|
|
sub done |
554
|
|
|
|
|
|
|
{ |
555
|
27
|
|
|
27
|
1
|
2903
|
my $self = shift; |
556
|
27
|
|
66
|
|
|
91
|
my $qid = shift || $self->{_current}->[0]; |
557
|
27
|
|
|
|
|
37
|
my $wipe = 0; |
558
|
|
|
|
|
|
|
|
559
|
27
|
50
|
|
|
|
65
|
warn "Queue::Dir::done() qid=$qid\n" if $Debug; |
560
|
|
|
|
|
|
|
|
561
|
27
|
50
|
|
|
|
90
|
return if $qid eq 0; |
562
|
|
|
|
|
|
|
|
563
|
27
|
|
|
|
|
69
|
my $name = $self->_name($qid); |
564
|
|
|
|
|
|
|
|
565
|
27
|
100
|
|
|
|
232
|
return unless $name; |
566
|
|
|
|
|
|
|
|
567
|
26
|
|
|
|
|
83
|
$self->unlock($qid); |
568
|
|
|
|
|
|
|
|
569
|
26
|
|
|
|
|
1950
|
unlink $name; |
570
|
|
|
|
|
|
|
|
571
|
26
|
|
|
|
|
58
|
for (my $i = 0; |
|
31
|
|
|
|
|
293
|
|
572
|
|
|
|
|
|
|
$i < @{$self->{_files}}; |
573
|
|
|
|
|
|
|
$i ++) |
574
|
|
|
|
|
|
|
{ |
575
|
7
|
100
|
|
|
|
28
|
if ($self->{_files}->[$i]->[0] eq $qid) |
576
|
|
|
|
|
|
|
{ |
577
|
2
|
|
|
|
|
3
|
splice(@{$self->{_files}}, $i, 1); |
|
2
|
|
|
|
|
7
|
|
578
|
2
|
|
|
|
|
7
|
return; |
579
|
|
|
|
|
|
|
} |
580
|
|
|
|
|
|
|
} |
581
|
|
|
|
|
|
|
|
582
|
|
|
|
|
|
|
} |
583
|
|
|
|
|
|
|
|
584
|
|
|
|
|
|
|
=pod |
585
|
|
|
|
|
|
|
|
586
|
|
|
|
|
|
|
=item Cname($qid);> |
587
|
|
|
|
|
|
|
|
588
|
|
|
|
|
|
|
Returns the full pathname of the queue file whose id matches |
589
|
|
|
|
|
|
|
C<$qid>. If none is supplied, defaults to the last one obtained |
590
|
|
|
|
|
|
|
through a C<-Estore()>, C<-Enext()> or C<-Evisit()>. |
591
|
|
|
|
|
|
|
|
592
|
|
|
|
|
|
|
It could return C is the queue object no longer exists. |
593
|
|
|
|
|
|
|
|
594
|
|
|
|
|
|
|
=cut |
595
|
|
|
|
|
|
|
|
596
|
|
|
|
|
|
|
sub name |
597
|
|
|
|
|
|
|
{ |
598
|
30
|
|
|
30
|
1
|
10513
|
my $self = shift; |
599
|
30
|
|
33
|
|
|
79
|
my $qid = shift || $self->{_current}->[0] || $self->next; |
600
|
30
|
50
|
|
|
|
51
|
warn "Queue::Dir::name() qid=$qid\n" if $Debug; |
601
|
30
|
|
|
|
|
51
|
return $self->_name($qid); |
602
|
|
|
|
|
|
|
} |
603
|
|
|
|
|
|
|
|
604
|
|
|
|
|
|
|
=pod |
605
|
|
|
|
|
|
|
|
606
|
|
|
|
|
|
|
=item C<-Eunlock($qid)> |
607
|
|
|
|
|
|
|
|
608
|
|
|
|
|
|
|
Removes any locks outstanding in the file identified by C<$qid>, or |
609
|
|
|
|
|
|
|
the last Ced file. Use of this method is only required if the |
610
|
|
|
|
|
|
|
object is created with locking enabled. |
611
|
|
|
|
|
|
|
|
612
|
|
|
|
|
|
|
=cut |
613
|
|
|
|
|
|
|
|
614
|
|
|
|
|
|
|
sub unlock |
615
|
|
|
|
|
|
|
{ |
616
|
33
|
|
|
33
|
1
|
7261
|
my $self = shift; |
617
|
33
|
|
66
|
|
|
106
|
my $qid = shift || $self->{_current}->[0]; |
618
|
33
|
|
|
|
|
146
|
my $fh = new IO::File; |
619
|
|
|
|
|
|
|
|
620
|
33
|
50
|
|
|
|
889
|
warn "unlock $qid\n" if $Debug; |
621
|
|
|
|
|
|
|
|
622
|
33
|
100
|
|
|
|
124
|
return 1 unless $self->{lockdir}; |
623
|
14
|
100
|
|
|
|
49
|
return 1 unless $self->{lockfh}; |
624
|
|
|
|
|
|
|
|
625
|
9
|
|
|
|
|
121
|
close $self->{lockfh}; |
626
|
9
|
|
|
|
|
19
|
$self->{lockfh} = undef; |
627
|
|
|
|
|
|
|
|
628
|
9
|
|
|
|
|
863
|
unlink $self->{lockfile}; |
629
|
9
|
|
|
|
|
25
|
$self->{lockfile} = undef; |
630
|
|
|
|
|
|
|
|
631
|
9
|
|
|
|
|
35
|
return 1; |
632
|
|
|
|
|
|
|
} |
633
|
|
|
|
|
|
|
|
634
|
|
|
|
|
|
|
1; |
635
|
|
|
|
|
|
|
__END__ |