package Data::Consumer;

use warnings;
use strict;
use Carp qw(confess cluck);
use vars qw/$Debug $VERSION $Fail $Cmd/;

# This code was formatted with the following perltidy options:
# -ple -ce -bbb -bbc -bbs -nolq -l=100 -noll -nola -nwls='=' -isbc -nolc -otr -kis
# If you patch it please use the same options for your patch.

=head1 NAME

Data::Consumer - Repeatedly consume a data resource in a robust way

=head1 VERSION

Version 0.17

=cut

$VERSION= '0.17';

=head1 SYNOPSIS

    use Data::Consumer;

    my $consumer = Data::Consumer->new(
        type        => $consumer_name,
        unprocessed => $unprocessed,
        working     => $working,
        processed   => $processed,
        failed      => $failed,
        max_passes  => $num_or_undef,
        max_process => $num_or_undef,
        max_elapsed => $seconds_or_undef,
    );

    $consumer->consume( sub {
        my ( $consumer, $id ) = @_;
        print "processed $id\n";
    } );

=head1 DESCRIPTION

It is a common requirement to process a feed of items of some
sort in a robust manner. Such a feed might be records that are inserted
into a table, or files dropped in a delivery directory.
Writing a script that handles all the edge cases, such as getting "stuck"
on a failed item, and that manages locking so that the script
can be parallelized, is tricky and certainly repetitive.

The aim of L<Data::Consumer> is to provide a framework that makes writing
such consumer scripts as easy as writing a callback that processes
each item. The framework handles the rest.

The basic idea is that one need only use (or, in the case of a feed type
not already supported, define) a L<Data::Consumer> subclass
which implements a few reasonably well-defined primitive methods that
handle the required tasks; the L<Data::Consumer> methods then use
those to provide a DWIMily consistent interface to the end consumer.

Currently L<Data::Consumer> is distributed with two subclasses (well,
three actually, but one of the MySQL subclasses is deprecated in favour
of the other): one for handling records in a MySQL database (using a
MySQL locking function), and L<Data::Consumer::Dir> for handling a
drop-directory scenario (such as an FTP upload area or a mail directory).

Once a resource type has been defined as a L<Data::Consumer> subclass,
the usage pattern is to construct the subclass with the appropriate
arguments and then call C<consume()> with a callback.

=head2 The Consumer Pattern

The consumer pattern is where code wants to consume an 'atomic' resource
piece by piece. The consuming code doesn't really want to worry about
how it got each piece; that task should be handled by the framework.
The consumer subclasses assume that the resource can be modeled as a
queue (that is, there is some ordering principle by which the items can
be processed in a predictable sequence). The consume pattern in full glory
is very close to the following pseudo-code. The items marked with
asterisks are where user callbacks may be invoked:

    DO
        RESET TO THE BEGINNING OF THE QUEUE
        WHILE QUEUE NOT EMPTY AND CAN *PROCEED*
            ACQUIRE NEXT ITEM TO PROCESS FROM QUEUE
            MARK AS 'WORKING'
            *PROCESS* ITEM
            IF PROCESSING FAILED
                MARK AS 'FAILED'
            OTHERWISE
                MARK AS 'PROCESSED'
        SWEEP UP ABANDONED 'WORKING' ITEMS AND MARK THEM AS 'FAILED'
    UNTIL WE CANNOT *PROCEED* OR NOTHING WAS PROCESSED
    RELEASE ANY LOCKS STILL HELD

This implies that each item potentially has four states: C<unprocessed>,
C<working>, C<processed> and C<failed>. In a database these might be
values in a field; in a drop-directory scenario they would be different
directories. In every case they would normally be supplied as arguments
to the constructor of the L<Data::Consumer> subclass being created.
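
The consume loop in the pseudo-code above can be illustrated with a short, self-contained Perl sketch. It is not the module's C<consume()> implementation: the queue is a hypothetical in-memory hash of item states, C<consume_sketch()> is an invented name, the *PROCEED* check is reduced to an optional C<max_passes> limit, and the sweep and locking steps are left out.

```perl

    use strict;
    use warnings;

    # Hypothetical in-memory resource: item id => state.
    my %state= map { $_ => 'unprocessed' } 1 .. 5;

    # Stand-in for *PROCESS*: dies (i.e. fails) on even ids.
    my $process= sub { my ($id)= @_; die "cannot handle $id\n" if $id % 2 == 0 };

    # Hypothetical consume loop following the pseudo-code; returns counters.
    sub consume_sketch {
        my ( $state, $callback, $max_passes )= @_;
        my ( $passes, $processed, $failed )= ( 0, 0, 0 );
        while (1) {                                             # DO
            my $did_something= 0;
            for my $id ( sort { $a <=> $b } keys %$state ) {    # RESET: start from the top
                next unless $state->{$id} eq 'unprocessed';     # ACQUIRE next item
                $state->{$id}= 'working';                       # MARK AS 'WORKING'
                if ( eval { $callback->($id); 1 } ) {           # *PROCESS* ITEM
                    $state->{$id}= 'processed';
                    $processed++;
                } else {
                    $state->{$id}= 'failed';
                    $failed++;
                }
                $did_something++;
            }
            $passes++;
            # UNTIL WE CANNOT *PROCEED* OR NOTHING WAS PROCESSED
            last if !$did_something or ( $max_passes and $passes >= $max_passes );
        }
        return ( $processed, $failed, $passes );
    }

    my ( $ok, $bad, $passes )= consume_sketch( \%state, $process, 0 );
    print "processed=$ok failed=$bad passes=$passes\n";

```

Running it marks the odd ids as processed and the even ids as failed; the second pass finds nothing left in the unprocessed state, so the loop stops.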

=head2 Subclassing Data::Consumer

L<Data::Consumer> can be used with any resource type that can be modeled
as a queue, supports some form of advisory locking mechanism, and
provides a way to discriminate between at least the C<unprocessed> and
C<processed> states.

The routines that must be defined for a new consumer type are C<new()>,
C<reset()>, C<acquire()>, C<release()>, C<_mark_as()> and
C<_do_callback()>.

=over 4

=item new

A subclass will almost certainly need to override the default
constructor. All L<Data::Consumer> objects are blessed hashes, and
you should always call the parent class's constructor first with:

    my $self= $class->SUPER::new();

=item reset

This routine resets the object's internal state so that the next call to
C<acquire()> will return the first available item in the queue.

=item acquire

This routine finds, and in some way locks, the next item in the queue. It
should call C<is_ignored()> on each candidate item to verify that the item
has not been marked to be ignored.

=item release

This routine releases any locks held by the object.

=item _mark_as

This routine is called to "mark" an item as being in a particular state. It
should be able to handle user-supplied values. For instance
a MySQL-based subclass implements this as an update statement that
maps user-supplied values to the consumer state names.

Possible states are: C<unprocessed>, C<working>, C<processed> and
C<failed>.

=item _do_callback

This routine is used to call the user-supplied callback with the correct
arguments. Which arguments are appropriate depends on the type of
consumer. For instance a MySQL-based subclass
calls the callback with the arguments C<($consumer, $id, $dbh)>, whereas
L<Data::Consumer::Dir> calls the callback with the arguments
C<($consumer, $filespec, $filehandle, $filename)>. The point is that the
end user should be passed the arguments that make sense, which are not
necessarily the same for each consumer type.

=back

Every well-behaved L<Data::Consumer> subclass should include the
functional equivalent of the following code in its .pm file:

    use base 'Data::Consumer';
    __PACKAGE__->register();

This will ensure that it can be properly loaded by
C<< Data::Consumer->new(type=>$shortname) >>.

It is also normal for a L<Data::Consumer> subclass to provide special
methods as needed, for instance C<< Data::Consumer::Dir->fh() >> and
C<< Data::Consumer::MySQL->dbh() >>.
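
As an illustration of the primitives listed above, here is a sketch of a hypothetical in-memory consumer type. To keep it runnable without the module installed, it inherits from a tiny stand-in base class (C<My::ConsumerBase>) instead of L<Data::Consumer> and drives the primitives by hand; a real subclass would C<use base 'Data::Consumer'>, call C<< __PACKAGE__->register() >> and let C<consume()>/C<process()> do the driving. The package names and the C<items> constructor option are assumptions of the sketch.

```perl

    use strict;
    use warnings;

    # Hypothetical stand-in for Data::Consumer, providing only what this sketch needs.
    package My::ConsumerBase;
    sub new        { my ($class)= @_; return bless {}, $class }
    sub last_id    { return $_[0]{last_id} }
    sub is_ignored { my ( $self, $id )= @_; return $self->{ignore}{$id} ? 1 : 0 }

    # Hypothetical in-memory consumer type: item id => state, kept in a hash.
    package My::Consumer::Memory;
    our @ISA= ('My::ConsumerBase');

    sub new {
        my ( $class, %opts )= @_;
        my $self= $class->SUPER::new();    # always call the parent constructor first
        $self->{state}{$_}= 'unprocessed' for @{ $opts{items} || [] };
        $self->reset;
        return $self;
    }

    # reset: make the next acquire() start from the first item again
    sub reset { my $self= shift; $self->{cursor}= 0; return $self }

    # acquire: find and "lock" the next unprocessed item that is not ignored
    sub acquire {
        my $self= shift;
        my @ids= sort keys %{ $self->{state} };
        while ( $self->{cursor} < @ids ) {
            my $id= $ids[ $self->{cursor}++ ];
            next if $self->{state}{$id} ne 'unprocessed' or $self->is_ignored($id);
            $self->{locked}{$id}= 1;
            return $self->{last_id}= $id;
        }
        return $self->{last_id}= undef;
    }

    # release: drop any locks still held
    sub release { my $self= shift; delete $self->{locked}; return $self }

    # _mark_as: record the new state of $id
    sub _mark_as { my ( $self, $key, $id )= @_; $self->{state}{$id}= $key; return 1 }

    # _do_callback: call the user callback as ($consumer, $id); return an error, if any
    sub _do_callback {
        my ( $self, $callback )= @_;
        return eval { $callback->( $self, $self->last_id ); 1 } ? undef : ( $@ || 'unknown error' );
    }

    package main;

    my $c= My::Consumer::Memory->new( items => [qw(a b c)] );
    while ( defined( my $id= $c->acquire ) ) {
        $c->_mark_as( working => $id );
        my $err= $c->_do_callback( sub { die "bad item\n" if $_[1] eq 'b' } );
        $c->_mark_as( ( $err ? 'failed' : 'processed' ), $id );
    }
    $c->release;
    print join( ",", map { "$_=$c->{state}{$_}" } sort keys %{ $c->{state} } ), "\n";

```

The loop at the end stands in for what C<process()> does with a real consumer: mark the item as working, run the callback through C<_do_callback()>, then mark it as failed or processed.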

=head1 METHODS

=head2 CLASS->new(%opts)

Constructor. Normally L<Data::Consumer>'s constructor is not called
directly; instead the constructor of a subclass is used. However, to
make it easier to have a data-driven load process, L<Data::Consumer>
accepts the C<type> argument, which should specify either the short name
of the subclass (the part after C<Data::Consumer::>, in which C<-> may be
used in place of C<::>) or the full name of the subclass.

Thus

    Data::Consumer->new(type=>'MySQL',%args);

is exactly equivalent to calling

    Data::Consumer::MySQL->new(%args);

except that the former will automatically C<require> the appropriate
module, whereas the latter requires that you load it yourself.
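
The way C<new()> turns a C<type> value into a class name (see the constructor code further down) can be sketched as a standalone function. C<resolve_type()> and the pre-filled C<%alias2class> entry are hypothetical, and unlike the real constructor the sketch only computes the name; it does not C<require> anything.

```perl

    use strict;
    use warnings;

    # Hypothetical registry contents; in the module, register() fills this in.
    my %alias2class= ( 'MySQL' => 'Data::Consumer::MySQL' );

    # Return the class name new() would look for, given a 'type' value.
    sub resolve_type {
        my ($type)= @_;
        return $alias2class{$type} if $alias2class{$type};    # already-registered alias
        my $full= $type;
        if ( $full !~ /::/ ) {
            $full =~ s/-/::/g;                                 # 'Foo-Bar' => 'Foo::Bar'
            $full= join '::', 'Data::Consumer', $full;
        }
        return $full;                                          # new() would require this
    }

    for my $type ( 'MySQL', 'Foo-Bar', 'My::Custom::Consumer' ) {
        my $class= resolve_type($type);
        print "$type => $class\n";
    }

```

A name that already contains C<::> is used as-is, which is why a full class name outside the C<Data::Consumer::> namespace also works.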

Every L<Data::Consumer> subclass constructor supports the following
arguments on top of any that are subclass-specific. Additionally some
arguments are universally used, but have different meanings depending on
the subclass.

=over 4

=item unprocessed

How to tell if the item is unprocessed.

How this argument is interpreted depends on the L<Data::Consumer>
subclass involved.

=item working

How to tell if the item is currently being worked on.

How this argument is interpreted depends on the L<Data::Consumer>
subclass involved.

=item processed

How to tell if the item has already been worked on.

How this argument is interpreted depends on the L<Data::Consumer>
subclass involved.

=item failed

How to tell if processing failed while handling the item.

How this argument is interpreted depends on the L<Data::Consumer>
subclass involved.

=item max_passes => $num_or_undef

Normally C<consume()> will loop through the data set until it is
exhausted. By setting this parameter you can control the maximum number
of passes; for instance, setting it to C<1> will result in a single
pass through the data per invocation. A value of C<0> (or any other false
value) is treated as meaning "loop until exhausted".

=item max_processed => $num_or_undef

Maximum number of items to process per invocation.

If set to a false value there is no limit.

=item max_failed => $num_or_undef

Maximum number of failed process attempts that may occur before
C<consume()> will stop. If set to a false value there is no limit.
Setting this to C<1> will cause processing to stop after the first failure.

=item max_elapsed => $seconds_or_undef

Maximum amount of time, in seconds, that may have elapsed when starting to
process a new item. If more than this amount of time has elapsed, no further
processing occurs. If C<0> (or any other false value), there is no time limit.

=item proceed => $code_ref

This is a callback that may be used to control the looping process in
C<consume()> via the C<proceed()> method. See the documentation of
C<consume()> and C<proceed()>.

=item sweep => $bool

*** NOTE: CURRENTLY THIS OPTION IS DISABLED ***

If this parameter is true, and all four states are defined
(C<unprocessed>, C<working>, C<processed>, C<failed>), then C<consume()>
will perform a "sweep up" after every pass, which is responsible for
moving "abandoned" items out of the C<working> state (for instance files
left in the working directory by a previous process that segfaulted during
processing) and marking them as C<failed>. Generally this should not be
necessary.

=back
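
How these limits interact is decided inside C<consume()>, whose code is not part of this section. The following is therefore only a hypothetical sketch of a "may another item be processed?" check that follows the semantics documented above (a false limit means no limit). The C<can_proceed()> name, the counter hash and the arguments passed to the C<proceed> callback are all assumptions.

```perl

    use strict;
    use warnings;

    # Hypothetical check: may another item be processed? A false/undef limit
    # means "no limit", as documented for the constructor options.
    sub can_proceed {
        my ( $opts, $stats, $now )= @_;
        return 0 if $opts->{max_processed} and $stats->{processed} >= $opts->{max_processed};
        return 0 if $opts->{max_failed}    and $stats->{failed}    >= $opts->{max_failed};
        return 0 if $opts->{max_elapsed}   and $now - $stats->{start} > $opts->{max_elapsed};
        return 0 if $opts->{proceed}       and !$opts->{proceed}->($stats);    # assumed signature
        return 1;
    }

    my %opts= ( max_processed => 10, max_failed => 1, max_elapsed => 60 );
    my %stats= ( processed => 3, failed => 0, start => time() - 30 );
    printf "%s\n", can_proceed( \%opts, \%stats, time() ) ? 'proceed' : 'stop';    # within all limits
    $stats{failed}= 1;
    printf "%s\n", can_proceed( \%opts, \%stats, time() ) ? 'proceed' : 'stop';    # max_failed reached

```

With C<max_failed> set to C<1>, the first failure is enough to stop, matching the description of that option.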

=head2 CLASS->register(@alias)

Used by subclasses to register themselves as a L<Data::Consumer>
subclass and to register any additional aliases by which the class may
be identified. The full class name and its short name (the part after
C<Data::Consumer::>, with any further C<::> replaced by C<->) are always
registered.

Will throw an exception if any of the aliases are already associated
with a different class.

When called on a subclass in list context, returns a list of
alias/class pairs for that subclass's registered aliases; in scalar
context, returns the number of aliases.

If called on L<Data::Consumer> itself in list context, returns a list of
all alias/class mappings; in scalar context, returns the number of mappings.
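
The alias bookkeeping done by C<register()> (see its code below) can be reproduced in a standalone sketch. C<register_aliases()> is a hypothetical name; it uses C<die> instead of C<confess> and returns the sorted alias names rather than alias/class pairs, but otherwise follows the same steps: derive the short name, skip names owned by another class, and complain afterwards.

```perl

    use strict;
    use warnings;

    # Hypothetical standalone copy of the bookkeeping done by register().
    my ( %alias2class, %class2alias );

    sub register_aliases {
        my ( $class, @alias )= @_;
        ( my $std_name= $class ) =~ s/^Data::Consumer:://;    # short name ...
        $std_name =~ s/::/-/g;                                 # ... with '::' turned into '-'
        my @failed;
        for my $name ( $class, $std_name, @alias ) {
            if ( $alias2class{$name} and $alias2class{$name} ne $class ) {
                push @failed, $name;    # owned by another class: remember it and skip
                next;
            }
            $alias2class{$name}= $class;
            $class2alias{$class}{$name}= $class;
        }
        die "already assigned: @failed\n" if @failed;
        return sort keys %{ $class2alias{$class} };
    }

    print join( ", ", register_aliases( 'Data::Consumer::Foo::Bar', 'fb' ) ), "\n";
    my $ok= eval { register_aliases( 'Data::Consumer::Other', 'fb' ); 1 };
    print "second registration failed: $@" unless $ok;

```

As in the module, names that do not clash are still registered before the exception is raised.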

=cut

=head2 $class_or_object->debug_warn_hook()

Specify a callback (a code reference) to use to capture diagnostic data
produced by a Data::Consumer object.

If called as a class method with a code reference, sets the default hook
for all Data::Consumer objects that have not explicitly set one.

If called as an object method with a code reference, sets the hook to use
for that object alone.

Returns the current effective hook. Defaults to the object's
C<default_debug_warn()> method, so it can be overridden by a subclass if
necessary.

The hook will be called with the arguments

    ($consumer,$level,@lines)

and is not expected to return anything.
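
The hook lookup order (object hook, then class-wide hook, then C<default_debug_warn()>) can be seen in a self-contained sketch. C<My::Diag> is a hypothetical class that copies the logic of C<debug_warn_hook()>; with the real module you would pass the code reference to C<< Data::Consumer->debug_warn_hook(...) >> or C<< $consumer->debug_warn_hook(...) >> instead. Note that C<debug_warn()> hands every message to the hook along with its C<$level>, so a custom hook that wants level filtering has to do it itself.

```perl

    use strict;
    use warnings;

    package My::Diag;    # hypothetical class copying debug_warn_hook()'s logic
    my $class_hook;

    sub new { my $class= shift; return bless {}, $class }
    sub default_debug_warn { my ( $self, $level, @lines )= @_; warn "$_\n" for @lines }

    sub debug_warn_hook {
        my $self= shift;
        if (@_) {
            if ( ref $self ) { $self->{debug_warn_hook}= shift } else { $class_hook= shift }
        }
        return $self->{debug_warn_hook} if ref $self and defined $self->{debug_warn_hook};
        return $class_hook || $self->can('default_debug_warn');
    }

    package main;

    # Install a class-wide hook that captures messages instead of warning.
    my @captured;
    My::Diag->debug_warn_hook( sub { my ( $obj, $level, @lines )= @_; push @captured, "[$level] @lines" } );

    my $obj= My::Diag->new;
    $obj->debug_warn_hook->( $obj, 3, 'acquired item 42' );    # no object hook, so the class hook runs
    print "$_\n" for @captured;

```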

=cut

my $debug_warn_hook;

sub debug_warn_hook {
    my $self= shift;
    if (@_) {
        # with an argument: set the hook on the object, or class-wide
        if (ref $self) {
            $self->{debug_warn_hook}= shift;
        } else {
            $debug_warn_hook= shift;
        }
    }
    if (ref $self and defined $self->{debug_warn_hook}) {
        return $self->{debug_warn_hook};
    }
    return $debug_warn_hook || $self->can('default_debug_warn');
}

=head2 $class_or_object->default_debug_warn($level,@debug_lines)

Use C<warn> to output diagnostics, but only if the current debug level is
above C<$level>. Each line is prefixed with the class name and the process id.

=cut

sub default_debug_warn {
    my $self= shift;
    my $level= shift;
    cluck($level) if $level=~/\D/;    # $level should be a plain integer
    my $debug_level= $self->debug_level;
    if ( $debug_level > $level ) {
        warn ref($self) || $self, "\t$$\t>>> $_\n" for @_;
    }
}

=head2 $class_or_object->debug_level($level)

Get or set the debug level.

When called as an object method with an argument, sets the value for that
object alone. C<undef> is distinct from C<0> in that C<undef> results in
the global debug level being used for that object.

When called as a class method with an argument, sets the value for all
objects which do not have a defined debug level.

Returns the current effective debug level for the object or
class.
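
The difference between C<undef> and C<0> is easiest to see in a small sketch. C<My::Levels> is a hypothetical class that copies the logic of C<debug_level()> below, so it runs without the module installed.

```perl

    use strict;
    use warnings;

    package My::Levels;    # hypothetical class copying debug_level()'s logic
    our $Debug;

    sub new { my $class= shift; return bless {}, $class }

    sub debug_level {
        my $self= shift;
        if (@_) {
            if ( ref $self ) { $self->{debug_level}= shift } else { $Debug= shift }
        }
        return $self->{debug_level} if ref $self and defined $self->{debug_level};
        return $Debug || 0;
    }

    package main;

    my $quiet= My::Levels->new;
    my $inherit= My::Levels->new;
    My::Levels->debug_level(5);      # class-wide level
    $quiet->debug_level(0);          # a defined 0 overrides the class-wide level
    $inherit->debug_level(undef);    # undef falls back to the class-wide level
    printf "quiet=%d inherit=%d\n", $quiet->debug_level, $inherit->debug_level;

```

Here C<$quiet> reports C<0> because its own level is defined, while C<$inherit> falls back to the class-wide value.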

=cut

sub debug_level {
    my $self= shift;
    if (@_) {
        if (ref $self) {
            $self->{debug_level}= shift;
        } else {
            $Debug= shift;
        }
    }
    if (ref $self and defined $self->{debug_level}) {
        return $self->{debug_level};
    }
    return $Debug || 0;
}

=head2 $class_or_object->debug_warn($level,@debug_lines)

Pass C<$level> and a set of diagnostic messages (each prefixed with the
object's C<debug_pfx>, if one is set) to the current C<debug_warn_hook()>.
With the default hook, C<default_debug_warn()>, the messages are only
output if the current debug level is above C<$level>; a custom hook
receives every message and must do its own filtering.

=cut

sub debug_warn {
    my $self=shift;
    my $level=shift;
    my $hook=$self->debug_warn_hook;
    my $pfx= ref $self ? $self->{debug_pfx} || '' : '';
    $hook->($self,$level,map { $pfx.$_ } @_);
}

BEGIN {
    # alias => class, and class => { alias => class }; shared by register() and new()
    my %alias2class;
    my %class2alias;
    $Debug and $Debug >= 5 and warn "\n";

    sub register {
        my $class= shift;

        ref $class
            and confess "register() is a class method and cannot be called on an object\n";
        my $pack= __PACKAGE__;

        if ( $class eq $pack ) {
            return wantarray ? %alias2class : 0 + keys %alias2class;
        }

        # the short name: strip the base namespace and turn '::' into '-'
        ( my $std_name= $class ) =~ s/^\Q$pack\E:://;
        $std_name =~ s/::/-/g;

        my @failed;
        for my $name ( $class, $std_name, @_ ) {
            if ( $alias2class{$name} and $alias2class{$name} ne $class ) {
                push @failed, $name;
                next;
            }
            __PACKAGE__->debug_warn( 5, "registered '$name' as an alias of '$class'" );
            $alias2class{$name}= $class;
            $class2alias{$class}{$name}= $class;
        }
        @failed
            and confess "Failed to register aliases for '$class' as they are already used\n",
            join( "\n", map { "\t'$_' is already assigned to '$alias2class{$_}'" } @failed ),
            "\n";
        return wantarray ? %{ $class2alias{$class} } : 0 + keys %{ $class2alias{$class} };
    }

    sub new {
        my ( $class, %opts )= @_;
        ref $class
            and confess "new() is a class method and cannot be called on an object\n";

        if ( $class eq __PACKAGE__ ) {
            # Data::Consumer->new(type => ...): resolve the type to a subclass,
            # loading it if it has not registered itself yet
            my $type= $opts{type}
                or confess "'type' is a mandatory named parameter for $class->new()\n";
            my $full = $type;
            if (!$alias2class{$full}) {
                if ($full!~/::/) {
                    $full=~s/-/::/g;
                    $full=join '::',$class,$full;
                }
                eval "require $full; 1"
                    or confess "'type' parameter '$type' could not be loaded properly: $@\n";
            }
            $class= $alias2class{$full}
                or confess "'type' parameter '$type' mapped to '$full' which does not seem to exist\n";
        }
        my $object= bless {}, $class;
        $class->debug_warn( 5, "created new object '$object'" );
        return $object;
    }
}

=head2 $object->last_id()

Returns the identifier for the last item acquired.

Returns undef if C<acquire()> has never been called or if the last
attempt to acquire an item failed because none was available.

=cut

sub last_id {
    my $self= shift;
    return $self->{last_id};
}

# Until I figure out how to make gedit handle begin/end directives this has to
# stay commented out
#=begin dev
#
#=head2 $object->_mark_as($type,$id)
#
#** Must be overridden **
#
#Mark an item as a particular type if the object defines that type.
#
#This is wrapped by mark_as() for error checking, so you are guaranteed
#that $type will be one of
#
#    'unprocessed', 'working', 'processed', 'failed'
#
#and that $object->{$type} will be defined, and that $id will be defined
#(by default it is the id of the currently acquired item).
#
#=end dev

=head2 $object->mark_as($type,$id)

Mark an item as being in a particular state, if the object defines that
state (that is, if C<< $object->{$type} >> is defined; otherwise nothing
happens). If C<$id> is omitted it defaults to C<last_id()>, the currently
acquired item. Dies if C<$type> is unknown or if there is no item to mark.

Allowed types are C<unprocessed>, C<working>, C<processed> and C<failed>.
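
The argument checking done by C<mark_as()> (see its code below) can be illustrated with a standalone sketch. C<check_mark()> is hypothetical: it takes a plain hash in place of the consumer object, returns a description instead of calling C<_mark_as()>, and uses C<die> rather than C<confess>.

```perl

    use strict;
    use warnings;

    # Hypothetical standalone copy of the checks mark_as() performs before it
    # hands over to the subclass's _mark_as().
    my @valid= qw( unprocessed working processed failed );
    my %valid;
    @valid{@valid}= ( 1 .. @valid );

    sub check_mark {
        my ( $self, $key, @id )= @_;
        $valid{$key}
            or die "Unknown type '$key', valid options are ", join( ", ", map { "'$_'" } @valid ), "\n";
        my $id= @id ? $id[0] : $self->{last_id};    # default to the current item
        defined $id
            or die "Nothing acquired to be marked as '$key'\n";
        return unless defined $self->{$key};        # state not configured: skip quietly
        return "$id -> $self->{$key}";              # a subclass would call _mark_as($key, $id) here
    }

    my $consumer= { last_id => 7, processed => 'done', failed => 'err' };    # no 'working' value
    for my $args ( ['processed'], [ 'failed', 9 ], ['working'] ) {
        my $result= check_mark( $consumer, @$args );
        print "@$args: ", ( defined $result ? $result : 'skipped' ), "\n";
    }
    my $ok= eval { check_mark( $consumer, 'bogus' ); 1 };
    print "error: $@" unless $ok;

```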

=cut

sub _mark_as { confess "must be overridden" }

BEGIN {
    my ( %valid, @valid );
    @valid= qw( unprocessed working processed failed );
    @valid{@valid}= ( 1 .. @valid );

    sub mark_as {
        my $self= shift @_;
        my $key= shift @_;

        $valid{$key}
            or confess "Unknown type in mark_as(), valid options are ",
            join( ", ", map { "'$_'" } @valid ),
            "\n";

        my $id= @_ ? shift @_ : $self->last_id;
        defined $id
            or confess "Nothing acquired to be marked as '$key' in mark_as.\n";

        # silently do nothing if no value was configured for this state
        return unless defined $self->{$key};
        return $self->_mark_as( $key, $id );
    }
}

=head2 $object->process($callback)

Marks the current item as C<working> and processes it using the
C<$callback>. If the C<$callback> dies, the item is marked as
C<failed>; otherwise the item is marked as C<processed> once the
C<$callback> returns. The return value of the C<$callback> is ignored.

C<$callback> will be called with at least two arguments, the first being
the C<$consumer> object itself, and the second being an identifier for the
current record. Normally additional, likely useful, arguments are
provided as well, on a per-subclass basis. For example
a MySQL-based subclass will pass in the consumer object, the id of the
record to be processed, and a copy of the consumer's database handle for
convenience. On the other hand, L<Data::Consumer::Dir> will pass in the
consumer object, followed by a file specification for the file to be
processed, an open filehandle to the file, and the filename itself (with
no path).

The callback may call the methods C<leave()>, C<ignore()>, C<fail()> and
C<halt()> on the consumer object before returning, typically by doing
something like

    return $consumer->ignore;

This allows the callback to send specific signals to C<consume()>:

    leave  : return the item to the unprocessed state after the callback returns.
    ignore : return the item to the unprocessed state after the callback returns
             and never attempt to process it again with this consumer object.
    fail   : same result as dying in the callback, except without throwing an
             exception, for situations where there might be $SIG{__DIE__}
             hooks to worry about.
    halt   : stop the consume() process after the callback returns.

For further details always consult the relevant subclass's documentation
for C<process()>.
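
To show how these signals map onto the final state of an item, here is a sketch of a hypothetical C<My::Signals> class that copies the flag handling of C<process()> below. Unlike the real method it receives the id and callback directly instead of using C<last_id()> and C<_do_callback()>, and its C<fail()> records a default message when called without one (an assumption of this sketch) so that the bare C<< $consumer->fail >> form marks the item as failed.

```perl

    use strict;
    use warnings;

    # Hypothetical mini-consumer mirroring the flag handling in process():
    # fail() stores a message, leave() sets defer_leave and halt() sets halt.
    package My::Signals;

    sub new   { my $class= shift; my $self= { state => {} }; return bless $self, $class }
    sub leave { my $self= shift; $self->{defer_leave}++; return $self }
    sub halt  { my $self= shift; $self->{halt}++; return $self }
    # Assumption of this sketch: a bare fail() records a default message.
    sub fail  { my $self= shift; $self->{fail}= shift || 'failed'; return $self }

    sub process {
        my ( $self, $id, $callback )= @_;
        delete @{$self}{qw(fail defer_leave)};
        my $error= eval { $callback->( $self, $id ); 1 } ? undef : $@;
        $error ||= $self->{fail};                         # fail() counts like a die
        $self->{state}{$id}=
              $error               ? 'failed'
            : $self->{defer_leave} ? 'unprocessed'       # leave(): try again later
            :                        'processed';
        return 1;
    }

    package main;

    my $c= My::Signals->new;
    $c->process( 1, sub { return } );                       # plain success
    $c->process( 2, sub { die "boom\n" } );                 # exception
    $c->process( 3, sub { return $_[0]->fail('bad') } );    # failure without die
    $c->process( 4, sub { return $_[0]->leave } );          # defer
    $c->process( 5, sub { return $_[0]->fail->halt } );     # fail and stop
    print "$_: $c->{state}{$_}\n" for sort keys %{ $c->{state} };
    print "halt requested\n" if $c->{halt};

```

Items 2, 3 and 5 end up C<failed>, item 4 goes back to C<unprocessed>, and item 5 also sets the halt flag.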
570
|
|
|
|
|
|
|
|
571
|
|
|
|
|
|
|
=cut |
572
|
|
|
|
|
|
|
|
573
|
|
|
|
|
|
|
sub process { |
574
|
50
|
|
|
50
|
1
|
119
|
my $self= shift; |
575
|
50
|
|
|
|
|
105
|
my $callback= shift; |
576
|
50
|
|
|
|
|
117
|
delete $self->{fail}; |
577
|
50
|
|
|
|
|
168
|
my $id= $self->last_id; |
578
|
50
|
50
|
|
|
|
172
|
defined $id |
579
|
|
|
|
|
|
|
or $self->error("Undefined last_id. Nothing acquired yet?"); |
580
|
50
|
|
|
|
|
248
|
$self->mark_as('working'); |
581
|
50
|
|
|
|
|
124
|
local $Cmd; |
582
|
50
|
|
|
|
|
105
|
delete $self->{defer_leave}; |
583
|
50
|
|
|
|
|
207
|
my $error= $self->_do_callback($callback); |
584
|
50
|
|
33
|
|
|
676
|
$error ||= $self->{fail}; |
585
|
50
|
50
|
|
|
|
201
|
if ( $error ) { |
586
|
0
|
|
|
|
|
0
|
$self->mark_as('failed'); |
587
|
0
|
|
|
|
|
0
|
$self->error($error); |
588
|
|
|
|
|
|
|
} else { |
589
|
50
|
50
|
|
|
|
311
|
if ($self->{defer_leave}) { |
590
|
0
|
|
|
|
|
0
|
$self->mark_as('unprocessed'); |
591
|
|
|
|
|
|
|
} else { |
592
|
50
|
|
|
|
|
389
|
$self->mark_as('processed'); |
593
|
|
|
|
|
|
|
} |
594
|
|
|
|
|
|
|
} |
595
|
50
|
|
|
|
|
257
|
return 1; |
596
|
|
|
|
|
|
|
} |
597
|
|
|
|
|
|
|
|
598
|
|
|
|
|
|
|
|
599
|
|
|
|
|
|
|
=head2 $consumer->leave() |
600
|
|
|
|
|
|
|
|
601
|
|
|
|
|
|
|
Sometimes its useful to defer processing. This method when called |
602
|
|
|
|
|
|
|
from within a consume/process callback will result in the |
603
|
|
|
|
|
|
|
item being marked as 'unprocessed' after the callback returns |
604
|
|
|
|
|
|
|
(so long as it does not die). |
605
|
|
|
|
|
|
|
|
606
|
|
|
|
|
|
|
Typically this is invoked as |
607
|
|
|
|
|
|
|
|
608
|
|
|
|
|
|
|
return $consumer->leave; |
609
|
|
|
|
|
|
|
|
610
|
|
|
|
|
|
|
from withing a consume/process callback. |
611
|
|
|
|
|
|
|
|
612
|
|
|
|
|
|
|
Returns $consumer. Will die if not 'unprocessed' state is defined. |
613
|
|
|
|
|
|
|
|
614
|
|
|
|
|
|
|
=cut |
615
|
|
|
|
|
|
|
|
616
|
|
|
|
|
|
|
sub leave { |
617
|
0
|
|
|
0
|
1
|
0
|
my $self= shift; |
618
|
0
|
0
|
|
|
|
0
|
confess("Can't leave as 'unprocessed' is undefined!") if not defined $self->{unprocessed}; |
619
|
0
|
|
|
|
|
0
|
$self->{defer_leave}++; |
620
|
0
|
|
|
|
|
0
|
return $self; |
621
|
|
|
|
|
|
|
} |
622
|
|
|
|
|
|
|
|
623
|
|
|
|
|
|
|
=head2 $consumer->ignore(@list) |
624
|
|
|
|
|
|
|
|
625
|
|
|
|
|
|
|
This can used to cause acquire to ignore each item in @list. |
626
|
|
|
|
|
|
|
|
627
|
|
|
|
|
|
|
If @list is empty then it is assumed it is being called from |
628
|
|
|
|
|
|
|
within consume/process and marks the currently acquired item |
629
|
|
|
|
|
|
|
as ignored and calls C<< $consumer->leave() >>. |
630
|
|
|
|
|
|
|
|
631
|
|
|
|
|
|
|
Returns $consumer. Will die if no 'unprocessed' state is defined. |
632
|
|
|
|
|
|
|
|
633
|
|
|
|
|
|
|
=cut |
634
|
|
|
|
|
|
|
|
635
|
|
|
|
|
|
|
|
636
|
|
|
|
|
|
|
sub ignore { |
637
|
0
|
|
|
0
|
1
|
0
|
my $self= shift; |
638
|
0
|
0
|
|
|
|
0
|
if (@_) { |
639
|
0
|
|
|
|
|
0
|
for my $id (@_) { |
640
|
0
|
|
|
|
|
0
|
$self->{ignore}{$id}++; |
641
|
|
|
|
|
|
|
} |
642
|
|
|
|
|
|
|
} else { |
643
|
0
|
|
|
|
|
0
|
my $id= $self->last_id; |
644
|
0
|
|
|
|
|
0
|
$self->{ignore}{$id}++; |
645
|
0
|
|
|
|
|
0
|
$self->leave; |
646
|
|
|
|
|
|
|
} |
647
|
0
|
|
|
|
|
0
|
return $self; |
648
|
|
|
|
|
|
|
} |
649 | | | | | | | |
650 | | | | | | | =head2 $consumer->fail($message) |
651 | | | | | | | |
652 | | | | | | | Same as doing C<die($message)> from within a consume/process callback, except |
653 | | | | | | | that no exception is thrown (no C<$SIG{__DIE__}> callbacks are invoked) and |
654 | | | | | | | the error is deferred until the callback actually returns. |
655 | | | | | | | |
656 | | | | | | | Typically used as |
657 | | | | | | | |
658 | | | | | | | return $consumer->fail; |
659 | | | | | | | |
660 | | | | | | | from within a consume() callback. |
661 | | | | | | | |
662 | | | | | | | Returns the $consumer object. |
663 | | | | | | | |
664 | | | | | | | =cut |
665 | | | | | | | |
666 | | | | | | | sub fail { |
667 | 0 | | | 0 | 1 | 0 | my $self= shift; |
668 | 0 | | | | | 0 | $self->{fail}= shift; |
669 | 0 | | | | | 0 | return $self; |
670 | | | | | | | } |
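A minimal standalone sketch of the deferred-failure pattern described for fail(). It does not load Data::Consumer: `ToyConsumer` and its `run_one()` driver are hypothetical stand-ins, and only `fail()` mirrors the code above. This section does not show how the real process() acts on the stored message.

```perl
use strict;
use warnings;

# Hypothetical stand-in for a consumer object. Only fail() mirrors the
# sub above: it stores the message and returns $self for chaining.
package ToyConsumer;

sub new  { return bless {}, shift }
sub fail { my $self = shift; $self->{fail} = shift; return $self }

# Hypothetical driver: run the callback, and only after it has returned
# look at the deferred failure. Nothing is thrown, so no __DIE__ hook runs.
sub run_one {
    my ( $self, $callback, $id ) = @_;
    delete $self->{fail};
    $callback->($id);
    return 'ok' unless exists $self->{fail};
    return 'failed: ' . ( $self->{fail} // 'no message' );
}

package main;

my $die_hook_calls = 0;
$SIG{__DIE__} = sub { $die_hook_calls++ };

my $consumer = ToyConsumer->new;
my $result   = $consumer->run_one(
    sub {
        my $id = shift;
        return $consumer->fail("bad record $id") if $id % 2;    # odd ids fail
        return;
    },
    7,
);
print "$result\n";    # prints "failed: bad record 7"
```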
671 | | | | | | | |
672 | | | | | | | =head2 $consumer->halt() |
673 | | | | | | | |
674 | | | | | | | Causes consume() to halt processing and exit once |
675 | | | | | | | the callback returns. Typically invoked like |
676 | | | | | | | |
677 | | | | | | | return $consumer->halt; |
678 | | | | | | | |
679 | | | | | | | or |
680 | | | | | | | |
681 | | | | | | | return $consumer->fail->halt; |
682 | | | | | | | |
683 | | | | | | | Returns the consumer object. |
684 | | | | | | | |
685 | | | | | | | =cut |
686 | | | | | | | |
687 | | | | | | | |
688 | | | | | | | sub halt { |
689 | 0 | | | 0 | 1 | 0 | my $self= shift; |
690 | 0 | | | | | 0 | $self->{halt}++; |
691 | 0 | | | | | 0 | return $self; |
692 | | | | | | | } |
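A standalone sketch of how a halt flag set inside the callback stops the loop only after the callback has returned. `ToyHalt` is hypothetical. Its `proceed()` checks only the halt flag, whereas the real proceed() also applies the max_* limits.

```perl
use strict;
use warnings;

# Hypothetical toy consumer. halt() mirrors the sub above; proceed() here
# checks only the halt flag (the real one also applies the max_* limits).
package ToyHalt;

sub new     { my ( $class, @items ) = @_; return bless { items => [@items] }, $class }
sub halt    { my $self = shift; $self->{halt}++; return $self }
sub proceed { my $self = shift; return if $self->{halt}; return 1 }

sub consume {
    my ( $self, $callback ) = @_;
    my @seen;
    while ( $self->proceed && @{ $self->{items} } ) {
        my $item = shift @{ $self->{items} };
        push @seen, $item;
        $callback->($item);    # a halt() requested here is seen on the next loop test
    }
    delete $self->{halt};      # like consume(), clear the flag for the next run
    return \@seen;
}

package main;

my $toy  = ToyHalt->new(qw(a b STOP c d));
my $seen = $toy->consume(
    sub {
        my $item = shift;
        return $toy->halt if $item eq 'STOP';
        return;
    }
);
print "@$seen\n";    # prints "a b STOP"
```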
693 | | | | | | | |
694 | | | | | | | |
695 | | | | | | | |
696 | | | | | | | =head2 $object->is_ignored($id) |
697 | | | | | | | |
698 | | | | | | | Returns true if the item has been marked as ignored. If $id is omitted, |
699 | | | | | | | it defaults to C<last_id>; if no id is available it returns nothing. |
700 | | | | | | | |
701 | | | | | | | =cut |
702 | | | | | | | |
703 | | | | | | | sub is_ignored { |
704 | 240 | | | 240 | 1 | 467 | my $self= shift; |
705 | 240 | 50 | | | | 663 | my $id= @_ ? shift @_ : $self->last_id; |
706 | 240 | 50 | | | | 612 | return if !defined $id; |
707 | 240 | 50 | | | | 999 | return $self->{ignore}{$id} ? 1 : 0 |
708 | | | | | | | } |
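A standalone sketch that exercises the ignore() and is_ignored() logic shown above. `ToyIgnore` is hypothetical and makes two assumptions. First, `last_id` is treated as a plain field. Second, `leave()` only bumps `defer_leave`; the real leave() also dies when no 'unprocessed' state is defined.

```perl
use strict;
use warnings;

# Hypothetical stand-in reproducing ignore() and is_ignored() from above.
# Assumptions: last_id is a plain field, and leave() only bumps
# defer_leave (the real leave() also dies if 'unprocessed' is undefined).
package ToyIgnore;

sub new     { return bless { ignore => {} }, shift }
sub last_id { return $_[0]->{last_id} }
sub leave   { my $self = shift; $self->{defer_leave}++; return $self }

sub ignore {
    my $self = shift;
    if (@_) {
        $self->{ignore}{$_}++ for @_;    # explicit ids: just mark them
    } else {
        my $id = $self->last_id;         # no ids: mark the current item...
        $self->{ignore}{$id}++;
        $self->leave;                    # ...and call leave() (bumps defer_leave)
    }
    return $self;
}

sub is_ignored {
    my $self = shift;
    my $id = @_ ? shift @_ : $self->last_id;
    return if !defined $id;
    return $self->{ignore}{$id} ? 1 : 0;
}

package main;

my $c = ToyIgnore->new;
$c->ignore(qw(foo bar));    # mark two ids up front
$c->{last_id} = 'baz';      # pretend 'baz' was just acquired
$c->ignore;                 # no list: mark 'baz' and call leave()

print join( ',', $c->is_ignored('foo'), $c->is_ignored(), $c->is_ignored('qux') ), "\n";
# prints "1,1,0"
```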
709 | | | | | | | |
710 | | | | | | | =head2 $object->reset() |
711 | | | | | | | |
712 | | | | | | | Reset the state of the object. |
713 | | | | | | | |
714 | | | | | | | =head2 $object->acquire() |
715 | | | | | | | |
716 | | | | | | | Acquire an item to be processed. |
717 | | | | | | | |
718 | | | | | | | Returns an identifier for the acquired item, or undef if nothing is left to acquire. |
719 | | | | | | | |
720 | | | | | | | =head2 $object->release() |
721 | | | | | | | |
722 | | | | | | | Release any locks on the currently held item. |
723 | | | | | | | |
724 | | | | | | | Normally there is no need to call this directly. |
725 | | | | | | | |
726 | | | | | | | =cut |
727 | | | | | | | |
728 | 0 | | | 0 | 1 | 0 | sub reset { confess "abstract method must be overriden by subclass\n"; } |
729 | 0 | | | 0 | 1 | 0 | sub acquire { confess "abstract method must be overriden by subclass\n"; } |
730 | 0 | | | 0 | 1 | 0 | sub release { confess "abstract method must be overriden by subclass\n"; } |
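reset(), acquire() and release() are abstract, so a concrete consumer has to supply them. Below is a minimal in-memory sketch that assumes an array of ids as the data source. `ArrayConsumer` is hypothetical and standalone: it does not inherit from Data::Consumer, and it leaves out anything else a real subclass may need.

```perl
use strict;
use warnings;

# Hypothetical in-memory consumer supplying the three abstract methods.
# Standalone on purpose: it does not inherit from Data::Consumer, and it
# omits everything else a real subclass may need.
package ArrayConsumer;

sub new {
    my ( $class, @ids ) = @_;
    return bless { source => [@ids], queue => [], held => undef }, $class;
}

# reset(): rebuild the per-run queue from the original list.
sub reset {
    my $self = shift;
    $self->{queue} = [ @{ $self->{source} } ];
    $self->{held}  = undef;
    return $self;
}

# acquire(): hand out the next id, or undef once the queue is empty
# (consume() stops its inner loop when acquire() returns undef).
sub acquire {
    my $self = shift;
    $self->release;    # let go of the previous item first
    $self->{held} = shift @{ $self->{queue} };
    return $self->{held};
}

# release(): forget the held item; the stand-in for dropping a lock.
sub release {
    my $self = shift;
    $self->{held} = undef;
    return $self;
}

package main;

my $ac = ArrayConsumer->new(qw(1 2 3));
$ac->reset;
my @got;
while ( defined( my $id = $ac->acquire ) ) {
    push @got, $id;
}
$ac->release;
print "@got\n";    # prints "1 2 3"
```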
731 | | | | | | | |
732 | | | | | | | =head2 $object->error() |
733 | | | | | | | |
734 | | | | | | | Calls the C<error> callback if the user has provided one, otherwise |
735 | | | | | | | calls C<confess()>. Probably not all that useful for an end user. |
736 | | | | | | | |
737 | | | | | | | =cut |
738 | | | | | | | |
739 | | | | | | | sub error { |
740 | 0 | | | 0 | 1 | 0 | my $self= shift; |
741 | 0 | 0 | | | | 0 | if ( $self->{error} ) { |
742 | 0 | | | | | 0 | $self->{error}->(@_); |
743 | | | | | | | } else { |
744 | 0 | | | | | 0 | confess @_; |
745 | | | | | | | } |
746 | | | | | | | } |
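A standalone sketch of error()'s dispatch. It uses the user's `error` callback when one is present and otherwise calls `confess`. `ToyError` is hypothetical; only the body of `error()` mirrors the code above.

```perl
use strict;
use warnings;

# Hypothetical stand-in: only error() mirrors the sub above. It calls the
# user's error callback if one was given, and otherwise confess()es
# (dies with a stack trace).
package ToyError;
use Carp qw(confess);

sub new {
    my ( $class, %args ) = @_;
    return bless \%args, $class;
}

sub error {
    my $self = shift;
    if ( $self->{error} ) {
        $self->{error}->(@_);
    } else {
        confess @_;
    }
}

package main;

my @logged;
my $with_cb = ToyError->new( error => sub { push @logged, join '', @_ } );
$with_cb->error('disk full');    # handled by the callback, no exception

my $without = ToyError->new;
my $ok  = eval { $without->error('disk full'); 1 };
my $err = $@;                    # "disk full at ..." followed by a stack trace
```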
747 | | | | | | | |
748 | | | | | | | =head2 $object->consume($callback) |
749 | | | | | | | |
750 | | | | | | | Consumes a data resource until it is exhausted, using C<acquire()>, |
751 | | | | | | | C<process()>, and C<release()> as appropriate. Normally this is the main |
752 | | | | | | | method used by external processes. Returns the C<runstats()> hash reference. |
753 | | | | | | | |
754 | | | | | | | Before each attempt to acquire a new item, and once at the end of |
755 | | | | | | | each pass, consume() calls C<proceed()> to determine whether it may continue. |
756 | | | | | | | The user may hook into this by specifying a C<proceed> callback in the constructor. |
757 | | | | | | | When invoked from the inner loop (once per item) the callback is not passed |
758 | | | | | | | a pass count; at the end of each pass it is also passed the number of passes |
759 | | | | | | | so far (starting with 1). |
760 | | | | | | | |
761 | | | | | | | =head2 $object->proceed($passes) |
762 | | | | | | | |
763 | | | | | | | Returns C<true> if the conditions specified at construction time are |
764 | | | | | | | satisfied and processing may proceed. Returns C<false> otherwise. Each call first refreshes C<end_time> and C<elapsed> in the runstats. |
765 | | | | | | | |
766 | | | | | | | If the user has specified a C<proceed> callback in the constructor, then |
767 | | | | | | | it is executed before any other rules are applied, and is passed the |
768 | | | | | | | current C<$object>, a reference to the runstats hash and, when |
769 | | | | | | | called at the end of a pass, the number of passes. |
770 | | | | | | | |
771 | | | | | | | If this callback returns C<false>, C<proceed()> returns C<false> at once; |
772 | | | | | | | otherwise the other rules are applied, and only if all other conditions |
773 | | | | | | | from the constructor are satisfied will C<proceed()> itself return C<true>. |
774 | | | | | | | |
775 | | | | | | | =head2 $object->runstats() |
776 | | | | | | | |
777 | | | | | | | Returns a reference to a hash of statistics about the last (or currently running) |
778 | | | | | | | execution of consume(). Example: |
779 | | | | | | | |
780 | | | | | | | { |
781 | | | | | | | 'passes' => 2, |
782 | | | | | | | 'processed_this_pass' => 0, |
783 | | | | | | | 'processed' => 3, |
784 | | | | | | | 'start_time' => 1209750962, |
785 | | | | | | | 'failed' => 0, |
786 | | | | | | | 'elapsed' => 0, |
787 | | | | | | | 'end_time' => 1209750962, |
788 | | | | | | | 'failed_this_pass' => 0 |
789 | | | | | | | } |
790 | | | | | | | |
791 | | | | | | | Note that start_time and end_time are Unix timestamps, and elapsed is in seconds. |
792 | | | | | | | |
793 | | | | | | | =cut |
794 | | | | | | | |
795 | 0 | | | 0 | 1 | 0 | sub runstats { $_[0]->{runstats} } |
796 | | | | | | | |
797 | | | | | | | sub proceed { |
798 | 70 | | | 70 | 1 | 245 | my $self= shift; |
799 | 70 | | | | | 183 | my $runstats= $self->{runstats}; |
800 | 70 | | | | | 201 | $runstats->{end_time}= time; |
801 | 70 | | | | | 233 | $runstats->{elapsed}= $runstats->{end_time} - $runstats->{start_time}; |
802 | | | | | | | |
803 | 70 | 50 | | | | 542 | if ( my $cb= $self->{proceed} ) { |
804 | 0 | 0 | | | | 0 | $cb->( $self, $self->{runstats}, @_ ) # pass on the $passes argument if its there |
805 | | | | | | | or return; |
806 | | | | | | | } |
807 | 70 | | | | | 335 | for my $key (qw(elapsed passes processed failed)) { |
808 | 280 | | | | | 619 | my $max= "max_$key"; |
809 | 280 | 50 | 33 | | | 1195 | return if $self->{$max} && $runstats->{$key} >= $self->{$max}; |
810 | | | | | | | } |
811 | 70 | 50 | | | | 240 | return if $self->{halt}; |
812 | 70 | | | | | 483 | return 1; |
813 | | | | | | | } |
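A standalone sketch of the rules applied by proceed(). `may_proceed()` is hypothetical. It takes a plain options hash instead of the object and skips the end_time/elapsed refresh, so elapsed is supplied directly. It uses the `max_*` keys exactly as proceed() reads them (`max_elapsed`, `max_passes`, `max_processed`, `max_failed`). The SYNOPSIS spells one constructor option `max_process`, and how the constructor maps that option is not shown here.

```perl
use strict;
use warnings;

# Hypothetical standalone version of proceed()'s rules. Differences from
# the real method: options come in a plain hash instead of the object,
# and end_time/elapsed are not recomputed (elapsed is supplied directly).
sub may_proceed {
    my ( $opts, $runstats, $cb, @passes ) = @_;
    if ($cb) {
        $cb->( $opts, $runstats, @passes ) or return;    # callback can veto
    }
    for my $key (qw(elapsed passes processed failed)) {
        my $max = "max_$key";
        # a missing or zero max_* means "no limit"
        return if $opts->{$max} && $runstats->{$key} >= $opts->{$max};
    }
    return if $opts->{halt};
    return 1;
}

my %stats = ( elapsed => 12, passes => 1, processed => 99, failed => 0 );
my %opts  = ( max_processed => 100, max_elapsed => 60 );

my $go_on = may_proceed( \%opts, \%stats ) ? 1 : 0;             # 1: under every limit
$stats{processed}++;                                             # now 100
my $at_limit = may_proceed( \%opts, \%stats ) ? 1 : 0;          # 0: max_processed reached
my $vetoed   = may_proceed( {}, \%stats, sub { 0 } ) ? 1 : 0;   # 0: callback said no
print "$go_on $at_limit $vetoed\n";    # prints "1 0 0"
```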
814 | | | | | | | |
815 | | | | | | | sub consume { |
816 | 5 | | | 5 | 1 | 188 | my $self= shift; |
817 | 5 | | | | | 10 | my $callback= shift; |
818 | | | | | | | |
819 | 5 | | | | | 12 | my $passes= 0; |
820 | | | | | | | |
821 | 5 | 50 | | | | 14 | unless ($self->{runstats}) { |
822 | 5 | | | | | 17 | $self->{runstats}= {}; |
823 | | | | | | | $self->{runstats}{$_}= 0 |
824 | 5 | | | | | 40 | for qw(passes processed failed processed_this_pass failed_this_pass); |
825 | | | | | | | } |
826 | | | | | | | |
827 | 5 | | | | | 11 | my $runstats= $self->{runstats}; |
828 | 5 | | | | | 25 | $runstats->{start_time}= time; |
829 | | | | | | | |
830 | 5 | | | | | 28 | $self->reset(); |
831 | | | | | | | do { |
832 | 10 | | | | | 28 | ++$runstats->{passes}; |
833 | 10 | | | | | 24 | $runstats->{processed_this_pass}= $runstats->{failed_this_pass}= 0; |
834 | 10 | | 66 | | | 68 | while ( $self->proceed && defined( my $item= $self->acquire ) ) { |
835 | | | | | | | eval { |
836 | 50 | | | | | 283 | $self->process($callback); |
837 | 50 | | | | | 250 | $runstats->{processed_this_pass}++; |
838 | 50 | | | | | 139 | $runstats->{processed}++; |
839 | 50 | | | | | 598 | 1; |
840 | | | | | | | } |
841 | 50 | 50 | | | | 120 | or do { |
842 | 0 | | | | | 0 | $runstats->{failed_this_pass}++; |
843 | 0 | | | | | 0 | $runstats->{failed}++; |
844 | | | | | | | |
845 | | | | | | | # quotes force string copy |
846 | 0 | | | | | 0 | $self->debug_warn(5, "Failed during \$self->process(\$callback): $@"); |
847 | | | | | | | } |
848 | | | | | | | } |
849 | | | | | | | } while $self->proceed( $runstats->{passes} ) |
850 | 5 | | 66 | | | 10 | && $runstats->{processed_this_pass}; |
851 | | | | | | | |
852 | | | | | | | # if we still hold a lock let it go. |
853 | 5 | | | | | 17 | delete $self->{halt}; |
854 | 5 | | | | | 30 | $self->release; |
855 | 5 | | | | | 21 | return $runstats; |
856 | | | | | | | } |
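A standalone sketch of consume()'s control flow over an in-memory queue. It shows why a run ends with an empty pass. `ToyLoop` is hypothetical and makes three assumptions: its acquire() shifts the next id, its process() just calls the callback with that id, and its proceed() applies no limits. It does not model reset(), release(), debug output or state moves.

```perl
use strict;
use warnings;

# Hypothetical toy reproducing consume()'s control flow over an in-memory
# queue. Assumptions: acquire() shifts the next id, process() just calls
# the callback with that id, and proceed() applies no limits. reset(),
# release(), debug output and state moves are not modelled.
package ToyLoop;

sub new     { my ( $class, @ids ) = @_; return bless { queue => [@ids] }, $class }
sub acquire { my $self = shift; return shift @{ $self->{queue} } }
sub proceed { return 1 }
sub process { my ( $self, $callback, $id ) = @_; $callback->($id); return }

sub consume {
    my ( $self, $callback ) = @_;
    my %rs;
    $rs{$_} = 0 for qw(passes processed failed processed_this_pass failed_this_pass);
    do {
        ++$rs{passes};
        $rs{processed_this_pass} = $rs{failed_this_pass} = 0;
        while ( $self->proceed && defined( my $id = $self->acquire ) ) {
            eval {
                $self->process( $callback, $id );
                $rs{processed_this_pass}++;
                $rs{processed}++;
                1;
            } or do {
                # the callback died: count a failure and carry on
                $rs{failed_this_pass}++;
                $rs{failed}++;
            };
        }
    } while $self->proceed( $rs{passes} ) && $rs{processed_this_pass};
    return \%rs;
}

package main;

my $stats = ToyLoop->new(qw(a b bad c))->consume(
    sub {
        my $id = shift;
        die "cannot handle $id\n" if $id eq 'bad';
    }
);
printf "passes=%d processed=%d failed=%d\n", @{$stats}{qw(passes processed failed)};
# prints "passes=2 processed=3 failed=1"
```

With four items and one failure, the toy reports two passes. The second pass acquires nothing, so processed_this_pass stays 0 and the do/while exits. This has the same shape as the runstats example above (passes => 2, processed_this_pass => 0).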
857 | | | | | | | |
858 | | | | | | | |
859 | | | | | | | =head1 AUTHOR |
860 | | | | | | | |
861 | | | | | | | Yves Orton, C<< >> |
862 | | | | | | | |
863 | | | | | | | =head1 BUGS |
864 | | | | | | | |
865 | | | | | | | Please report any bugs or feature requests to |
866 | | | | | | | C, or through the web interface at |
867 | | | | | | | L. |
868 | | | | | | | |
869 | | | | | | | I will be notified, and then you'll automatically be notified of progress on |
870 | | | | | | | your bug as I make changes. |
871 | | | | | | | |
872 | | | | | | | =head1 SUPPORT |
873 | | | | | | | |
874 | | | | | | | You can find documentation for this module with the perldoc command. |
875 | | | | | | | |
876 | | | | | | | perldoc Data::Consumer |
877 | | | | | | | |
878 | | | | | | | |
879 | | | | | | | You can also look for information at: |
880 | | | | | | | |
881 | | | | | | | =over 4 |
882 | | | | | | | |
883 | | | | | | | =item * RT: CPAN's request tracker |
884 | | | | | | | |
885 | | | | | | | L |
886 | | | | | | | |
887 | | | | | | | =item * AnnoCPAN: Annotated CPAN documentation |
888 | | | | | | | |
889 | | | | | | | L |
890 | | | | | | | |
891 | | | | | | | =item * CPAN Ratings |
892 | | | | | | | |
893 | | | | | | | L |
894 | | | | | | | |
895 | | | | | | | =item * Search CPAN |
896 | | | | | | | |
897 | | | | | | | L |
898 | | | | | | | |
899 | | | | | | | =back |
900 | | | | | | | |
901 | | | | | | | |
902 | | | | | | | =head1 ACKNOWLEDGEMENTS |
903 | | | | | | | |
904 | | | | | | | Igor Sutton for ideas, testing and support |
905 | | | | | | | |
906 | | | | | | | =head1 COPYRIGHT & LICENSE |
907 | | | | | | | |
908 | | | | | | | Copyright 2008, 2010, 2011 Yves Orton, all rights reserved. |
909 | | | | | | | |
910 | | | | | | | This program is free software; you can redistribute it and/or modify it |
911 | | | | | | | under the same terms as Perl itself. |
912 | | | | | | | |
913 | | | | | | | =cut |
914 | | | | | | | |
915 | | | | | | | 1; # End of Data::Consumer |
916 | | | | | | | |