| line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
|
1
|
|
|
|
|
|
|
=head1 NAME |
|
2
|
|
|
|
|
|
|
|
|
3
|
|
|
|
|
|
|
IPC::DirQueue - disk-based many-to-many task queue |
|
4
|
|
|
|
|
|
|
|
|
5
|
|
|
|
|
|
|
=head1 SYNOPSIS |
|
6
|
|
|
|
|
|
|
|
|
7
|
|
|
|
|
|
|
my $dq = IPC::DirQueue->new({ dir => "/path/to/queue" }); |
|
8
|
|
|
|
|
|
|
$dq->enqueue_file("filename"); |
|
9
|
|
|
|
|
|
|
|
|
10
|
|
|
|
|
|
|
my $dq = IPC::DirQueue->new({ dir => "/path/to/queue" }); |
|
11
|
|
|
|
|
|
|
my $job = $dq->pickup_queued_job(); |
|
12
|
|
|
|
|
|
|
if (!$job) { print "no jobs left\n"; exit; } |
|
13
|
|
|
|
|
|
|
# ...do something interesting with $job->get_data_path() ... |
|
14
|
|
|
|
|
|
|
$job->finish(); |
|
15
|
|
|
|
|
|
|
|
|
16
|
|
|
|
|
|
|
=head1 DESCRIPTION |
|
17
|
|
|
|
|
|
|
|
|
18
|
|
|
|
|
|
|
This module implements a FIFO queueing infrastructure, using a directory |
|
19
|
|
|
|
|
|
|
as the communications and storage media. No daemon process is required to |
|
20
|
|
|
|
|
|
|
manage the queue; all communication takes place via the filesystem. |
|
21
|
|
|
|
|
|
|
|
|
22
|
|
|
|
|
|
|
A common UNIX system design pattern is to use a tool like C<lpr> as a task |
|
23
|
|
|
|
|
|
|
queueing system; for example, |
|
24
|
|
|
|
|
|
|
C describes the |
|
25
|
|
|
|
|
|
|
use of C<lpr> as an MP3 jukebox. |
|
26
|
|
|
|
|
|
|
|
|
27
|
|
|
|
|
|
|
However, C<lpr> isn't as efficient as it could be. When used in this way, you |
|
28
|
|
|
|
|
|
|
have to restart each task processor for every new task. If you have a lot of |
|
29
|
|
|
|
|
|
|
startup overhead, this can be very inefficient. With C<IPC::DirQueue>, a |
|
30
|
|
|
|
|
|
|
processing server can run persistently and cache data needed across multiple |
|
31
|
|
|
|
|
|
|
tasks efficiently; it will not be restarted unless you restart it. |
|
32
|
|
|
|
|
|
|
|
|
33
|
|
|
|
|
|
|
Multiple enqueueing and dequeueing processes on multiple hosts (NFS-safe |
|
34
|
|
|
|
|
|
|
locking is used) can run simultaneously, and safely, on the same queue. |
|
35
|
|
|
|
|
|
|
|
|
36
|
|
|
|
|
|
|
Since multiple dequeuers can run simultaneously, this provides a good way |
|
37
|
|
|
|
|
|
|
to process a variable level of incoming tasks using a pre-defined number |
|
38
|
|
|
|
|
|
|
of worker processes. |
|
39
|
|
|
|
|
|
|
|
|
40
|
|
|
|
|
|
|
If you need more CPU power working on a queue, you can simply start |
|
41
|
|
|
|
|
|
|
another dequeuer to help out. If you need less, kill off a few dequeuers. |
|
42
|
|
|
|
|
|
|
|
|
43
|
|
|
|
|
|
|
If you need to take down the server to perform some maintenance or |
|
44
|
|
|
|
|
|
|
upgrades, just kill the dequeuer processes, perform the work, and start up |
|
45
|
|
|
|
|
|
|
new ones. Since there's no 'socket' or similar point of failure aside from |
|
46
|
|
|
|
|
|
|
the directory itself, the queue will just quietly fill with waiting jobs |
|
47
|
|
|
|
|
|
|
until the new dequeuer is ready. |
|
48
|
|
|
|
|
|
|
|
|
49
|
|
|
|
|
|
|
Arbitrary 'name = value' string-pair metadata can be transferred alongside data |
|
50
|
|
|
|
|
|
|
files. In fact, in some cases, you may find it easier to send unused and |
|
51
|
|
|
|
|
|
|
empty data files, and just use the 'metadata' fields to transfer the details of |
|
52
|
|
|
|
|
|
|
what will be worked on. |
|
53
|
|
|
|
|
|
|
|
|
54
|
|
|
|
|
|
|
=head1 METHODS |
|
55
|
|
|
|
|
|
|
|
|
56
|
|
|
|
|
|
|
=over 4 |
|
57
|
|
|
|
|
|
|
|
|
58
|
|
|
|
|
|
|
=cut |
|
59
|
|
|
|
|
|
|
|
|
60
|
|
|
|
|
|
|
package IPC::DirQueue; |
|
61
|
23
|
|
|
23
|
|
302052
|
use strict; |
|
|
23
|
|
|
|
|
64
|
|
|
|
23
|
|
|
|
|
4954
|
|
|
62
|
23
|
|
|
23
|
|
37848
|
use bytes; |
|
|
23
|
|
|
|
|
326
|
|
|
|
23
|
|
|
|
|
126
|
|
|
63
|
23
|
|
|
23
|
|
74674
|
use Time::HiRes qw(); |
|
|
23
|
|
|
|
|
598363
|
|
|
|
23
|
|
|
|
|
893
|
|
|
64
|
23
|
|
|
23
|
|
195
|
use Fcntl qw(O_WRONLY O_CREAT O_EXCL O_RDONLY); |
|
|
23
|
|
|
|
|
51
|
|
|
|
23
|
|
|
|
|
7653
|
|
|
65
|
23
|
|
|
23
|
|
22360
|
use IPC::DirQueue::Job; |
|
|
23
|
|
|
|
|
65
|
|
|
|
23
|
|
|
|
|
642
|
|
|
66
|
23
|
|
|
23
|
|
15457
|
use IPC::DirQueue::IndexClient; |
|
|
23
|
|
|
|
|
82
|
|
|
|
23
|
|
|
|
|
876
|
|
|
67
|
23
|
|
|
23
|
|
268
|
use Errno qw(EEXIST); |
|
|
23
|
|
|
|
|
55
|
|
|
|
23
|
|
|
|
|
6299
|
|
|
68
|
|
|
|
|
|
|
|
|
69
|
|
|
|
|
|
|
our @ISA = (); |
|
70
|
|
|
|
|
|
|
|
|
71
|
|
|
|
|
|
|
our $VERSION = "1.0"; |
|
72
|
|
|
|
|
|
|
|
|
73
|
23
|
|
|
23
|
|
141
|
use constant SLASH => '/'; |
|
|
23
|
|
|
|
|
44
|
|
|
|
23
|
|
|
|
|
234657
|
|
|
74
|
|
|
|
|
|
|
|
|
75
|
|
|
|
|
|
|
# our $DEBUG = 1; |
|
76
|
|
|
|
|
|
|
our $DEBUG; # = 1; |
|
77
|
|
|
|
|
|
|
|
|
78
|
|
|
|
|
|
|
########################################################################### |
|
79
|
|
|
|
|
|
|
|
|
80
|
|
|
|
|
|
|
=item $dq->new ($opts); |
|
81
|
|
|
|
|
|
|
|
|
82
|
|
|
|
|
|
|
Create a new queue object, suitable for either enqueueing jobs |
|
83
|
|
|
|
|
|
|
or picking up already-queued jobs for processing. |
|
84
|
|
|
|
|
|
|
|
|
85
|
|
|
|
|
|
|
C<$opts> is a reference to a hash, which may contain the following |
|
86
|
|
|
|
|
|
|
options: |
|
87
|
|
|
|
|
|
|
|
|
88
|
|
|
|
|
|
|
=over 4 |
|
89
|
|
|
|
|
|
|
|
|
90
|
|
|
|
|
|
|
=item dir => $path_to_directory (no default) |
|
91
|
|
|
|
|
|
|
|
|
92
|
|
|
|
|
|
|
Name the directory where the queue files are stored. This is required. |
|
93
|
|
|
|
|
|
|
|
|
94
|
|
|
|
|
|
|
=item data_file_mode => $mode (default: 0666) |
|
95
|
|
|
|
|
|
|
|
|
96
|
|
|
|
|
|
|
The C-style file mode for data files. This should be specified |
|
97
|
|
|
|
|
|
|
as a string with a leading 0. It will be affected by the current |
|
98
|
|
|
|
|
|
|
process C<umask>. |
|
99
|
|
|
|
|
|
|
|
|
100
|
|
|
|
|
|
|
=item queue_file_mode => $mode (default: 0666) |
|
101
|
|
|
|
|
|
|
|
|
102
|
|
|
|
|
|
|
The C-style file mode for queue control files. This should be |
|
103
|
|
|
|
|
|
|
specified as a string with a leading 0. It will be affected by the |
|
104
|
|
|
|
|
|
|
current process C<umask>. |
|
105
|
|
|
|
|
|
|
|
|
106
|
|
|
|
|
|
|
=item ordered => { 0 | 1 } (default: 1) |
|
107
|
|
|
|
|
|
|
|
|
108
|
|
|
|
|
|
|
Whether the jobs should be processed in order of submission, or |
|
109
|
|
|
|
|
|
|
in no particular order. |
|
110
|
|
|
|
|
|
|
|
|
111
|
|
|
|
|
|
|
=item queue_fanout => { 0 | 1 } (default: 0) |
|
112
|
|
|
|
|
|
|
|
|
113
|
|
|
|
|
|
|
Whether the queue directory should be 'fanned out'. This allows better |
|
114
|
|
|
|
|
|
|
scalability with NFS-shared queues with large numbers of pending files, but |
|
115
|
|
|
|
|
|
|
hurts performance otherwise. It also implies B<ordered> = 0. (This is |
|
116
|
|
|
|
|
|
|
strictly experimental, has overall poor performance, and is not recommended.) |
|
117
|
|
|
|
|
|
|
|
|
118
|
|
|
|
|
|
|
=item indexd_uri => $uri (default: undef) |
|
119
|
|
|
|
|
|
|
|
|
120
|
|
|
|
|
|
|
A URI of a C daemon, used to maintain the list of waiting jobs. The |
|
121
|
|
|
|
|
|
|
URI must be of the form C . (This is strictly |
|
122
|
|
|
|
|
|
|
experimental, and is not recommended.) |
|
123
|
|
|
|
|
|
|
|
|
124
|
|
|
|
|
|
|
=item buf_size => $number (default: 65536) |
|
125
|
|
|
|
|
|
|
|
|
126
|
|
|
|
|
|
|
The buffer size to use when copying files, in bytes. |
|
127
|
|
|
|
|
|
|
|
|
128
|
|
|
|
|
|
|
=item active_file_lifetime => $number (default: 600) |
|
129
|
|
|
|
|
|
|
|
|
130
|
|
|
|
|
|
|
The lifetime of an untouched active lockfile, in seconds. See 'STALE LOCKS AND |
|
131
|
|
|
|
|
|
|
SIGNAL HANDLING', below, for more details. |
|
132
|
|
|
|
|
|
|
|
|
133
|
|
|
|
|
|
|
=back |
|
134
|
|
|
|
|
|
|
|
|
135
|
|
|
|
|
|
|
=cut |
|
136
|
|
|
|
|
|
|
|
|
137
|
|
|
|
|
|
|
# Constructor.  $opts is an optional hashref of options; 'dir' is mandatory
# and the constructor dies without it.  Defaults applied here:
#   data_file_mode / queue_file_mode  '0666' (octal strings, converted with oct)
#   ordered                           1, forced to 0 when queue_fanout is set
#   buf_size                          65536
#   active_file_lifetime              600
# The queue directory is created if needed, and an index client is attached
# when 'indexd_uri' is given.
sub new {
  my ($class, $opts) = @_;
  $class = ref($class) || $class;

  # Build the object from a shallow copy of the options.  Blessing the
  # caller's own hash would rewrite its mode strings in place, so a second
  # new() with the same hash would run oct() on an already-converted number.
  my $self = bless { %{ $opts || { } } }, $class;

  die "no 'dir' specified" unless $self->{dir};

  # file modes arrive as octal strings ("0666"); store them as numbers
  foreach my $modekey (qw(data_file_mode queue_file_mode)) {
    $self->{$modekey} ||= '0666';
    $self->{$modekey} = oct ($self->{$modekey});
  }

  if ($self->{queue_fanout}) {
    $self->{queue_fanout} = 1;
    $self->{ordered} = 0;       # fanout wins
  }
  elsif (!defined $self->{ordered}) {
    $self->{ordered} = 1;
  }

  $self->{buf_size} ||= 65536;
  $self->{active_file_lifetime} ||= 600;

  $self->{ensured_dir_exists} = { };
  $self->ensure_dir_exists ($self->{dir});

  if ($self->{indexd_uri}) {
    $self->{indexclient} = IPC::DirQueue::IndexClient->new({
        uri => $self->{indexd_uri}
      });
  }

  return $self;
}
|
173
|
|
|
|
|
|
|
|
|
174
|
|
|
|
|
|
|
sub dbg; |
|
175
|
|
|
|
|
|
|
|
|
176
|
|
|
|
|
|
|
########################################################################### |
|
177
|
|
|
|
|
|
|
|
|
178
|
|
|
|
|
|
|
=item $dq->enqueue_file ($filename [, $metadata [, $pri] ] ); |
|
179
|
|
|
|
|
|
|
|
|
180
|
|
|
|
|
|
|
Enqueue a new job for processing. Returns C<1> if the job was enqueued, or |
|
181
|
|
|
|
|
|
|
C<undef> on failure. |
|
182
|
|
|
|
|
|
|
|
|
183
|
|
|
|
|
|
|
C<$filename> is the path to the file to be enqueued. Its contents |
|
184
|
|
|
|
|
|
|
will be read, and will be used as the contents of the data file available |
|
185
|
|
|
|
|
|
|
to dequeuers using C. |
|
186
|
|
|
|
|
|
|
|
|
187
|
|
|
|
|
|
|
C<$metadata> is an optional hash reference; every item of metadata will be |
|
188
|
|
|
|
|
|
|
available to worker processes on the C<IPC::DirQueue::Job> object, in the |
|
189
|
|
|
|
|
|
|
C<$job-E<gt>{metadata}> hashref. Note that using this channel for metadata |
|
190
|
|
|
|
|
|
|
brings with it several restrictions: |
|
191
|
|
|
|
|
|
|
|
|
192
|
|
|
|
|
|
|
=over 4 |
|
193
|
|
|
|
|
|
|
|
|
194
|
|
|
|
|
|
|
=item 1. it requires that the metadata be stored as 'name' => 'value' string pairs |
|
195
|
|
|
|
|
|
|
|
|
196
|
|
|
|
|
|
|
=item 2. neither 'name' nor 'value' may contain newline (\n) or NUL (\0) characters |
|
197
|
|
|
|
|
|
|
|
|
198
|
|
|
|
|
|
|
=item 3. 'name' cannot contain colon (:) characters |
|
199
|
|
|
|
|
|
|
|
|
200
|
|
|
|
|
|
|
=item 4. 'name' cannot start with a capital letter 'Q' and be 4 characters in length |
|
201
|
|
|
|
|
|
|
|
|
202
|
|
|
|
|
|
|
=back |
|
203
|
|
|
|
|
|
|
|
|
204
|
|
|
|
|
|
|
If those restrictions are broken, die() will be called with the following |
|
205
|
|
|
|
|
|
|
error: |
|
206
|
|
|
|
|
|
|
|
|
207
|
|
|
|
|
|
|
die "IPC::DirQueue: invalid metadatum: '$k'"; |
|
208
|
|
|
|
|
|
|
|
|
209
|
|
|
|
|
|
|
This is a change added in release 0.06; prior to that, that metadatum would be |
|
210
|
|
|
|
|
|
|
silently dropped. |
|
211
|
|
|
|
|
|
|
|
|
212
|
|
|
|
|
|
|
An optional priority can be specified; lower priorities are run first. |
|
213
|
|
|
|
|
|
|
Priorities range from 0 to 99, and 50 is default. |
|
214
|
|
|
|
|
|
|
|
|
215
|
|
|
|
|
|
|
=cut |
|
216
|
|
|
|
|
|
|
|
|
217
|
|
|
|
|
|
|
# Enqueue the contents of the file at path $file.
# $metadata and $pri are handed unchanged to _enqueue_backend().
# Returns whatever _enqueue_backend() returns; if the file cannot be opened
# a warning is issued and nothing (undef in scalar context) is returned.
sub enqueue_file {
  my ($self, $file, $metadata, $pri) = @_;

  # Three-argument open with a lexical handle: the path is taken literally
  # (no special meaning for "-" or "&N", no whitespace trimming) and no
  # package-global handle is shared with other methods.
  my $fhin;
  if (!open ($fhin, '<', $file)) {
    warn "IPC::DirQueue: cannot open $file for read: $!";
    return;
  }

  my $ret = $self->_enqueue_backend ($metadata, $pri, $fhin);
  close $fhin;
  return $ret;
}
|
227
|
|
|
|
|
|
|
|
|
228
|
|
|
|
|
|
|
=item $dq->enqueue_fh ($filehandle [, $metadata [, $pri] ] ); |
|
229
|
|
|
|
|
|
|
|
|
230
|
|
|
|
|
|
|
Enqueue a new job for processing. Returns C<1> if the job was enqueued, or |
|
231
|
|
|
|
|
|
|
C<undef> on failure. C<$pri> and C<$metadata> are as described in |
|
232
|
|
|
|
|
|
|
C<$dq-E<gt>enqueue_file()>. |
|
233
|
|
|
|
|
|
|
|
|
234
|
|
|
|
|
|
|
C<$filehandle> is a perl file handle that must be open for reading. It will be |
|
235
|
|
|
|
|
|
|
closed on completion, regardless of success or failure. Its contents will be |
|
236
|
|
|
|
|
|
|
read, and will be used as the contents of the data file available to dequeuers |
|
237
|
|
|
|
|
|
|
using C. |
|
238
|
|
|
|
|
|
|
|
|
239
|
|
|
|
|
|
|
=cut |
|
240
|
|
|
|
|
|
|
|
|
241
|
|
|
|
|
|
|
# Enqueue the data readable from the already-open handle $fhin.
# The handle is closed afterwards whether or not the enqueue succeeded;
# the result of _enqueue_backend() is returned.
sub enqueue_fh {
  my $self = shift;
  my ($fhin, $metadata, $pri) = @_;

  my $result = $self->_enqueue_backend ($metadata, $pri, $fhin);
  close $fhin;

  return $result;
}
|
247
|
|
|
|
|
|
|
|
|
248
|
|
|
|
|
|
|
=item $dq->enqueue_string ($string [, $metadata [, $pri] ] ); |
|
249
|
|
|
|
|
|
|
|
|
250
|
|
|
|
|
|
|
Enqueue a new job for processing. The job data is entirely read from |
|
251
|
|
|
|
|
|
|
C<$string>. Returns C<1> if the job was enqueued, or C<undef> on failure. |
|
252
|
|
|
|
|
|
|
C<$pri> and C<$metadata> are as described in C<$dq-E<gt>enqueue_file()>. |
|
253
|
|
|
|
|
|
|
|
|
254
|
|
|
|
|
|
|
=cut |
|
255
|
|
|
|
|
|
|
|
|
256
|
|
|
|
|
|
|
# Enqueue a job whose data is the single scalar $string.
# The string is fed to _enqueue_backend() through a one-shot callback:
# the first call yields the whole string, every later call yields nothing.
sub enqueue_string {
  my $self = shift;
  my ($string, $metadata, $pri) = @_;

  my $delivered = 0;
  my $feeder = sub {
    return if $delivered;
    $delivered = 1;
    return $string;
  };

  return $self->_enqueue_backend ($metadata, $pri, undef, $feeder);
}
|
265
|
|
|
|
|
|
|
|
|
266
|
|
|
|
|
|
|
=item $dq->enqueue_sub ($subref [, $metadata [, $pri] ] ); |
|
267
|
|
|
|
|
|
|
|
|
268
|
|
|
|
|
|
|
Enqueue a new job for processing. Returns C<1> if the job was enqueued, or |
|
269
|
|
|
|
|
|
|
C<undef> on failure. C<$pri> and C<$metadata> are as described in |
|
270
|
|
|
|
|
|
|
C<$dq-E<gt>enqueue_file()>. |
|
271
|
|
|
|
|
|
|
|
|
272
|
|
|
|
|
|
|
C<$subref> is a perl subroutine, which is expected to return one of the |
|
273
|
|
|
|
|
|
|
following each time it is called: |
|
274
|
|
|
|
|
|
|
|
|
275
|
|
|
|
|
|
|
- a string of data bytes to be appended to any existing data. (the |
|
276
|
|
|
|
|
|
|
string may be empty, C<''>, in which case it's a no-op.) |
|
277
|
|
|
|
|
|
|
|
|
278
|
|
|
|
|
|
|
- C<undef> when the enqueued data has ended, ie. EOF. |
|
279
|
|
|
|
|
|
|
|
|
280
|
|
|
|
|
|
|
- C<die()> if an error occurs. The C<die()> message will be converted into |
|
281
|
|
|
|
|
|
|
a warning, and the C<enqueue_sub()> call will return C<undef>. |
|
282
|
|
|
|
|
|
|
|
|
283
|
|
|
|
|
|
|
(Tip: note that this is a closure, so variables outside the subroutine can be |
|
284
|
|
|
|
|
|
|
accessed safely.) |
|
285
|
|
|
|
|
|
|
|
|
286
|
|
|
|
|
|
|
=cut |
|
287
|
|
|
|
|
|
|
|
|
288
|
|
|
|
|
|
|
# Enqueue a job whose data is produced by repeated calls to $subref.
# Thin wrapper: the sub is passed to _enqueue_backend() as the data
# callback, with no input filehandle.
sub enqueue_sub {
  my $self = shift;
  my ($subref, $metadata, $pri) = @_;

  return $self->_enqueue_backend ($metadata, $pri, undef, $subref);
}
|
292
|
|
|
|
|
|
|
|
|
293
|
|
|
|
|
|
|
# private implementation. |
|
294
|
|
|
|
|
|
|
# Shared back end for the enqueue_* methods.
#
#   $metadata    stored in the job hash and handed to create_control_file()
#   $pri         priority 0..99; undef means 50
#   $fhin        input filehandle, or undef
#   $callbackin  data-producing sub, or undef
#
# $fhin and $callbackin are passed straight to copy_in_to_out_fh().
#
# Steps: write the data to a file in 'tmp', link it into the hashed 'data'
# dir, write a control file in 'tmp', link that into 'queue', then touch
# the queue dir.  Returns 1 on success.  On failure, the files created so
# far are unlinked and nothing (undef in scalar context) is returned.
sub _enqueue_backend {
  my ($self, $metadata, $pri, $fhin, $callbackin) = @_;

  $pri = 50 if !defined($pri);
  if ($pri < 0 || $pri > 99) {
    warn "IPC::DirQueue: bad priority $pri is > 99 or < 0";
    return;
  }

  # note: gettimeofday returns (seconds, microseconds); the second value is
  # stored under the historical "msecs" key name
  my ($now, $nowmsecs) = Time::HiRes::gettimeofday;

  my $job = {
    pri => $pri,
    metadata => $metadata,
    time_submitted_secs => $now,
    time_submitted_msecs => $nowmsecs
  };

  # NOTE: this can change until the moment we've renamed the ctrl file
  # into 'queue'!
  my $qfnametmp = $self->new_q_filename($job);
  my $qcnametmp = $qfnametmp;

  my $pathtmp = $self->q_subdir('tmp');
  $self->ensure_dir_exists ($pathtmp);

  my $pathtmpctrl = $pathtmp.SLASH.$qfnametmp.".ctrl";
  my $pathtmpdata = $pathtmp.SLASH.$qfnametmp.".data";

  # Lexical output handle: a data callback that itself enqueues a job must
  # not be able to reopen (and thereby close) the handle we are writing to.
  my $fhout;
  if (!sysopen ($fhout, $pathtmpdata, O_WRONLY|O_CREAT|O_EXCL,
      $self->{data_file_mode}))
  {
    warn "IPC::DirQueue: cannot open $pathtmpdata for write: $!";
    return;
  }

  # From here on the tmp data file exists; the other two files are tracked
  # by flags.  The closure removes whatever has been created so far.
  my ($pathdata, $pathdata_created, $pathtmpctrl_created);
  my $cleanup_after_failure = sub {
    if ($pathtmpctrl_created) {
      unlink $pathtmpctrl or warn "IPC::DirQueue: cannot unlink $pathtmpctrl";
    }
    unlink $pathtmpdata or warn "IPC::DirQueue: cannot unlink $pathtmpdata";
    if ($pathdata_created) {
      unlink $pathdata or warn "IPC::DirQueue: cannot unlink $pathdata";
    }
    return;
  };

  # a die() inside the copy (e.g. from the data callback) becomes a warning
  my $siz;
  eval {
    $siz = $self->copy_in_to_out_fh ($fhin, $callbackin,
                                     $fhout, $pathtmpdata);
  };
  if ($@) {
    warn "IPC::DirQueue: enqueue failed: $@";
  }
  return $cleanup_after_failure->() if !defined $siz;
  $job->{size_bytes} = $siz;

  # get the data dir
  my $pathdatadir = $self->q_subdir('data');

  # hashing the data dir, using 2 levels of directory hashing.  This has a
  # tiny effect on speed in all cases up to 10k queued files, but has good
  # results in terms of the usability of those dirs for users doing direct
  # access, so enabled by default.
  {
    # take the last two chars for the hashname.  In most cases, this will
    # be the last 2 chars of a hash of (hostname, pid), so effectively
    # random.  Remove it from the filename entirely, since it's redundant
    # to have it both in the dir name and the filename.
    my ($hash1, $hash2);
    if ($qfnametmp =~ s/([A-Za-z0-9+_])([A-Za-z0-9+_])$//) {
      # only trust the captures when the substitution really matched
      ($hash1, $hash2) = ($1, $2);
    }
    $hash1 ||= '0';
    $hash2 ||= '0';

    my $origdatadir = $pathdatadir;
    $pathdatadir = "$pathdatadir/$hash1/$hash2";

    # check to see if that hashdir exists... build it up if req'd.
    # mkdir results are not checked here; a missing directory makes the
    # link_into_dir() below fail instead.
    if (!-d $pathdatadir) {
      foreach my $dir ($origdatadir, "$origdatadir/$hash1", $pathdatadir) {
        (-d $dir) or mkdir ($dir);
      }
    }
  }

  # now link(2) the data tmpfile into the 'data' dir.
  $pathdata = $self->link_into_dir ($job, $pathtmpdata,
                                    $pathdatadir, $qfnametmp);
  return $cleanup_after_failure->() if !$pathdata;
  $pathdata_created = 1;
  $job->{pathdata} = $pathdata;

  # ok, write a control file now that data is safe and we know it's
  # new filename...
  if (!$self->create_control_file ($job, $pathtmpdata, $pathtmpctrl)) {
    return $cleanup_after_failure->();
  }
  $pathtmpctrl_created = 1;

  # now link(2) that into the 'queue' dir.
  my $pathqueuedir = $self->q_subdir('queue');
  my $fanout = $self->queue_dir_fanout_create($pathqueuedir);

  my $pathqueue = $self->link_into_dir ($job, $pathtmpctrl,
                    $self->queue_dir_fanout_path($pathqueuedir, $fanout),
                    $qcnametmp);

  if (!$pathqueue) {
    dbg ("failed to link_into_dir, enq failed");
    return $cleanup_after_failure->();
  }

  # and incr the fanout counter for that fanout dir
  $self->queue_dir_fanout_commit($pathqueuedir, $fanout);

  # touch the "queue" directory to indicate that it's changed
  # and a file has been enqueued; required to support Reiserfs
  # and XFS, where this is not implicit
  $pathqueuedir = $self->q_subdir('queue');
  $self->touch($pathqueuedir) or warn "touch failed on $pathqueuedir";
  dbg ("touched $pathqueuedir at ".time);

  if ($self->{indexclient}) {
    $self->{indexclient}->enqueue($pathqueuedir, $pathqueue);
  }

  return 1;
}
|
428
|
|
|
|
|
|
|
|
|
429
|
|
|
|
|
|
|
########################################################################### |
|
430
|
|
|
|
|
|
|
|
|
431
|
|
|
|
|
|
|
=item $job = $dq->pickup_queued_job( [ path => $path ] ); |
|
432
|
|
|
|
|
|
|
|
|
433
|
|
|
|
|
|
|
Pick up the next job in the queue, so that it can be processed. |
|
434
|
|
|
|
|
|
|
|
|
435
|
|
|
|
|
|
|
If no job is available for processing, either because the queue is |
|
436
|
|
|
|
|
|
|
empty or because other worker processes are already working on |
|
437
|
|
|
|
|
|
|
them, C<undef> is returned; otherwise, a new instance of C<IPC::DirQueue::Job> |
|
438
|
|
|
|
|
|
|
is returned. |
|
439
|
|
|
|
|
|
|
|
|
440
|
|
|
|
|
|
|
Note that the job is marked as I<active> until C<$job-E<gt>finish()> |
|
441
|
|
|
|
|
|
|
is called. |
|
442
|
|
|
|
|
|
|
|
|
443
|
|
|
|
|
|
|
If the (optional) parameter C<path> is used, its value indicates the path of |
|
444
|
|
|
|
|
|
|
the desired job's data file. By using this, it is possible to cancel |
|
445
|
|
|
|
|
|
|
not-yet-active items from anywhere in the queue, or pick up jobs out of |
|
446
|
|
|
|
|
|
|
sequence. The data path must match the value of the I<pathqueue> member of |
|
447
|
|
|
|
|
|
|
the C object passed to the C callback. |
|
448
|
|
|
|
|
|
|
|
|
449
|
|
|
|
|
|
|
=cut |
|
450
|
|
|
|
|
|
|
|
|
451
|
|
|
|
|
|
|
# Find the first queued job that is not locked by another worker, lock it,
# and return it as an IPC::DirQueue::Job object.
#
# %args may contain 'path'; if so only the queue entry whose queue-file
# path equals it is considered.
#
# Locking: a lock file is written in 'tmp' and then linked into 'active'
# under the job's name with link_into_dir_no_retry(); if that link fails,
# another worker holds the job.  An existing 'active' file is honoured
# until it is older than active_file_lifetime (plus a random fudge) and its
# worker is no longer working; then it is removed as stale.
#
# Returns the job, or nothing (undef in scalar context) when no entry
# could be picked up.
sub pickup_queued_job {
  my ($self, %args) = @_;

  my $pathqueuedir = $self->q_subdir('queue');
  my $pathactivedir = $self->q_subdir('active');
  $self->ensure_dir_exists ($pathactivedir);

  my $iter = $self->queue_iter_start($pathqueuedir);

  QUEUEFILE: while (1) {
    my $nextfile = $self->queue_iter_next($iter);

    # no more files in the queue, return empty
    last QUEUEFILE if !defined $nextfile;

    my $nextfilebase = $self->queue_dir_fanout_path_strip($nextfile);

    # only entries whose name starts with a digit are considered
    next QUEUEFILE if ($nextfilebase !~ /^\d/);

    my $pathactive = $pathactivedir.SLASH.$nextfilebase;
    my $pathqueue = $pathqueuedir.SLASH.$nextfile;

    next QUEUEFILE if (exists($args{path}) && ($pathqueue ne $args{path}));

    # only the mtime of an existing lock file matters (field 9 of lstat);
    # undef means there is no lock file
    my $mtime = (lstat($pathactive))[9];

    if (defined $mtime) {
      # *DO* call time() here.  In extremely large dirs, it may take
      # several seconds to traverse the entire listing from start
      # to finish!
      if (time() - $mtime < $self->{active_file_lifetime}) {
        # active lockfile; it's being worked on.  skip this file
        next QUEUEFILE;
      }

      if ($self->worker_still_working($pathactive)) {
        # worker is still alive, although not updating the lock
        dbg ("worker still working, skip: $pathactive");
        next QUEUEFILE;
      }

      # now, we want to try to avoid 2 or 3 dequeuers removing
      # the lockfile simultaneously, as that could cause this race:
      #
      # dqproc1: [checks file]        [unlinks] [starts work]
      # dqproc2:        [checks file]                         [unlinks]
      #
      # ie. the second process unlinks the first process' brand-new
      # lockfile!
      #
      # to avoid this, use a random "fudge" on the timeout, so
      # that dqproc2 will wait for possibly much longer than
      # dqproc1 before it decides to unlink it.
      #
      # this isn't perfect.  TODO: is there a "rename this fd" syscall
      # accessible from perl?

      my $fudge = get_random_int() % 256;
      if (time() - $mtime < $self->{active_file_lifetime}+$fudge) {
        # within the fudge zone.  don't unlink it in this process.
        dbg ("within active fudge zone, skip: $pathactive");
        next QUEUEFILE;
      }

      # else, we can kill the stale lockfile
      unlink $pathactive or warn "IPC::DirQueue: unlink failed: $pathactive";
      warn "IPC::DirQueue: killed stale lockfile: $pathactive";
    }

    # ok, we're free to get cracking on this file.
    my $pathtmp = $self->q_subdir('tmp');
    $self->ensure_dir_exists ($pathtmp);

    # use the name of the queue file itself, plus a tmp prefix, plus active
    my $pathtmpactive = $pathtmp.SLASH.
                $nextfilebase.".".$self->new_lock_filename().".active";

    dbg ("creating tmp active $pathtmpactive");
    my $lockfh;
    if (!sysopen ($lockfh, $pathtmpactive, O_WRONLY|O_CREAT|O_EXCL,
        $self->{queue_file_mode}))
    {
      if ($!{EEXIST}) {
        # contention; skip this file
        dbg ("IPC::DirQueue: $pathtmpactive already created, skipping: $!");
      }
      else {
        # could be serious; disk space, permissions etc.
        warn "IPC::DirQueue: cannot open $pathtmpactive for write: $!";
      }
      next QUEUEFILE;
    }

    # the lock records who holds it: hostname, then pid.  Buffered write
    # errors surface at close, so report them there; processing carries
    # on as before.
    print {$lockfh} $self->gethostname(), "\n", $$, "\n";
    close $lockfh
        or warn "IPC::DirQueue: cannot write $pathtmpactive: $!";

    if (!-f $pathqueue) {
      # queue file already gone; another worker got it before we did.
      # catch this case before we create a lockfile.
      # see the "pathqueue_gone" comment below for an explanation
      dbg("IPC::DirQueue: $pathqueue no longer exists, skipping");
      unlink $pathtmpactive
          or warn "IPC::DirQueue: unlink failed: $pathtmpactive";
      next QUEUEFILE;
    }

    my $job = IPC::DirQueue::Job->new ($self, {
      jobid => $nextfilebase,
      pathqueue => $pathqueue,
      pathactive => $pathactive
    });

    my $pathnewactive = $self->link_into_dir_no_retry ($job,
                        $pathtmpactive, $pathactivedir, $nextfilebase);
    if (!defined($pathnewactive)) {
      # link failed; another worker got it before we did.
      # our tmp lock file is still there, so remove it.
      unlink $pathtmpactive
          or warn "IPC::DirQueue: unlink failed: $pathtmpactive";
      next QUEUEFILE;
    }

    if ($pathactive ne $pathnewactive) {
      die "oops! active paths differ: $pathactive $pathnewactive";
    }

    my $ctrlfh;
    if (!open ($ctrlfh, '<', $pathqueue))
    {
      # since we read the list of files upfront, this can happen:
      #
      # dqproc1: [gets lock] [work] [finish_job]
      # dqproc2:                                 [gets lock]
      #
      # "dqproc1" has already completed the job, unlinking both the active
      # *and* queue files, by the time "dqproc2" gets to it.  This is OK;
      # just skip the file, since it's already done.  [pathqueue_gone]

      dbg("IPC::DirQueue: cannot open $pathqueue for read: $!");
      unlink $pathnewactive;
      # pathtmpactive is not unlinked here: it is already gone by now
      next QUEUEFILE;
    }

    my $red = $self->read_control_file ($job, $ctrlfh);
    close $ctrlfh;

    # NOTE(review): the active lock is left in place when the control file
    # cannot be read -- confirm that is intended.
    next QUEUEFILE if (!$red);

    $self->queue_iter_stop($iter);
    return $job;
  }

  $self->queue_iter_stop($iter);
  return;   # empty
}
|
604
|
|
|
|
|
|
|
|
|
605
|
|
|
|
|
|
|
########################################################################### |
|
606
|
|
|
|
|
|
|
|
|
607
|
|
|
|
|
|
|
=item $job = $dq->wait_for_queued_job ([ $timeout [, $pollinterval] ]); |
|
608
|
|
|
|
|
|
|
|
|
609
|
|
|
|
|
|
|
Wait for a job to be queued within the next C<$timeout> seconds. |
|
610
|
|
|
|
|
|
|
|
|
611
|
|
|
|
|
|
|
If there is already a job ready for processing, this will return immediately. |
|
612
|
|
|
|
|
|
|
If one is not available, it will sleep, wake up periodically, check for job |
|
613
|
|
|
|
|
|
|
availability, and either carry on sleeping or return the new job if one |
|
614
|
|
|
|
|
|
|
is now available. |
|
615
|
|
|
|
|
|
|
|
|
616
|
|
|
|
|
|
|
If a job becomes available, a new instance of C<IPC::DirQueue::Job> is |
|
617
|
|
|
|
|
|
|
returned. If the timeout is reached, C<undef> is returned. |
|
618
|
|
|
|
|
|
|
|
|
619
|
|
|
|
|
|
|
If C<$timeout> is not specified, or is less than 1, this function will wait |
|
620
|
|
|
|
|
|
|
indefinitely. |
|
621
|
|
|
|
|
|
|
|
|
622
|
|
|
|
|
|
|
The optional parameter C<$pollinterval> indicates how frequently to wake |
|
623
|
|
|
|
|
|
|
up and check for new jobs. It is specified in seconds, and floating-point |
|
624
|
|
|
|
|
|
|
precision is supported. The default is C<1>. |
|
625
|
|
|
|
|
|
|
|
|
626
|
|
|
|
|
|
|
Note that if C<$timeout> is not a round multiple of C<$pollinterval>, |
|
627
|
|
|
|
|
|
|
the nearest round multiple of C<$pollinterval> greater than C<$timeout> |
|
628
|
|
|
|
|
|
|
will be used instead. Also note that C<$timeout> is used as an integer. |
|
629
|
|
|
|
|
|
|
|
|
630
|
|
|
|
|
|
|
=cut |
|
631
|
|
|
|
|
|
|
|
|
632
|
|
|
|
|
|
|
sub wait_for_queued_job {
  my ($self, $timeout, $pollintvl) = @_;

  # Deadline in epoch seconds; left undefined to wait forever.  The timeout
  # is truncated to whole seconds, and a value that truncates to less than 1
  # means "no timeout" (as documented) instead of a deadline that has
  # already expired.
  my $finishtime;
  if ($timeout && int ($timeout) >= 1) {
    $finishtime = time + int ($timeout);
  }

  dbg ("wait_for_queued_job starting");

  if ($pollintvl) {
    $pollintvl *= 1000000;      # from secs to usecs
  } else {
    $pollintvl = 1000000;       # default: 1 sec
  }

  my $pathqueuedir = $self->q_subdir('queue');
  $self->ensure_dir_exists ($pathqueuedir);

  # TODO: would be nice to use fam for this, where available.  But
  # no biggie...

  while (1) {
    # check the stat time on the queue dir *before* we call pickup,
    # to avoid a race condition where a job is added while we're
    # checking in that function.

    my @stat = stat ($pathqueuedir);
    my $qdirlaststat = $stat[9];        # mtime; undef if the stat failed

    my $job = $self->pickup_queued_job();
    if ($job) { return $job; }

    # there's another semi-race condition here, brought about by a lack of
    # sub-second precision from stat(2).  if the last enq occurred inside
    # *this* current 1-second window, then *another* one can happen inside this
    # second right afterwards, and we wouldn't notice.

    # in other words (ASCII-art alert):
    #   TIME | t                        | t+1
    #   E    | enq                enq   |
    #   D    |   stat pickup_queued_job |

    # the enqueuer process E enqueues a job just after the stat, inside the
    # 1-second period "t".  dequeuer process D dequeues it with
    # pickup_queued_job().  all is well.  But then, E enqueues another job
    # inside the same 1-second period "t", and since the stat() has already
    # happened for "t", and since we've already picked up the job in "t", we
    # don't recheck; result is, we miss this enqueue event.
    #
    # Avoid this by checking in a busy-loop until time(2) says we're out of
    # that "danger zone" 1-second period.  Any further enq's would then
    # cause stat(2) to report a different timestamp.

    # (skipped entirely when the stat failed, so we never compare against
    # an undefined mtime)
    while (defined $qdirlaststat && time == $qdirlaststat) {
      Time::HiRes::usleep ($pollintvl);
      dbg ("wait_for_queued_job: spinning until time != stat $qdirlaststat");
      $job = $self->pickup_queued_job();
      if ($job) { return $job; }
    }

    # sleep until the directory's mtime changes from what it was when
    # we ran pickup_queued_job() last.

    dbg ("wait_for_queued_job: sleeping on $pathqueuedir");
    while (1) {
      my $now = time;
      if ($finishtime && $now >= $finishtime) {
        dbg ("wait_for_queued_job timed out");
        return undef;           # out of time
      }

      Time::HiRes::usleep ($pollintvl);

      @stat = stat ($pathqueuedir);
      # dbg "wait_for_queued_job: stat $stat[9] $qdirlaststat $pathqueuedir";
      last if (defined $stat[9] &&
            ((defined $qdirlaststat && $stat[9] != $qdirlaststat)
            || !defined $qdirlaststat));
    }

    dbg ("wait_for_queued_job: activity, calling pickup");
  }
}
|
716
|
|
|
|
|
|
|
|
|
717
|
|
|
|
|
|
|
########################################################################### |
|
718
|
|
|
|
|
|
|
|
|
719
|
|
|
|
|
|
|
=item $dq->visit_all_jobs($visitor, $visitcontext); |
|
720
|
|
|
|
|
|
|
|
|
721
|
|
|
|
|
|
|
Visit all the jobs in the queue, in a read-only mode. Used to list |
|
722
|
|
|
|
|
|
|
the entire queue. |
|
723
|
|
|
|
|
|
|
|
|
724
|
|
|
|
|
|
|
The callback function C<$visitor> will be called for each job in |
|
725
|
|
|
|
|
|
|
the queue, like so: |
|
726
|
|
|
|
|
|
|
|
|
727
|
|
|
|
|
|
|
&$visitor ($visitcontext, $job); |
|
728
|
|
|
|
|
|
|
|
|
729
|
|
|
|
|
|
|
C<$visitcontext> is whatever you pass in that variable above. |
|
730
|
|
|
|
|
|
|
C<$job> is a new, read-only instance of C<IPC::DirQueue::Job> representing |
|
731
|
|
|
|
|
|
|
that job. |
|
732
|
|
|
|
|
|
|
|
|
733
|
|
|
|
|
|
|
If a job is active (being processed), the C<$job> object also contains the |
|
734
|
|
|
|
|
|
|
following additional data: |
|
735
|
|
|
|
|
|
|
|
|
736
|
|
|
|
|
|
|
'active_host': the hostname on which the job is active |
|
737
|
|
|
|
|
|
|
'active_pid': the process ID of the process which picked up the job |
|
738
|
|
|
|
|
|
|
|
|
739
|
|
|
|
|
|
|
=cut |
|
740
|
|
|
|
|
|
|
|
|
741
|
|
|
|
|
|
|
sub visit_all_jobs {
  my ($self, $visitor, $visitcontext) = @_;

  my $pathqueuedir = $self->q_subdir('queue');
  my $pathactivedir = $self->q_subdir('active');

  my $iter = $self->queue_iter_start($pathqueuedir);

  # walk the queue until the iterator runs dry
  while (defined (my $nextfile = $self->queue_iter_next($iter))) {
    my $nextfilebase = $self->queue_dir_fanout_path_strip($nextfile);

    # only entries whose base name starts with a digit are queue files
    next if ($nextfilebase !~ /^\d/);
    my $pathqueue = $pathqueuedir.SLASH.$nextfile;
    my $pathactive = $pathactivedir.SLASH.$nextfilebase;

    next if (!-f $pathqueue);

    # if the job is active, its active file holds the worker's hostname
    # on the first line and its pid on the second.  Either may be missing
    # if the file is empty or truncated, so only chomp what we actually read.
    my ($acthost, $actpid);
    if (open (my $actfh, '<', $pathactive)) {
      $acthost = <$actfh>;
      $actpid = <$actfh>;
      close $actfh;
      chomp $acthost if defined $acthost;
      chomp $actpid if defined $actpid;
    }

    my $job = IPC::DirQueue::Job->new ($self, {
      is_readonly => 1,     # means finish() will not rm files
      jobid => $nextfilebase,
      active_host => $acthost,
      active_pid => $actpid,
      pathqueue => $pathqueue,
      pathactive => $pathactive
    });

    my $queuefh;
    if (!open ($queuefh, '<', $pathqueue)) {
      dbg ("queue file disappeared, job finished? skip: $pathqueue");
      next;
    }

    my $red = $self->read_control_file ($job, $queuefh);
    close $queuefh;

    if (!$red) {
      warn "IPC::DirQueue: cannot read control file: $pathqueue";
      next;
    }

    $visitor->($visitcontext, $job);
  }

  $self->queue_iter_stop($iter);
  return;
}
|
802
|
|
|
|
|
|
|
|
|
803
|
|
|
|
|
|
|
########################################################################### |
|
804
|
|
|
|
|
|
|
|
|
805
|
|
|
|
|
|
|
# private API: performs logic of IPC::DirQueue::Job::finish(). |
|
806
|
|
|
|
|
|
|
sub finish_job {
  my ($self, $job, $isdone) = @_;

  dbg ("finish_job: ", $job->{pathactive});

  # read-only jobs never touch the files on disk
  return if $job->{is_readonly};

  if ($isdone) {
    # the job is complete: remove its control file, then its data file
    for my $field (qw(pathqueue QDFN)) {
      unlink($job->{$field})
          or warn "IPC::DirQueue: unlink failed: $job->{$field}";
    }

    my $indexclient = $self->{indexclient};
    if ($indexclient) {
      my $pathqueuedir = $self->q_subdir('queue');
      $indexclient->dequeue($pathqueuedir, $job->{pathqueue});
    }

    # touch the dir so that other dequeuers re-check; activity can
    # introduce a small race, I think. (don't think this is necessary)
    # $self->touch($pathqueuedir) or warn "touch failed on $pathqueuedir";
  }

  # done or not, the active file is always removed
  unlink($job->{pathactive})
      or warn "IPC::DirQueue: unlink failed: $job->{pathactive}";
}
|
834
|
|
|
|
|
|
|
|
|
835
|
|
|
|
|
|
|
########################################################################### |
|
836
|
|
|
|
|
|
|
|
|
837
|
|
|
|
|
|
|
# Return a reference to the string-sorted list of entries in $dir whose
# names start with a digit.  A directory that cannot be opened yields a
# reference to an empty list.
sub get_dir_filelist_sorted {
  my ($self, $dir) = @_;

  # lexical dirhandle, so nested or re-entrant calls cannot clobber a
  # shared global handle
  opendir (my $dh, $dir)
      or return [];           # no dir? nothing queued

  # have to read the lot, to sort them.
  my @files = sort grep { /^\d/ } readdir($dh);
  closedir $dh;
  return \@files;
}
|
848
|
|
|
|
|
|
|
|
|
849
|
|
|
|
|
|
|
########################################################################### |
|
850
|
|
|
|
|
|
|
|
|
851
|
|
|
|
|
|
|
# Copy a job's data to the output handle $fhout and close it.
#
# The data comes from $callbackin if one is given: it is called repeatedly,
# each defined return value is written out, and undef signals EOF.
# Otherwise the data is read from the filehandle $fhin in chunks of
# $self->{buf_size} bytes, and $fhin is closed afterwards.
#
# $outfname is used only in warning messages.  Returns the number of bytes
# copied, or nothing (after a warning) if reading, writing or closing fails.
sub copy_in_to_out_fh {
  my ($self, $fhin, $callbackin, $fhout, $outfname) = @_;

  my $siz = 0;

  binmode $fhout;
  if ($callbackin) {
    while (defined (my $stringin = $callbackin->())) {
      my $len = length ($stringin);
      next if ($len == 0);      # empty string, nothing to write

      if (!print {$fhout} $stringin) {
        warn "IPC::DirQueue: enqueue: cannot write to $outfname: $!";
        close $fhout;
        return;
      }
      $siz += $len;
    }
  }
  else {
    binmode $fhin;
    my $buf;
    while (1) {
      my $len = read ($fhin, $buf, $self->{buf_size});

      # read() returns undef on error; that must not be mistaken for EOF,
      # or a truncated copy would be reported as a success
      if (!defined $len) {
        warn "IPC::DirQueue: cannot read input for $outfname: $!";
        close $fhin; close $fhout;
        return;
      }
      last if ($len == 0);      # EOF

      if (!print {$fhout} $buf) {
        warn "IPC::DirQueue: cannot write to $outfname: $!";
        close $fhin; close $fhout;
        return;
      }
      $siz += $len;
    }
    close $fhin;
  }

  # buffered write errors only show up when the handle is closed
  if (!close $fhout) {
    warn "IPC::DirQueue: cannot close $outfname";
    return;
  }
  return $siz;
}
|
897
|
|
|
|
|
|
|
|
|
898
|
|
|
|
|
|
|
# Hard-link the temporary file $pathtmp into directory $pathlinkdir under
# the name $qfname, then unlink the temporary file.  If the link fails, up
# to 10 attempts are made in total, each retry using a fresh name from
# new_q_filename() with the anti-collision suffix enabled.
#
# Returns the path that was linked, or nothing (after a warning) once the
# retry limit is reached.
sub link_into_dir {
  my ($self, $job, $pathtmp, $pathlinkdir, $qfname) = @_;
  $self->ensure_dir_exists ($pathlinkdir);
  my $path;

  # retry 10 times; add a random few digits on link(2) failure
  my $maxretries = 10;
  for my $retry (1 .. $maxretries) {
    $path = $pathlinkdir.SLASH.$qfname;

    dbg ("link_into_dir retry=", $retry, " tmp=", $pathtmp, " path=", $path);

    last if link ($pathtmp, $path);     # got it

    # link() may return failure, even if it succeeded.
    # use lstat() to verify that link() really failed.  the link count is
    # undefined if the lstat itself fails, which counts as "not linked".
    my $nlink = (lstat($pathtmp))[3];
    last if (defined $nlink && $nlink == 2);    # got it

    # failed.  check for retry limit first
    if ($retry == $maxretries) {
      warn "IPC::DirQueue: cannot link $pathtmp to $path";
      return;
    }

    # try a new q_filename, use randomness to avoid
    # further collisions
    $qfname = $self->new_q_filename($job, 1);

    dbg ("link_into_dir retrying: $retry");
    Time::HiRes::usleep (250 * $retry);
  }

  # got it! unlink(2) the tmp file, since we don't need it.
  dbg ("link_into_dir unlink tmp file: $pathtmp");
  if (!unlink ($pathtmp)) {
    warn "IPC::DirQueue: cannot unlink $pathtmp";
    # non-fatal, we can still continue anyway
  }

  dbg ("link_into_dir return: $path");
  return $path;
}
|
945
|
|
|
|
|
|
|
|
|
946
|
|
|
|
|
|
|
# Single-attempt variant of link_into_dir(): hard-link $pathtmp into
# $pathlinkdir as $qfname, check via lstat() that the target really is the
# same file, then unlink the temporary file.  Returns the target path, or
# nothing if the tmp file is gone, the target already exists, or the
# target does not match the tmp file.
sub link_into_dir_no_retry {
  my ($self, $job, $pathtmp, $pathlinkdir, $qfname) = @_;
  $self->ensure_dir_exists ($pathlinkdir);

  dbg ("lidnr: ", $pathtmp, " ", $pathlinkdir, "/", $qfname);

  my ($tmp_dev, $tmp_ino, $tmp_mode, $tmp_nlink, $tmp_uid) = lstat($pathtmp);
  unless (defined $tmp_nlink) {
    warn ("lidnr: tmp file disappeared?! $pathtmp");
    return;         # not going to have much luck here
  }

  my $path = $pathlinkdir.SLASH.$qfname;

  if (-f $path) {
    dbg ("lidnr: target file already exists: $path");
    return;         # we've been beaten to it
  }

  # remember the outcome, but do not trust it yet (see below)
  my $link_ok = link ($pathtmp, $path);
  unless ($link_ok) {
    dbg("link failure, recovering: $!");
  }

  # link() may return failure, even if it succeeded.  use lstat() to verify that
  # link() really failed.  use lstat() even if it reported success, just to be
  # sure. ;)

  my ($tgt_dev, $tgt_ino, $tgt_mode, $tgt_nlink, $tgt_uid) = lstat($path);
  unless (defined $tgt_nlink) {
    dbg ("lidnr: link failed, target file nonexistent: $path");
    return;
  }

  # now, be paranoid and verify that the inode data is identical
  my $same_file = ($tmp_dev == $tgt_dev
                && $tmp_ino == $tgt_ino
                && $tmp_uid == $tgt_uid);
  unless ($same_file) {
    # the tmpfile and the target don't match each other.
    # if the link failed, this means that another qproc got
    # the file before we did, which is not an error.
    if ($link_ok) {
      # link supposedly succeeded, so this *is* an error.  warn
      warn ("lidnr: tmp file doesn't match target: $path ($tgt_dev,$tgt_ino,$tgt_mode,$tgt_nlink,$tgt_uid) vs $pathtmp ($tmp_dev,$tmp_ino,$tmp_mode,$tmp_nlink,$tmp_uid)");
    }
    return;
  }

  # got it! unlink(2) the tmp file, since we don't need it.
  dbg ("lidnr: unlink tmp file: $pathtmp");
  unless (unlink ($pathtmp)) {
    warn "IPC::DirQueue: cannot unlink $pathtmp";
    # non-fatal, we can still continue anyway
  }

  dbg ("lidnr: return: $path");
  return $path;
}
|
1003
|
|
|
|
|
|
|
|
|
1004
|
|
|
|
|
|
|
# Write the control file for $job to $pathtmpctrl.  The file must not
# exist yet (it is created with O_EXCL, using mode $self->{queue_file_mode}).
#
# It holds one "KEY: value" line per field: the built-in QDFN, QDSB, QSTT,
# QSTM and QSHN fields first, then every entry of $job->{metadata}.
#
# Returns 1 on success, or nothing (after a warning) if the file cannot be
# opened or closed.  Dies on a metadata key that looks like a built-in
# "Q..." field or contains ':', NUL or newline, and on a metadata value
# that contains NUL or newline.
sub create_control_file {
  my ($self, $job, $pathtmpdata, $pathtmpctrl) = @_;

  dbg ("create_control_file $pathtmpctrl for $pathtmpdata ($job->{pathdata})");

  my $out;
  if (!sysopen ($out, $pathtmpctrl, O_WRONLY|O_CREAT|O_EXCL,
      $self->{queue_file_mode}))
  {
    warn "IPC::DirQueue: cannot open $pathtmpctrl for write: $!";
    return;
  }

  print {$out} "QDFN: ", $job->{pathdata}, "\n";
  print {$out} "QDSB: ", $job->{size_bytes}, "\n";
  print {$out} "QSTT: ", $job->{time_submitted_secs}, "\n";
  print {$out} "QSTM: ", $job->{time_submitted_msecs}, "\n";
  print {$out} "QSHN: ", $self->gethostname(), "\n";

  my $md = $job->{metadata};
  foreach my $k (keys %{$md}) {
    my $v = $md->{$k};
    if (($k =~ /^Q...$/)
        || ($k =~ /[:\0\n]/s)
        || ($v =~ /[\0\n]/s))
    {
      close $out;
      # we created this file ourselves (O_EXCL), so do not leave the
      # half-written control file behind
      unlink $pathtmpctrl;
      die "IPC::DirQueue: invalid metadatum: '$k'";
    }
    print {$out} $k, ": ", $v, "\n";
  }

  # buffered write errors surface here
  if (!close ($out)) {
    warn "IPC::DirQueue: cannot close $pathtmpctrl for write: $!";
    return;
  }

  return 1;
}
|
1041
|
|
|
|
|
|
|
|
|
1042
|
|
|
|
|
|
|
# Parse a control file from the open handle $infh into $job.
#
# Each line is "KEY: value".  Keys of the form Q plus three capital letters
# (the built-in fields written by create_control_file: QDFN, QDSB, QSTT,
# QSTM, QSHN) are stored directly in $job; every other key goes into
# $job->{metadata}.  Lines without a ": " separator are ignored.
#
# Returns $job, or nothing if no QDFN field was read or the file it names
# does not exist.  The caller closes $infh.
sub read_control_file {
  my ($self, $job, $infh) = @_;

  while (defined (my $line = <$infh>)) {
    # strip the line terminator only; a final line without a newline
    # must not lose its last character
    chomp $line;

    my ($k, $value) = split (/: /, $line, 2);
    next unless defined $value;         # not "KEY: value", skip it

    if ($k =~ /^Q[A-Z]{3}$/) {
      $job->{$k} = $value;
    }
    else {
      $job->{metadata}->{$k} = $value;
    }
  }

  # all jobs must have a datafile (even if it's empty)
  if (!$job->{QDFN} || !-f $job->{QDFN}) {
    return;
  }

  return $job;
}
|
1069
|
|
|
|
|
|
|
|
|
1070
|
|
|
|
|
|
|
# Decide whether the worker recorded in the active file $fname may still
# be running.  The file holds the worker's hostname on its first line and
# its pid on the second.
#
# Returns nothing if $fname is empty, cannot be opened, or names a pid on
# this host that kill(0) reports as not running.  Returns 1 otherwise
# (the pid is still running, or belongs to another host).
sub worker_still_working {
  my ($self, $fname) = @_;

  return if !$fname;

  open (my $fh, '<', $fname)
      or return;
  my $hname = <$fh>;
  my $wpid = <$fh>;
  close $fh;

  # an empty or truncated file leaves these undefined
  chomp $hname if defined $hname;
  chomp $wpid if defined $wpid;

  # only a local pid can be probed, and only a plain process id is
  # passed to kill; anything else falls through to "still working"
  if (defined $hname && $hname eq $self->gethostname()
      && defined $wpid && $wpid =~ /^\d+$/ && $wpid > 0)
  {
    if (!kill (0, $wpid)) {
      return;           # pid is local and no longer running
    }
  }

  # pid is still running, or remote
  return 1;
}
|
1090
|
|
|
|
|
|
|
|
|
1091
|
|
|
|
|
|
|
########################################################################### |
|
1092
|
|
|
|
|
|
|
|
|
1093
|
|
|
|
|
|
|
# Accessor: the directory stored in this object's 'dir' field.
sub q_dir {
  my $self = shift;
  my $dir = $self->{dir};
  return $dir;
}
|
1097
|
|
|
|
|
|
|
|
|
1098
|
|
|
|
|
|
|
# Path of the subdirectory $subdir directly below the queue directory.
sub q_subdir {
  my ($self, $subdir) = @_;
  my $base = $self->q_dir();
  return $base.SLASH.$subdir;
}
|
1102
|
|
|
|
|
|
|
|
|
1103
|
|
|
|
|
|
|
# Build the queue file name for $job.  With a true $addextra, the pid and
# a random integer are appended as well.
sub new_q_filename {
  my ($self, $job, $addextra) = @_;

  my ($sec, $min, $hour, $mday, $mon, $year) =
        gmtime ($job->{time_submitted_secs});

  # NN.20040718140300MMMM.hash(hostname.$$)[.rand]
  #
  # NN = priority, default 50
  # MMMM = microseconds from Time::HiRes::gettimeofday()
  # hostname = current hostname

  my $hosthash = hash_string_to_filename ($self->gethostname().$$);

  my $name = sprintf ("%02d.%04d%02d%02d%02d%02d%02d%06d.%s",
        $job->{pri},
        $year+1900, $mon+1, $mday, $hour, $min, $sec,
        $job->{time_submitted_msecs},
        $hosthash);

  # normally, this isn't used.  but if there's a collision,
  # all retries after that will do this; in this case, the
  # extra anti-collision stuff is useful
  $name .= ".".$$.".".$self->get_random_int() if $addextra;

  return $name;
}
|
1129
|
|
|
|
|
|
|
|
|
1130
|
|
|
|
|
|
|
# Reduce $str to a short token made only of the characters A-Z, a-z, 0-9,
# '+' and '_'.
sub hash_string_to_filename {
  my ($str) = @_;

  # get a 16-bit checksum of the input, then uuencode that string
  my $checksum = unpack ("%16C*", $str);
  my $token = pack ("u*", $checksum);

  # transcode from uuencode-space into safe, base64-ish space
  $token =~ tr/ -_/A-Za-z0-9+_/;

  # and remove the stuff that wasn't in that "safe" range
  $token =~ tr/A-Za-z0-9+_//cd;

  return $token;
}
|
1140
|
|
|
|
|
|
|
|
|
1141
|
|
|
|
|
|
|
# Build a lock file name of the form "<epoch secs>.<hostname>.<pid>".
sub new_lock_filename {
  my $self = shift;
  my $now = time;
  my $host = $self->gethostname();
  return sprintf ("%d.%s.%d", $now, $host, $$);
}
|
1145
|
|
|
|
|
|
|
|
|
1146
|
|
|
|
|
|
|
# Return a random integer in the range 0 .. 65535.
sub get_random_int {
  my ($self) = @_;

  # we try to use /dev/random first, as that's globally random for all PIDs on
  # the system.  this avoids brokenness if the caller has called srand(), then
  # forked multiple enqueueing procs, as they will all share the same seed and
  # will all return the same "random" output.
  #
  # NOTE(review): the open/read condition was truncated in the reviewed
  # copy of this file; it is reconstructed here from the comment above.
  # Confirm the device path against the original source.
  if (open (my $fh, '<', '/dev/random')) {
    my $buf;
    # sysread() so that exactly two bytes are requested from the device
    my $got = sysread ($fh, $buf, 2);
    close $fh;
    if (defined $got && $got == 2) {
      my ($hi, $lo) = unpack ("C2", $buf);
      return ($hi << 8) | $lo;
    }
  }

  # fall back to plain old rand(), use perl's implicit srand() call,
  # and hope caller hasn't called srand() yet in a parent process.
  return int rand (65536);
}
|
1163
|
|
|
|
|
|
|
|
|
1164
|
|
|
|
|
|
|
# Return this machine's hostname, caching it in $self->{myhostname}.
# Falls back to the literal 'nohost' if Sys::Hostname cannot be loaded or
# cannot determine a name.
sub gethostname {
  my ($self) = @_;

  my $hname = $self->{myhostname};
  return $hname if $hname;

  # try using Sys::Hostname.  may fail on non-UNIX platforms, so trap any
  # exception; $@ is localised so the caller's error state is untouched.
  {
    local $@;
    eval {
      require Sys::Hostname;
      $self->{myhostname} = Sys::Hostname::hostname();  # cache the result
      1;
    };
  }

  # could have failed.  supply a default in that case
  $self->{myhostname} ||= 'nohost';

  return $self->{myhostname};
}
|
1181
|
|
|
|
|
|
|
|
|
1182
|
|
|
|
|
|
|
sub ensure_dir_exists { |
|
1183
|
5155
|
|
|
5155
|
0
|
9658
|
my ($self, $dir) = @_; |
|
1184
|
5155
|
100
|
|
|
|
24863
|
return if exists ($self->{ensured_dir_exists}->{$dir}); |
|
1185
|
105
|
|
|
|
|
381
|
$self->{ensured_dir_exists}->{$dir} = 1; |
|
1186
|
105
|
100
|
|
|
|
59784
|
(-d $dir) or mkdir($dir); |
|
1187
|
|
|
|
|
|
|
} |
|
1188
|
|
|
|
|
|
|
|
|
1189
|
|
|
|
|
|
|
sub queuedir_is_bad { |
|
1190
|
2
|
|
|
2
|
0
|
5
|
my ($self, $pathqueuedir) = @_; |
|
1191
|
|
|
|
|
|
|
|
|
1192
|
|
|
|
|
|
|
# try creating the dir; it may not exist yet |
|
1193
|
2
|
|
|
|
|
6
|
$self->ensure_dir_exists ($pathqueuedir); |
|
1194
|
2
|
50
|
|
|
|
51
|
if (!opendir (RETRY, $pathqueuedir)) { |
|
1195
|
|
|
|
|
|
|
# still can't open it! problem |
|
1196
|
0
|
|
|
|
|
0
|
warn "IPC::DirQueue: cannot open queue dir \"$pathqueuedir\": $!\n"; |
|
1197
|
0
|
|
|
|
|
0
|
return 1; |
|
1198
|
|
|
|
|
|
|
} |
|
1199
|
|
|
|
|
|
|
# otherwise, we could open it -- it just needed to be created. |
|
1200
|
2
|
|
|
|
|
19
|
closedir RETRY; |
|
1201
|
2
|
|
|
|
|
19
|
return 0; |
|
1202
|
|
|
|
|
|
|
} |
|
1203
|
|
|
|
|
|
|
|
|
1204
|
|
|
|
|
|
|
sub dbg { |
|
1205
|
12135
|
50
|
|
12135
|
0
|
40130
|
return unless $DEBUG; |
|
1206
|
0
|
|
|
|
|
0
|
warn "dq debug: ".join(' ',@_)."\n"; |
|
1207
|
|
|
|
|
|
|
} |
|
1208
|
|
|
|
|
|
|
|
|
1209
|
|
|
|
|
|
|
########################################################################### |
|
1210
|
|
|
|
|
|
|
|
|
1211
|
|
|
|
|
|
|
sub queue_iter_start { |
|
1212
|
407
|
|
|
407
|
0
|
794
|
my ($self, $pathqueuedir) = @_; |
|
1213
|
|
|
|
|
|
|
|
|
1214
|
407
|
50
|
|
|
|
1670
|
if ($self->{indexclient}) { |
|
|
|
100
|
|
|
|
|
|
|
|
|
100
|
|
|
|
|
|
|
1215
|
0
|
|
|
|
|
0
|
dbg ("queue iter: getting list for $pathqueuedir"); |
|
1216
|
0
|
|
|
|
|
0
|
my @files = sort grep { /^\d/ } $self->{indexclient}->ls($pathqueuedir); |
|
|
0
|
|
|
|
|
0
|
|
|
1217
|
|
|
|
|
|
|
|
|
1218
|
0
|
0
|
|
|
|
0
|
if (scalar @files <= 0) { |
|
1219
|
0
|
0
|
|
|
|
0
|
return if $self->queuedir_is_bad($pathqueuedir); |
|
1220
|
|
|
|
|
|
|
} |
|
1221
|
|
|
|
|
|
|
|
|
1222
|
0
|
|
|
|
|
0
|
return { files => \@files }; |
|
1223
|
|
|
|
|
|
|
} |
|
1224
|
|
|
|
|
|
|
elsif ($self->{ordered}) { |
|
1225
|
343
|
|
|
|
|
1383
|
dbg ("queue iter: opening $pathqueuedir (ordered)"); |
|
1226
|
343
|
|
|
|
|
38399
|
my $files = $self->get_dir_filelist_sorted($pathqueuedir); |
|
1227
|
343
|
100
|
|
|
|
1016
|
if (scalar @$files <= 0) { |
|
1228
|
2
|
50
|
|
|
|
10
|
return if $self->queuedir_is_bad($pathqueuedir); |
|
1229
|
|
|
|
|
|
|
} |
|
1230
|
|
|
|
|
|
|
|
|
1231
|
343
|
|
|
|
|
1481
|
return { files => $files }; |
|
1232
|
|
|
|
|
|
|
} |
|
1233
|
|
|
|
|
|
|
elsif ($self->{queue_fanout}) { |
|
1234
|
32
|
|
|
|
|
85
|
return $self->queue_iter_fanout_start($pathqueuedir); |
|
1235
|
|
|
|
|
|
|
} |
|
1236
|
|
|
|
|
|
|
else { |
|
1237
|
32
|
|
|
|
|
35
|
my $dirfh; |
|
1238
|
32
|
|
|
|
|
78
|
dbg ("queue iter: opening $pathqueuedir"); |
|
1239
|
32
|
50
|
|
|
|
797
|
if (!opendir ($dirfh, $pathqueuedir)) { |
|
1240
|
0
|
0
|
|
|
|
0
|
return if $self->queuedir_is_bad($pathqueuedir); |
|
1241
|
0
|
0
|
|
|
|
0
|
if (!opendir ($dirfh, $pathqueuedir)) { |
|
1242
|
0
|
|
|
|
|
0
|
warn "oops? pathqueuedir bad"; |
|
1243
|
0
|
|
|
|
|
0
|
return; |
|
1244
|
|
|
|
|
|
|
} |
|
1245
|
|
|
|
|
|
|
} |
|
1246
|
|
|
|
|
|
|
|
|
1247
|
32
|
|
|
|
|
108
|
return { fh => $dirfh }; |
|
1248
|
|
|
|
|
|
|
} |
|
1249
|
|
|
|
|
|
|
|
|
1250
|
0
|
|
|
|
|
0
|
die "cannot get here"; |
|
1251
|
|
|
|
|
|
|
} |
|
1252
|
|
|
|
|
|
|
|
|
1253
|
|
|
|
|
|
|
sub queue_iter_next { |
|
1254
|
1579
|
|
|
1579
|
0
|
2371
|
my ($self, $iter) = @_; |
|
1255
|
|
|
|
|
|
|
|
|
1256
|
1579
|
50
|
|
|
|
5319
|
if ($self->{indexclient}) { |
|
|
|
100
|
|
|
|
|
|
|
|
|
100
|
|
|
|
|
|
|
1257
|
0
|
|
|
|
|
0
|
return shift @{$iter->{files}}; |
|
|
0
|
|
|
|
|
0
|
|
|
1258
|
|
|
|
|
|
|
} |
|
1259
|
|
|
|
|
|
|
elsif ($self->{ordered}) { |
|
1260
|
546
|
|
|
|
|
692
|
return shift @{$iter->{files}}; |
|
|
546
|
|
|
|
|
1714
|
|
|
1261
|
|
|
|
|
|
|
} |
|
1262
|
|
|
|
|
|
|
elsif ($self->{queue_fanout}) { |
|
1263
|
687
|
|
|
|
|
1589
|
return $self->queue_iter_fanout_next($iter); |
|
1264
|
|
|
|
|
|
|
} |
|
1265
|
|
|
|
|
|
|
else { |
|
1266
|
346
|
|
|
|
|
1988
|
return readdir($iter->{fh}); |
|
1267
|
|
|
|
|
|
|
} |
|
1268
|
|
|
|
|
|
|
|
|
1269
|
0
|
|
|
|
|
0
|
return; |
|
1270
|
|
|
|
|
|
|
} |
|
1271
|
|
|
|
|
|
|
|
|
1272
|
|
|
|
|
|
|
sub queue_iter_stop { |
|
1273
|
407
|
|
|
407
|
0
|
638
|
my ($self, $iter) = @_; |
|
1274
|
|
|
|
|
|
|
|
|
1275
|
407
|
50
|
|
|
|
822
|
return unless $iter; |
|
1276
|
407
|
100
|
|
|
|
1260
|
if (defined $iter->{fanfh}) { closedir($iter->{fanfh}); } |
|
|
30
|
|
|
|
|
403
|
|
|
1277
|
407
|
100
|
|
|
|
1296
|
if (defined $iter->{fh}) { closedir($iter->{fh}); } |
|
|
32
|
|
|
|
|
347
|
|
|
1278
|
|
|
|
|
|
|
} |
|
1279
|
|
|
|
|
|
|
|
|
1280
|
|
|
|
|
|
|
########################################################################### |
|
1281
|
|
|
|
|
|
|
|
|
1282
|
|
|
|
|
|
|
sub queue_dir_fanout_create { |
|
1283
|
1120
|
|
|
1120
|
0
|
2178
|
my ($self, $pathqueuedir) = @_; |
|
1284
|
|
|
|
|
|
|
|
|
1285
|
1120
|
100
|
|
|
|
4449
|
if (!$self->{queue_fanout}) { |
|
1286
|
990
|
|
|
|
|
2580
|
return; |
|
1287
|
|
|
|
|
|
|
} |
|
1288
|
|
|
|
|
|
|
|
|
1289
|
130
|
|
|
|
|
1922
|
my @letters = split '', q{0123456789abcdef}; |
|
1290
|
130
|
|
|
|
|
582
|
my $fanout = $letters[get_random_int() % (scalar @letters)]; |
|
1291
|
|
|
|
|
|
|
|
|
1292
|
130
|
|
|
|
|
571
|
$self->ensure_dir_exists ($pathqueuedir); |
|
1293
|
130
|
|
|
|
|
581
|
$self->ensure_dir_exists ($pathqueuedir.SLASH.$fanout); |
|
1294
|
130
|
|
|
|
|
701
|
return $fanout; |
|
1295
|
|
|
|
|
|
|
} |
|
1296
|
|
|
|
|
|
|
|
|
1297
|
|
|
|
|
|
|
sub queue_dir_fanout_commit { |
|
1298
|
1120
|
|
|
1120
|
0
|
2234
|
my ($self, $pathqueuedir, $fanout) = @_; |
|
1299
|
|
|
|
|
|
|
|
|
1300
|
1120
|
100
|
|
|
|
4010
|
if (!$self->{queue_fanout}) { |
|
1301
|
990
|
|
|
|
|
3452
|
return; |
|
1302
|
|
|
|
|
|
|
} |
|
1303
|
|
|
|
|
|
|
|
|
1304
|
|
|
|
|
|
|
# now touch all levels ($pathqueuedir will be touched later) |
|
1305
|
130
|
50
|
|
|
|
810
|
$self->touch($pathqueuedir.SLASH.$fanout) |
|
1306
|
|
|
|
|
|
|
or die "cannot touch fanout for $pathqueuedir/$fanout"; |
|
1307
|
|
|
|
|
|
|
} |
|
1308
|
|
|
|
|
|
|
|
|
1309
|
|
|
|
|
|
|
sub queue_dir_fanout_path { |
|
1310
|
1120
|
|
|
1120
|
0
|
2266
|
my ($self, $pathqueuedir, $fanout) = @_; |
|
1311
|
|
|
|
|
|
|
|
|
1312
|
1120
|
100
|
|
|
|
4403
|
if (!$self->{queue_fanout}) { |
|
1313
|
990
|
|
|
|
|
3672
|
return $pathqueuedir; |
|
1314
|
|
|
|
|
|
|
} |
|
1315
|
|
|
|
|
|
|
else { |
|
1316
|
130
|
|
|
|
|
692
|
return $pathqueuedir.SLASH.$fanout; |
|
1317
|
|
|
|
|
|
|
} |
|
1318
|
|
|
|
|
|
|
} |
|
1319
|
|
|
|
|
|
|
|
|
1320
|
|
|
|
|
|
|
sub queue_dir_fanout_path_strip { |
|
1321
|
1572
|
|
|
1572
|
0
|
2174
|
my ($self, $fname) = @_; |
|
1322
|
|
|
|
|
|
|
|
|
1323
|
1572
|
100
|
|
|
|
11466
|
if ($self->{queue_fanout}) { |
|
1324
|
685
|
|
|
|
|
19182
|
$fname =~ s/^.*\///; |
|
1325
|
|
|
|
|
|
|
} |
|
1326
|
1572
|
|
|
|
|
3501
|
return $fname; |
|
1327
|
|
|
|
|
|
|
} |
|
1328
|
|
|
|
|
|
|
|
|
1329
|
|
|
|
|
|
|
sub queue_iter_fanout_start { |
|
1330
|
32
|
|
|
32
|
0
|
47
|
my ($self, $pathqueuedir) = @_; |
|
1331
|
32
|
|
|
|
|
57
|
my $iter = { }; |
|
1332
|
|
|
|
|
|
|
|
|
1333
|
|
|
|
|
|
|
{ |
|
1334
|
32
|
|
|
|
|
40
|
my @fanouts; |
|
|
32
|
|
|
|
|
40
|
|
|
1335
|
32
|
|
|
|
|
98
|
dbg ("queue iter: opening $pathqueuedir"); |
|
1336
|
32
|
50
|
|
|
|
703
|
if (!opendir (DIR, $pathqueuedir)) { |
|
1337
|
0
|
|
|
|
|
0
|
@fanouts = (); # no dir? nothing queued |
|
1338
|
|
|
|
|
|
|
} |
|
1339
|
|
|
|
|
|
|
else { |
|
1340
|
416
|
|
|
|
|
6664
|
my %map = map { |
|
1341
|
480
|
|
|
|
|
1055
|
$_ => (-M $pathqueuedir.SLASH.$_) |
|
1342
|
32
|
|
|
|
|
471
|
} grep { /^[a-z0-9]$/ } readdir(DIR); |
|
1343
|
32
|
|
|
|
|
283
|
@fanouts = sort { $map{$a} <=> $map{$b} } keys %map; |
|
|
896
|
|
|
|
|
1193
|
|
|
1344
|
32
|
|
|
|
|
187
|
dbg ("fanout: $pathqueuedir, order is ".join ' ', @fanouts); |
|
1345
|
|
|
|
|
|
|
} |
|
1346
|
32
|
|
|
|
|
350
|
closedir DIR; |
|
1347
|
32
|
|
|
|
|
101
|
$iter->{fanoutlist} = \@fanouts; |
|
1348
|
32
|
|
|
|
|
80
|
$iter->{pathqueuedir} = $pathqueuedir; |
|
1349
|
|
|
|
|
|
|
|
|
1350
|
|
|
|
|
|
|
} |
|
1351
|
32
|
|
|
|
|
95
|
return $iter; |
|
1352
|
|
|
|
|
|
|
} |
|
1353
|
|
|
|
|
|
|
|
|
1354
|
|
|
|
|
|
|
sub queue_iter_fanout_next { |
|
1355
|
687
|
|
|
687
|
0
|
1841
|
my ($self, $iter) = @_; |
|
1356
|
|
|
|
|
|
|
|
|
1357
|
|
|
|
|
|
|
# dir handles are: |
|
1358
|
|
|
|
|
|
|
# /path/to/queue = $iter->{fh} |
|
1359
|
|
|
|
|
|
|
# /f = $iter->{fanfh} |
|
1360
|
|
|
|
|
|
|
|
|
1361
|
|
|
|
|
|
|
next_fanout: |
|
1362
|
|
|
|
|
|
|
|
|
1363
|
|
|
|
|
|
|
# open the {fanfh} handle, if it isn't already going |
|
1364
|
884
|
100
|
|
|
|
1997
|
if (!defined $iter->{fanfh}) { |
|
1365
|
229
|
|
|
|
|
228
|
my $nextfanout = shift @{$iter->{fanoutlist}}; |
|
|
229
|
|
|
|
|
507
|
|
|
1366
|
229
|
100
|
|
|
|
484
|
if (!defined $nextfanout) { |
|
1367
|
2
|
|
|
|
|
5
|
dbg ("fanout: end of list"); |
|
1368
|
2
|
|
|
|
|
5
|
return; |
|
1369
|
|
|
|
|
|
|
} |
|
1370
|
|
|
|
|
|
|
|
|
1371
|
227
|
|
|
|
|
248
|
my $dirfh; |
|
1372
|
227
|
|
|
|
|
607
|
dbg ("fanout: opening next dir: $nextfanout"); |
|
1373
|
227
|
50
|
|
|
|
18624
|
if (!opendir ($dirfh, $iter->{pathqueuedir}.SLASH.$nextfanout)) { |
|
1374
|
0
|
|
|
|
|
0
|
warn "opendir failed $iter->{pathqueuedir}/$nextfanout: $!"; |
|
1375
|
0
|
|
|
|
|
0
|
return; |
|
1376
|
|
|
|
|
|
|
} |
|
1377
|
|
|
|
|
|
|
|
|
1378
|
227
|
|
|
|
|
420
|
$iter->{fanstr} = $nextfanout; |
|
1379
|
227
|
|
|
|
|
686
|
$iter->{fanfh} = $dirfh; |
|
1380
|
|
|
|
|
|
|
} |
|
1381
|
|
|
|
|
|
|
|
|
1382
|
882
|
|
|
|
|
4492
|
my $fname = readdir($iter->{fanfh}); |
|
1383
|
882
|
100
|
|
|
|
1960
|
if (defined $fname) { |
|
1384
|
685
|
|
|
|
|
2150
|
return $iter->{fanstr}.SLASH.$fname; # best-case scenario |
|
1385
|
|
|
|
|
|
|
} |
|
1386
|
|
|
|
|
|
|
|
|
1387
|
197
|
|
|
|
|
415
|
dbg ("fanout: finished this dir, trying next one"); |
|
1388
|
197
|
|
|
|
|
2056
|
closedir($iter->{fanfh}); |
|
1389
|
197
|
|
|
|
|
358
|
$iter->{fanstr} = undef; |
|
1390
|
197
|
|
|
|
|
289
|
$iter->{fanfh} = undef; |
|
1391
|
197
|
|
|
|
|
546
|
goto next_fanout; |
|
1392
|
|
|
|
|
|
|
} |
|
1393
|
|
|
|
|
|
|
|
|
1394
|
23
|
|
|
23
|
|
316
|
use constant UTIME_TAKES_UNDEF_FOR_TOUCH => ($] >= 5.007002); |
|
|
23
|
|
|
|
|
80
|
|
|
|
23
|
|
|
|
|
5614
|
|
|
1395
|
|
|
|
|
|
|
|
|
1396
|
|
|
|
|
|
|
sub touch { |
|
1397
|
1250
|
|
|
1250
|
0
|
3001
|
my ($self, $path) = @_; |
|
1398
|
|
|
|
|
|
|
|
|
1399
|
|
|
|
|
|
|
# 'Since perl 5.7.2, if the first two elements of the list are "undef", then |
|
1400
|
|
|
|
|
|
|
# the utime(2) function in the C library will be called with a null second |
|
1401
|
|
|
|
|
|
|
# argument. On most systems, this will set the file's access and modification |
|
1402
|
|
|
|
|
|
|
# times to the current time'. |
|
1403
|
|
|
|
|
|
|
|
|
1404
|
1250
|
|
|
|
|
1771
|
if (UTIME_TAKES_UNDEF_FOR_TOUCH) { |
|
1405
|
1250
|
|
|
|
|
58649
|
return utime undef, undef, $path; |
|
1406
|
|
|
|
|
|
|
} else { |
|
1407
|
|
|
|
|
|
|
my $now = time; |
|
1408
|
|
|
|
|
|
|
return utime $now, $now, $path; |
|
1409
|
|
|
|
|
|
|
} |
|
1410
|
|
|
|
|
|
|
} |
|
1411
|
|
|
|
|
|
|
|
|
1412
|
|
|
|
|
|
|
########################################################################### |
|
1413
|
|
|
|
|
|
|
|
|
1414
|
|
|
|
|
|
|
1; |
|
1415
|
|
|
|
|
|
|
|
|
1416
|
|
|
|
|
|
|
=back |
|
1417
|
|
|
|
|
|
|
|
|
1418
|
|
|
|
|
|
|
=head1 STALE LOCKS AND SIGNAL HANDLING |
|
1419
|
|
|
|
|
|
|
|
|
1420
|
|
|
|
|
|
|
If interrupted or terminated, dequeueing processes should be careful to either |
|
1421
|
|
|
|
|
|
|
call C<$job-Efinish()> or C<$job-Ereturn_to_queue()> on any active |
|
1422
|
|
|
|
|
|
|
tasks before exiting -- otherwise those jobs will remain marked I. |
|
1423
|
|
|
|
|
|
|
|
|
1424
|
|
|
|
|
|
|
Dequeueing processes can also call C<$job-Etouch_active_lock()> |
|
1425
|
|
|
|
|
|
|
periodically, while processing large tasks, to ensure that the task is still |
|
1426
|
|
|
|
|
|
|
marked as I. |
|
1427
|
|
|
|
|
|
|
|
|
1428
|
|
|
|
|
|
|
Stale locks are normally dealt with automatically. If a lock is still |
|
1429
|
|
|
|
|
|
|
I after about 10 minutes of inactivity, the other dequeuers on |
|
1430
|
|
|
|
|
|
|
that machine will probe the process ID listed in that lock file using |
|
1431
|
|
|
|
|
|
|
C. If that process ID is no longer running, the lock is presumed |
|
1432
|
|
|
|
|
|
|
likely to be stale. If a given timeout (10 minutes plus a random value |
|
1433
|
|
|
|
|
|
|
between 0 and 256 seconds) has elapsed since the lock file was last |
|
1434
|
|
|
|
|
|
|
modified, the lock file is deleted. |
|
1435
|
|
|
|
|
|
|
|
|
1436
|
|
|
|
|
|
|
This 10-minute default can be modified using the C |
|
1437
|
|
|
|
|
|
|
parameter to the C constructor. |
|
1438
|
|
|
|
|
|
|
|
|
1439
|
|
|
|
|
|
|
Note: this means that if the dequeueing processes are spread among |
|
1440
|
|
|
|
|
|
|
multiple machines, and there is no longer a dequeuer running on the |
|
1441
|
|
|
|
|
|
|
machine that initially 'locked' the task, it will never be unlocked, |
|
1442
|
|
|
|
|
|
|
unless you delete the I file for that task. |
|
1443
|
|
|
|
|
|
|
|
|
1444
|
|
|
|
|
|
|
=head1 QUEUE DIRECTORY STRUCTURE |
|
1445
|
|
|
|
|
|
|
|
|
1446
|
|
|
|
|
|
|
C maintains the following structure for a queue directory: |
|
1447
|
|
|
|
|
|
|
|
|
1448
|
|
|
|
|
|
|
=over 4 |
|
1449
|
|
|
|
|
|
|
|
|
1450
|
|
|
|
|
|
|
=item queue directory |
|
1451
|
|
|
|
|
|
|
|
|
1452
|
|
|
|
|
|
|
The B directory is used to store the queue control files. Queue |
|
1453
|
|
|
|
|
|
|
control files determine what jobs are in the queue; if a job has a queue |
|
1454
|
|
|
|
|
|
|
control file in this directory, it is listed in the queue. |
|
1455
|
|
|
|
|
|
|
|
|
1456
|
|
|
|
|
|
|
The filename format is as follows: |
|
1457
|
|
|
|
|
|
|
|
|
1458
|
|
|
|
|
|
|
50.20040909232529941258.HASH[.PID.RAND] |
|
1459
|
|
|
|
|
|
|
|
|
1460
|
|
|
|
|
|
|
The first two digits (C<50>) are the priority of the job. Lower priority |
|
1461
|
|
|
|
|
|
|
numbers are run first. C<20040909232529> is the current date and time when the |
|
1462
|
|
|
|
|
|
|
enqueueing process was run, in C format. C<941258> is the time in |
|
1463
|
|
|
|
|
|
|
microseconds, as returned by C. And finally, C is a |
|
1464
|
|
|
|
|
|
|
variable-length hash of some semi-random data, used to increase the chance of |
|
1465
|
|
|
|
|
|
|
uniqueness. |
|
1466
|
|
|
|
|
|
|
|
|
1467
|
|
|
|
|
|
|
If there is a collision, the timestamps are regenerated after a 250 msec sleep, |
|
1468
|
|
|
|
|
|
|
and further randomness will be added at the end of the string (namely, the |
|
1469
|
|
|
|
|
|
|
current process ID and a random integer value). Up to 10 retries will be |
|
1470
|
|
|
|
|
|
|
attempted. Once the file is atomically moved into the B directory |
|
1471
|
|
|
|
|
|
|
without collision, the retries cease. |
|
1472
|
|
|
|
|
|
|
|
|
1473
|
|
|
|
|
|
|
If B was used in the C constructor, then |
|
1474
|
|
|
|
|
|
|
the B directory does not contain the queue control files directly; |
|
1475
|
|
|
|
|
|
|
instead, there is an interposing set of 16 "fan-out" directories, named |
|
1476
|
|
|
|
|
|
|
according to the hex digits from C<0> to C. |
|
1477
|
|
|
|
|
|
|
|
|
1478
|
|
|
|
|
|
|
=item active directory |
|
1479
|
|
|
|
|
|
|
|
|
1480
|
|
|
|
|
|
|
The B directory is used to store active queue control files. |
|
1481
|
|
|
|
|
|
|
|
|
1482
|
|
|
|
|
|
|
When a job becomes 'active' -- ie. is picked up by C -- |
|
1483
|
|
|
|
|
|
|
its control file is moved from the B directory into the B |
|
1484
|
|
|
|
|
|
|
directory while it is processed. |
|
1485
|
|
|
|
|
|
|
|
|
1486
|
|
|
|
|
|
|
=item data directory |
|
1487
|
|
|
|
|
|
|
|
|
1488
|
|
|
|
|
|
|
The B directory is used to store enqueued data files. |
|
1489
|
|
|
|
|
|
|
|
|
1490
|
|
|
|
|
|
|
It contains a two-level "fan-out" hashed directory structure; each data file is |
|
1491
|
|
|
|
|
|
|
stored under a single-letter directory, which in turn is under a single-letter |
|
1492
|
|
|
|
|
|
|
directory. This increases the efficiency of directory lookups under many |
|
1493
|
|
|
|
|
|
|
filesystems. |
|
1494
|
|
|
|
|
|
|
|
|
1495
|
|
|
|
|
|
|
The format of filenames here is similar to that used in the B directory, |
|
1496
|
|
|
|
|
|
|
except that the last two characters are removed and used instead for the |
|
1497
|
|
|
|
|
|
|
"fan-out" directory names. |
|
1498
|
|
|
|
|
|
|
|
|
1499
|
|
|
|
|
|
|
=item tmp directory |
|
1500
|
|
|
|
|
|
|
|
|
1501
|
|
|
|
|
|
|
The B directory contains temporary work files that are in the process |
|
1502
|
|
|
|
|
|
|
of enqueueing, and not ready ready for processing. |
|
1503
|
|
|
|
|
|
|
|
|
1504
|
|
|
|
|
|
|
The filename format here is similar to the above, with suffixes indicating |
|
1505
|
|
|
|
|
|
|
the type of file (".ctrl", ".data"). |
|
1506
|
|
|
|
|
|
|
|
|
1507
|
|
|
|
|
|
|
=back |
|
1508
|
|
|
|
|
|
|
|
|
1509
|
|
|
|
|
|
|
Atomic, NFS-safe renaming is used to avoid collisions, overwriting or |
|
1510
|
|
|
|
|
|
|
other unsafe operations. |
|
1511
|
|
|
|
|
|
|
|
|
1512
|
|
|
|
|
|
|
=head1 SEE ALSO |
|
1513
|
|
|
|
|
|
|
|
|
1514
|
|
|
|
|
|
|
C |
|
1515
|
|
|
|
|
|
|
|
|
1516
|
|
|
|
|
|
|
=head1 AUTHOR |
|
1517
|
|
|
|
|
|
|
|
|
1518
|
|
|
|
|
|
|
Justin Mason Edq /at/ jmason.orgE |
|
1519
|
|
|
|
|
|
|
|
|
1520
|
|
|
|
|
|
|
=head1 MAILING LIST |
|
1521
|
|
|
|
|
|
|
|
|
1522
|
|
|
|
|
|
|
The IPC::DirQueue mailing list is at Eipc-dirqueue-subscribe@perl.orgE. |
|
1523
|
|
|
|
|
|
|
|
|
1524
|
|
|
|
|
|
|
=head1 COPYRIGHT |
|
1525
|
|
|
|
|
|
|
|
|
1526
|
|
|
|
|
|
|
C is distributed under the same license as perl itself. |
|
1527
|
|
|
|
|
|
|
|
|
1528
|
|
|
|
|
|
|
=head1 AVAILABILITY |
|
1529
|
|
|
|
|
|
|
|
|
1530
|
|
|
|
|
|
|
The latest version of this library is likely to be available from CPAN. |
|
1531
|
|
|
|
|
|
|
|