line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
1
|
|
|
|
|
|
|
package Data::Consumer::Dir; |
2
|
|
|
|
|
|
|
|
3
|
5
|
|
|
5
|
|
257640
|
use warnings; |
|
5
|
|
|
|
|
55
|
|
|
5
|
|
|
|
|
210
|
|
4
|
5
|
|
|
5
|
|
25
|
use strict; |
|
5
|
|
|
|
|
5
|
|
|
5
|
|
|
|
|
95
|
|
5
|
5
|
|
|
5
|
|
5720
|
use DBI; |
|
5
|
|
|
|
|
69490
|
|
|
5
|
|
|
|
|
330
|
|
6
|
5
|
|
|
5
|
|
50
|
use Carp qw(confess); |
|
5
|
|
|
|
|
10
|
|
|
5
|
|
|
|
|
225
|
|
7
|
5
|
|
|
5
|
|
25
|
use warnings FATAL => 'all'; |
|
5
|
|
|
|
|
15
|
|
|
5
|
|
|
|
|
225
|
|
8
|
5
|
|
|
5
|
|
25
|
use base 'Data::Consumer'; |
|
5
|
|
|
|
|
5
|
|
|
5
|
|
|
|
|
2245
|
|
9
|
5
|
|
|
5
|
|
30
|
use File::Spec; |
|
5
|
|
|
|
|
10
|
|
|
5
|
|
|
|
|
365
|
|
10
|
5
|
|
|
5
|
|
40
|
use File::Path; |
|
5
|
|
|
|
|
20
|
|
|
5
|
|
|
|
|
240
|
|
11
|
5
|
|
|
5
|
|
25
|
use Fcntl; |
|
5
|
|
|
|
|
5
|
|
|
5
|
|
|
|
|
920
|
|
12
|
5
|
|
|
5
|
|
25
|
use Fcntl ':flock'; |
|
5
|
|
|
|
|
10
|
|
|
5
|
|
|
|
|
530
|
|
13
|
5
|
|
|
5
|
|
30
|
use vars qw/$Debug $VERSION $Cmd $Fail/; |
|
5
|
|
|
|
|
5
|
|
|
5
|
|
|
|
|
330
|
|
14
|
|
|
|
|
|
|
|
15
|
|
|
|
|
|
|
# This code was formatted with the following perltidy options: |
16
|
|
|
|
|
|
|
# -ple -ce -bbb -bbc -bbs -nolq -l=100 -noll -nola -nwls='=' -isbc -nolc -otr -kis |
17
|
|
|
|
|
|
|
# If you patch it please use the same options for your patch. |
18
|
|
|
|
|
|
|
|
19
|
|
|
|
|
|
|
*Debug= *Data::Consumer::Debug; |
20
|
|
|
|
|
|
|
*Cmd= *Data::Consumer::Cmd; |
21
|
|
|
|
|
|
|
*Fail= *Data::Consumer::Fail; |
22
|
|
|
|
|
|
|
|
23
|
|
|
|
|
|
|
BEGIN { |
24
|
5
|
|
|
5
|
|
40
|
__PACKAGE__->register(); |
25
|
|
|
|
|
|
|
} |
26
|
|
|
|
|
|
|
|
27
|
|
|
|
|
|
|
=head1 NAME |
28
|
|
|
|
|
|
|
|
29
|
|
|
|
|
|
|
Data::Consumer::Dir - Data::Consumer implementation for a directory of files resource |
30
|
|
|
|
|
|
|
|
31
|
|
|
|
|
|
|
=head1 VERSION |
32
|
|
|
|
|
|
|
|
33
|
|
|
|
|
|
|
Version 0.17 |
34
|
|
|
|
|
|
|
|
35
|
|
|
|
|
|
|
=cut |
36
|
|
|
|
|
|
|
|
37
|
|
|
|
|
|
|
$VERSION= '0.17'; |
38
|
|
|
|
|
|
|
|
39
|
|
|
|
|
|
|
=head1 SYNOPSIS |
40
|
|
|
|
|
|
|
|
41
|
|
|
|
|
|
|
use Data::Consumer::Dir; |
42
|
|
|
|
|
|
|
|
43
|
|
|
|
|
|
|
my $consumer = Data::Consumer::Dir->new( |
44
|
|
|
|
|
|
|
root => '/some/dir', |
45
|
|
|
|
|
|
|
create => 1, |
46
|
|
|
|
|
|
|
open_mode => '+<', |
47
|
|
|
|
|
|
|
); |
48
|
|
|
|
|
|
|
|
49
|
|
|
|
|
|
|
$consumer->consume( sub { |
50
|
|
|
|
|
|
|
my $id = shift; |
51
|
|
|
|
|
|
|
print "processed $id\n"; |
52
|
|
|
|
|
|
|
} ); |
53
|
|
|
|
|
|
|
|
54
|
|
|
|
|
|
|
|
55
|
|
|
|
|
|
|
=head1 FUNCTIONS |
56
|
|
|
|
|
|
|
|
57
|
|
|
|
|
|
|
=head2 CLASS->new(%opts) |
58
|
|
|
|
|
|
|
|
59
|
|
|
|
|
|
|
Constructor for a L<Data::Consumer::Dir> instance. |
60
|
|
|
|
|
|
|
|
61
|
|
|
|
|
|
|
Either the C<root> option must be provided or both C<unprocessed> and |
62
|
|
|
|
|
|
|
C<processed> arguments must be defined. Will die if the directories do |
63
|
|
|
|
|
|
|
not exist unless the C<create> option is set to a true value. |
64
|
|
|
|
|
|
|
|
65
|
|
|
|
|
|
|
=over 4 |
66
|
|
|
|
|
|
|
|
67
|
|
|
|
|
|
|
=item unprocessed => $path_spec |
68
|
|
|
|
|
|
|
|
69
|
|
|
|
|
|
|
Directory within which unprocessed files will be found. |
70
|
|
|
|
|
|
|
|
71
|
|
|
|
|
|
|
May also be a callback which is responsible for marking the item as |
72
|
|
|
|
|
|
|
unprocessed. This will be called with the arguments C<($consumer, |
73
|
|
|
|
|
|
|
'unprocessed', $spec, $fh, $name)>. |
74
|
|
|
|
|
|
|
|
75
|
|
|
|
|
|
|
=item working => $path_spec |
76
|
|
|
|
|
|
|
|
77
|
|
|
|
|
|
|
Files will be moved to this directory prior to being processed. |
78
|
|
|
|
|
|
|
|
79
|
|
|
|
|
|
|
May also be a callback which is responsible for marking the item as |
80
|
|
|
|
|
|
|
working. This will be called with the arguments C<($consumer, |
81
|
|
|
|
|
|
|
'working', $spec, $fh, $name)>. |
82
|
|
|
|
|
|
|
|
83
|
|
|
|
|
|
|
=item processed => $path_spec |
84
|
|
|
|
|
|
|
|
85
|
|
|
|
|
|
|
Once successfully processed the files will be moved to this directory. |
86
|
|
|
|
|
|
|
|
87
|
|
|
|
|
|
|
May also be a callback which is responsible for marking the item as |
88
|
|
|
|
|
|
|
processed. This will be called with the arguments C<($consumer, |
89
|
|
|
|
|
|
|
'processed', $spec, $fh, $name)>. |
90
|
|
|
|
|
|
|
|
91
|
|
|
|
|
|
|
=item failed => $path_spec |
92
|
|
|
|
|
|
|
|
93
|
|
|
|
|
|
|
If processing fails then the files will be moved to this directory. |
94
|
|
|
|
|
|
|
|
95
|
|
|
|
|
|
|
May also be a callback which is responsible for marking the item as |
96
|
|
|
|
|
|
|
failed. This will be called with the arguments C<($consumer, 'failed', |
97
|
|
|
|
|
|
|
$spec, $fh, $name)>. |
98
|
|
|
|
|
|
|
|
99
|
|
|
|
|
|
|
=item root => $path_spec |
100
|
|
|
|
|
|
|
|
101
|
|
|
|
|
|
|
Automatically creates any of the C<unprocessed>, C<working>, |
102
|
|
|
|
|
|
|
C<processed>, or C<failed> directories below a specified C<root>. Only |
103
|
|
|
|
|
|
|
those directories not explicitly defined will be automatically created |
104
|
|
|
|
|
|
|
so this can be used in conjunction with the other options. |
105
|
|
|
|
|
|
|
|
106
|
|
|
|
|
|
|
=item create => $bool |
107
|
|
|
|
|
|
|
|
108
|
|
|
|
|
|
|
=item create_mode => $mode_flags |
109
|
|
|
|
|
|
|
|
110
|
|
|
|
|
|
|
If C<create> is true then any of the specified directories that do not exist will be created. |
111
|
|
|
|
|
|
|
If C<create_mode> is specified then the directories will be created with that mode. |
112
|
|
|
|
|
|
|
|
113
|
|
|
|
|
|
|
=item open_mode => $mode_str |
114
|
|
|
|
|
|
|
|
115
|
|
|
|
|
|
|
In order to lock a file a filehandle must be opened, normally in |
116
|
|
|
|
|
|
|
read-only mode (C<< < >>), however it may be useful to open with other |
117
|
|
|
|
|
|
|
modes. |
118
|
|
|
|
|
|
|
|
119
|
|
|
|
|
|
|
=back |
120
|
|
|
|
|
|
|
|
121
|
|
|
|
|
|
|
=cut |
122
|
|
|
|
|
|
|
|
123
|
0
|
|
|
|
|
0
|
BEGIN { |
124
|
5
|
|
|
5
|
|
25
|
my @keys= qw(unprocessed working processed failed); |
125
|
5
|
|
|
|
|
20
|
my %m= ( |
126
|
|
|
|
|
|
|
'<' => O_RDONLY, |
127
|
|
|
|
|
|
|
'+<' => O_RDWR, |
128
|
|
|
|
|
|
|
'>>' => O_APPEND | O_WRONLY, |
129
|
|
|
|
|
|
|
'+>>' => O_APPEND | O_RDWR, |
130
|
|
|
|
|
|
|
); |
131
|
5
|
|
|
|
|
3370
|
$_= $_ | O_NONBLOCK for values %m; |
132
|
|
|
|
|
|
|
|
133
|
|
|
|
|
|
|
sub new { |
134
|
5
|
|
|
5
|
1
|
4084744
|
my ( $class, %opts )= @_; |
135
|
5
|
|
|
|
|
266
|
my $self= $class->SUPER::new(); # let Data::Consumer bless the hash |
136
|
|
|
|
|
|
|
|
137
|
5
|
50
|
|
|
|
43
|
if ( $opts{root} ) { |
138
|
5
|
|
|
|
|
326
|
my ( $v, $p )= File::Spec->splitpath( $opts{root}, 'nofile' ); |
139
|
5
|
|
|
|
|
41
|
for my $type (@keys) { |
140
|
20
|
|
33
|
|
|
478
|
$opts{$type} ||= File::Spec->catpath( $v, File::Spec->catdir( $p, $type ), '' ); |
141
|
|
|
|
|
|
|
} |
142
|
|
|
|
|
|
|
} |
143
|
|
|
|
|
|
|
( $opts{unprocessed} and $opts{processed} ) |
144
|
5
|
50
|
33
|
|
|
106
|
or confess "Arguments 'unprocessed' and 'processed' are mandatory"; |
145
|
|
|
|
|
|
|
|
146
|
5
|
50
|
|
|
|
27
|
if ( $opts{create} ) { |
147
|
5
|
|
|
|
|
12
|
for (@keys) { |
148
|
20
|
50
|
|
|
|
55
|
next unless exists $opts{$_}; |
149
|
20
|
100
|
|
|
|
217
|
next if -d $opts{$_}; |
150
|
3
|
|
33
|
|
|
368
|
mkpath( $opts{$_}, $Debug, $opts{create_mode} || () ); |
151
|
|
|
|
|
|
|
} |
152
|
|
|
|
|
|
|
} |
153
|
5
|
50
|
|
|
|
42
|
if ( $opts{open_mode} ) { |
154
|
|
|
|
|
|
|
exists $m{ $opts{open_mode} } |
155
|
|
|
|
|
|
|
or confess "Illegal open mode '$opts{open_mode}' legal options are " |
156
|
5
|
50
|
|
|
|
30
|
. join( ',', map { "'$_'" } sort keys %m ) . "\n"; |
|
0
|
|
|
|
|
0
|
|
157
|
5
|
|
|
|
|
16
|
$opts{open_mode}= $m{ $opts{open_mode} }; |
158
|
|
|
|
|
|
|
} else { |
159
|
0
|
|
|
|
|
0
|
$opts{open_mode}= O_RDONLY | O_NONBLOCK; |
160
|
|
|
|
|
|
|
} |
161
|
|
|
|
|
|
|
|
162
|
5
|
|
|
|
|
25
|
%$self= %opts; |
163
|
5
|
|
|
|
|
22
|
return $self; |
164
|
|
|
|
|
|
|
} |
165
|
|
|
|
|
|
|
} |
166
|
|
|
|
|
|
|
|
167
|
|
|
|
|
|
|
=head2 $object->reset() |
168
|
|
|
|
|
|
|
|
169
|
|
|
|
|
|
|
Reset the state of the object. |
170
|
|
|
|
|
|
|
|
171
|
|
|
|
|
|
|
=head2 $object->acquire() |
172
|
|
|
|
|
|
|
|
173
|
|
|
|
|
|
|
Acquire an item to be processed. |
174
|
|
|
|
|
|
|
|
175
|
|
|
|
|
|
|
Returns an identifier to be used to identify the item acquired. |
176
|
|
|
|
|
|
|
|
177
|
|
|
|
|
|
|
=head2 $object->release() |
178
|
|
|
|
|
|
|
|
179
|
|
|
|
|
|
|
Release any locks on the currently held item. |
180
|
|
|
|
|
|
|
|
181
|
|
|
|
|
|
|
Normally there is no need to call this directly. |
182
|
|
|
|
|
|
|
|
183
|
|
|
|
|
|
|
=cut |
184
|
|
|
|
|
|
|
|
185
|
|
|
|
|
|
|
sub reset { |
186
|
11
|
|
|
11
|
1
|
26
|
my $self= shift; |
187
|
11
|
|
|
|
|
67
|
$self->debug_warn( 5, "reset (scanning $self->{unprocessed})" ); |
188
|
11
|
|
|
|
|
94
|
$self->release(); |
189
|
|
|
|
|
|
|
opendir my $dh, $self->{unprocessed} |
190
|
11
|
50
|
|
|
|
467
|
or die "Failed to opendir '$self->{unprocessed}': $!"; |
191
|
11
|
50
|
|
|
|
415
|
my @files= map { /(.*)/s && $1 } readdir($dh); |
|
262
|
|
|
|
|
1179
|
|
192
|
|
|
|
|
|
|
|
193
|
|
|
|
|
|
|
#print for @files; |
194
|
11
|
|
|
|
|
66
|
@files= sort grep { -f _cf( $self->{unprocessed}, $_ ) } @files; |
|
262
|
|
|
|
|
578
|
|
195
|
11
|
|
|
|
|
53
|
$self->{files}= \@files; |
196
|
11
|
|
|
|
|
195
|
return $self; |
197
|
|
|
|
|
|
|
} |
198
|
|
|
|
|
|
|
|
199
|
|
|
|
|
|
|
sub _cf { # cat file |
200
|
602
|
|
|
602
|
|
1363
|
my ( $r, $f )= @_; |
201
|
|
|
|
|
|
|
|
202
|
602
|
|
|
|
|
3956
|
my ( $v, $p )= File::Spec->splitpath( $r, 'nofile' ); |
203
|
602
|
|
|
|
|
5442
|
return File::Spec->catpath( $v, $p, $f ); |
204
|
|
|
|
|
|
|
} |
205
|
|
|
|
|
|
|
|
206
|
|
|
|
|
|
|
sub _do_callback { |
207
|
50
|
|
|
50
|
|
160
|
my ( $self, $callback )= @_; |
208
|
50
|
|
|
|
|
130
|
local $Fail; |
209
|
50
|
50
|
|
|
|
103
|
if ( eval { $callback->( $self, @{$self}{qw(lock_spec lock_fh last_id)} ); 1; } ) { |
|
50
|
|
|
|
|
120
|
|
|
50
|
|
|
|
|
405
|
|
|
50
|
|
|
|
|
50016505
|
|
210
|
50
|
50
|
|
|
|
293
|
if ($Fail) { |
211
|
0
|
|
|
|
|
0
|
return "Callback reports an error: $Fail"; |
212
|
|
|
|
|
|
|
} |
213
|
50
|
|
|
|
|
389
|
return; |
214
|
|
|
|
|
|
|
} else { |
215
|
0
|
|
|
|
|
0
|
return "Callback failed: $@"; |
216
|
|
|
|
|
|
|
} |
217
|
|
|
|
|
|
|
} |
218
|
|
|
|
|
|
|
|
219
|
|
|
|
|
|
|
sub acquire { |
220
|
60
|
|
|
60
|
1
|
195
|
my $self= shift; |
221
|
60
|
|
|
|
|
202
|
my $dbh= $self->{dbh}; |
222
|
|
|
|
|
|
|
|
223
|
60
|
50
|
|
|
|
139
|
$self->reset if !@{ $self->{files} || [] }; |
|
60
|
100
|
|
|
|
400
|
|
224
|
|
|
|
|
|
|
|
225
|
60
|
|
|
|
|
250
|
my $files= $self->{files}; |
226
|
60
|
|
|
|
|
419
|
while (@$files) { |
227
|
240
|
|
|
|
|
725
|
my $file= shift @$files; |
228
|
240
|
50
|
|
|
|
912
|
next if $self->is_ignored($file); |
229
|
240
|
|
|
|
|
653
|
my $spec= _cf( $self->{unprocessed}, $file ); |
230
|
240
|
|
|
|
|
571
|
my $fh; |
231
|
240
|
100
|
66
|
|
|
6772
|
if ( sysopen $fh, $spec, $self->{open_mode} and flock( $fh, LOCK_EX | LOCK_NB ) ) { |
232
|
50
|
|
|
|
|
2931
|
$self->{lock_fh}= $fh; |
233
|
50
|
|
|
|
|
288
|
$self->{lock_spec}= $spec; |
234
|
50
|
|
|
|
|
439
|
$self->debug_warn( 5, "acquired '$file': $spec" ); |
235
|
50
|
|
|
|
|
183
|
$self->{last_id}= $file; |
236
|
50
|
|
|
|
|
385
|
return $file; |
237
|
|
|
|
|
|
|
} |
238
|
|
|
|
|
|
|
} |
239
|
10
|
|
|
|
|
66
|
$self->debug_warn( 5, "acquire failed -- resource has been exhausted" ); |
240
|
10
|
|
|
|
|
74
|
return; |
241
|
|
|
|
|
|
|
} |
242
|
|
|
|
|
|
|
|
243
|
|
|
|
|
|
|
sub release { |
244
|
21
|
|
|
21
|
1
|
56
|
my $self= shift; |
245
|
|
|
|
|
|
|
|
246
|
21
|
100
|
|
|
|
233
|
flock( $self->{lock_fh}, LOCK_UN ) if $self->{lock_fh}; |
247
|
21
|
|
|
|
|
119
|
delete $self->{lock_fh}; |
248
|
21
|
|
|
|
|
37
|
delete $self->{lock_spec}; |
249
|
21
|
|
|
|
|
33
|
delete $self->{last_id}; |
250
|
21
|
|
|
|
|
558
|
return 1; |
251
|
|
|
|
|
|
|
} |
252
|
|
|
|
|
|
|
|
253
|
|
|
|
|
|
|
=head2 $object->fh() |
254
|
|
|
|
|
|
|
|
255
|
|
|
|
|
|
|
Return a filehandle to the currently acquired item. See the C<open_mode> |
256
|
|
|
|
|
|
|
argument in C<new()> for details on how to control the mode that the |
257
|
|
|
|
|
|
|
filehandle is opened with. |
258
|
|
|
|
|
|
|
|
259
|
|
|
|
|
|
|
=head2 $object->spec() |
260
|
|
|
|
|
|
|
|
261
|
|
|
|
|
|
|
Return the full filespec for the currently acquired item. |
262
|
|
|
|
|
|
|
|
263
|
|
|
|
|
|
|
=head2 $object->file() |
264
|
|
|
|
|
|
|
|
265
|
|
|
|
|
|
|
Return the filename (without path) of the currently acquired item. |
266
|
|
|
|
|
|
|
|
267
|
|
|
|
|
|
|
Note that this is an alias for C<< $object->last_id() >>. |
268
|
|
|
|
|
|
|
|
269
|
|
|
|
|
|
|
=cut |
270
|
|
|
|
|
|
|
|
271
|
0
|
|
|
0
|
1
|
0
|
sub fh { $_[0]->{lock_fh} } |
272
|
0
|
|
|
0
|
1
|
0
|
sub spec { $_[0]->{lock_spec} } |
273
|
0
|
|
|
0
|
1
|
0
|
sub file { $_[0]->{last_id} } |
274
|
|
|
|
|
|
|
|
275
|
|
|
|
|
|
|
sub _mark_as { |
276
|
100
|
|
|
100
|
|
350
|
my ( $self, $key, $id )= @_; |
277
|
|
|
|
|
|
|
|
278
|
100
|
50
|
|
|
|
334
|
if ( $self->{$key} ) { |
279
|
100
|
50
|
|
|
|
302
|
if ( ref $self->{$key} ) { |
280
|
|
|
|
|
|
|
|
281
|
|
|
|
|
|
|
# assume it must be a callback |
282
|
0
|
|
|
|
|
0
|
$self->debug_warn( 5, "executing mark_as callback for '$key'" ); |
283
|
0
|
|
|
|
|
0
|
$self->{$key}->( $self, $key, $self->{lock_spec}, $self->{lock_fh}, $self->{last_id} ); |
284
|
0
|
|
|
|
|
0
|
return; |
285
|
|
|
|
|
|
|
} |
286
|
100
|
|
|
|
|
368
|
my $spec= _cf( $self->{$key}, $self->{last_id} ); |
287
|
100
|
50
|
|
|
|
5785
|
rename $self->{lock_spec}, $spec |
288
|
|
|
|
|
|
|
or confess "$$: Failed to rename '$self->{lock_spec}' to '$spec':$!"; |
289
|
100
|
|
|
|
|
704
|
$self->{lock_spec}= $spec; |
290
|
|
|
|
|
|
|
} |
291
|
|
|
|
|
|
|
} |
292
|
|
|
|
|
|
|
|
293
|
|
|
|
|
|
|
sub DESTROY { |
294
|
5
|
|
|
5
|
|
3010669
|
my $self= shift; |
295
|
5
|
50
|
|
|
|
35
|
$self->release() if $self; |
296
|
|
|
|
|
|
|
} |
297
|
|
|
|
|
|
|
|
298
|
|
|
|
|
|
|
=head1 AUTHOR |
299
|
|
|
|
|
|
|
|
300
|
|
|
|
|
|
|
Yves Orton, C<< >> |
301
|
|
|
|
|
|
|
|
302
|
|
|
|
|
|
|
=head1 BUGS |
303
|
|
|
|
|
|
|
|
304
|
|
|
|
|
|
|
Please report any bugs or feature requests to |
305
|
|
|
|
|
|
|
C, or through the web interface at |
306
|
|
|
|
|
|
|
L. |
307
|
|
|
|
|
|
|
|
308
|
|
|
|
|
|
|
I will be notified, and then you'll automatically be notified of progress on |
309
|
|
|
|
|
|
|
your bug as I make changes. |
310
|
|
|
|
|
|
|
|
311
|
|
|
|
|
|
|
=head1 ACKNOWLEDGEMENTS |
312
|
|
|
|
|
|
|
|
313
|
|
|
|
|
|
|
Igor Sutton for ideas, testing and support |
314
|
|
|
|
|
|
|
|
315
|
|
|
|
|
|
|
=head1 COPYRIGHT & LICENSE |
316
|
|
|
|
|
|
|
|
317
|
|
|
|
|
|
|
Copyright 2008 Yves Orton, all rights reserved. |
318
|
|
|
|
|
|
|
|
319
|
|
|
|
|
|
|
This program is free software; you can redistribute it and/or modify it |
320
|
|
|
|
|
|
|
under the same terms as Perl itself. |
321
|
|
|
|
|
|
|
|
322
|
|
|
|
|
|
|
=cut |
323
|
|
|
|
|
|
|
|
324
|
|
|
|
|
|
|
1; # End of Data::Consumer::Dir |
325
|
|
|
|
|
|
|
|