line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
1
|
|
|
|
|
|
|
##################################################################### |
2
|
|
|
|
|
|
|
## AUTHOR: Mary Ehlers, regina.verbae@gmail.com |
3
|
|
|
|
|
|
|
## ABSTRACT: An initialized pipeline segment for the Piper system |
4
|
|
|
|
|
|
|
##################################################################### |
5
|
|
|
|
|
|
|
|
6
|
|
|
|
|
|
|
package Piper::Instance; |
7
|
|
|
|
|
|
|
|
8
|
4
|
|
|
4
|
|
37
|
use v5.10; |
|
4
|
|
|
|
|
10
|
|
9
|
4
|
|
|
4
|
|
15
|
use strict; |
|
4
|
|
|
|
|
6
|
|
|
4
|
|
|
|
|
60
|
|
10
|
4
|
|
|
4
|
|
14
|
use warnings; |
|
4
|
|
|
|
|
7
|
|
|
4
|
|
|
|
|
122
|
|
11
|
|
|
|
|
|
|
|
12
|
4
|
|
|
4
|
|
1314
|
use List::AllUtils qw(last_value max part sum); |
|
4
|
|
|
|
|
39376
|
|
|
4
|
|
|
|
|
280
|
|
13
|
4
|
|
|
4
|
|
23
|
use List::UtilsBy qw(max_by min_by); |
|
4
|
|
|
|
|
6
|
|
|
4
|
|
|
|
|
128
|
|
14
|
4
|
|
|
4
|
|
1340
|
use Piper::Path; |
|
4
|
|
|
|
|
11
|
|
|
4
|
|
|
|
|
105
|
|
15
|
4
|
|
|
4
|
|
23
|
use Scalar::Util qw(weaken); |
|
4
|
|
|
|
|
6
|
|
|
4
|
|
|
|
|
165
|
|
16
|
4
|
|
|
4
|
|
18
|
use Types::Standard qw(ArrayRef ConsumerOf Enum HashRef InstanceOf Tuple slurpy); |
|
4
|
|
|
|
|
5
|
|
|
4
|
|
|
|
|
21
|
|
17
|
|
|
|
|
|
|
|
18
|
4
|
|
|
4
|
|
3952
|
use Moo; |
|
4
|
|
|
|
|
37
|
|
|
4
|
|
|
|
|
20
|
|
19
|
4
|
|
|
4
|
|
1165
|
use namespace::clean; |
|
4
|
|
|
|
|
7
|
|
|
4
|
|
|
|
|
17
|
|
20
|
|
|
|
|
|
|
|
21
|
|
|
|
|
|
|
with qw(Piper::Role::Queue); |
22
|
|
|
|
|
|
|
|
23
|
|
|
|
|
|
|
use overload ( |
24
|
3274
|
|
|
3274
|
|
113712
|
q{""} => sub { $_[0]->path }, |
25
|
4
|
|
|
|
|
22
|
fallback => 1, |
26
|
4
|
|
|
4
|
|
1395
|
); |
|
4
|
|
|
|
|
7
|
|
27
|
|
|
|
|
|
|
|
28
|
|
|
|
|
|
|
our $VERSION = '0.05'; # from Piper-0.05.tar.gz |
29
|
|
|
|
|
|
|
|
30
|
|
|
|
|
|
|
#pod =head1 ATTRIBUTES |
31
|
|
|
|
|
|
|
#pod |
32
|
|
|
|
|
|
|
#pod =head2 batch_size |
33
|
|
|
|
|
|
|
#pod |
34
|
|
|
|
|
|
|
#pod The number of items to process at a time for this segment. |
35
|
|
|
|
|
|
|
#pod |
36
|
|
|
|
|
|
|
#pod If not set, inherits the C of any existing parent(s). If the segment has no parents, or if none of its parents have a C defined, the default C will be used. The default is 200, but this can be configured at import of L. |
37
|
|
|
|
|
|
|
#pod |
38
|
|
|
|
|
|
|
#pod To clear a previously-set C, simply set it to C or use the C method. |
39
|
|
|
|
|
|
|
#pod |
40
|
|
|
|
|
|
|
#pod $segment->batch_size(undef); |
41
|
|
|
|
|
|
|
#pod $segment->clear_batch_size; |
42
|
|
|
|
|
|
|
#pod |
43
|
|
|
|
|
|
|
#pod =cut |
44
|
|
|
|
|
|
|
|
45
|
|
|
|
|
|
|
# around below to set up inheritance through parents |
46
|
|
|
|
|
|
|
|
47
|
|
|
|
|
|
|
#pod =head2 children |
48
|
|
|
|
|
|
|
#pod |
49
|
|
|
|
|
|
|
#pod For container instances (made from L objects, not L objects), the C attribute holds an arrayref of the contained instance objects. |
50
|
|
|
|
|
|
|
#pod |
51
|
|
|
|
|
|
|
#pod =cut |
52
|
|
|
|
|
|
|
|
53
|
|
|
|
|
|
|
has children => ( |
54
|
|
|
|
|
|
|
is => 'ro', |
55
|
|
|
|
|
|
|
# Must contain at least one child |
56
|
|
|
|
|
|
|
isa => Tuple[InstanceOf['Piper::Instance'], |
57
|
|
|
|
|
|
|
slurpy ArrayRef[InstanceOf['Piper::Instance']] |
58
|
|
|
|
|
|
|
], |
59
|
|
|
|
|
|
|
required => 0, |
60
|
|
|
|
|
|
|
predicate => 1, |
61
|
|
|
|
|
|
|
); |
62
|
|
|
|
|
|
|
|
63
|
|
|
|
|
|
|
#pod =head2 debug |
64
|
|
|
|
|
|
|
#pod |
65
|
|
|
|
|
|
|
#pod Debug level for this segment. When accessing, inherits the debug level of any existing parent(s) if not explicitly set for this segment. The default level is 0, but can be globally overridden with the environment variable C. |
66
|
|
|
|
|
|
|
#pod |
67
|
|
|
|
|
|
|
#pod To clear a previously-set debug level for a segment, simply set it to C or use the C method. |
68
|
|
|
|
|
|
|
#pod |
69
|
|
|
|
|
|
|
#pod $segment->debug(undef); |
70
|
|
|
|
|
|
|
#pod $segment->clear_debug; |
71
|
|
|
|
|
|
|
#pod |
72
|
|
|
|
|
|
|
#pod =cut |
73
|
|
|
|
|
|
|
|
74
|
|
|
|
|
|
|
# around below to set up inheritance through parents |
75
|
|
|
|
|
|
|
|
76
|
|
|
|
|
|
|
#pod =head2 enabled |
77
|
|
|
|
|
|
|
#pod |
78
|
|
|
|
|
|
|
#pod A boolean indicating that the segment is enabled and can accept items for processing. Inherits this attribute from any existing parent(s) with a default of true. |
79
|
|
|
|
|
|
|
#pod |
80
|
|
|
|
|
|
|
#pod To clear a previously-set enabled attribute, simply set it to C or use the C method. |
81
|
|
|
|
|
|
|
#pod |
82
|
|
|
|
|
|
|
#pod $segment->enabled(undef); |
83
|
|
|
|
|
|
|
#pod $segment->clear_enabled; |
84
|
|
|
|
|
|
|
#pod |
85
|
|
|
|
|
|
|
#pod =cut |
86
|
|
|
|
|
|
|
|
87
|
|
|
|
|
|
|
# around below to set up inheritance through parents |
88
|
|
|
|
|
|
|
|
89
|
|
|
|
|
|
|
#pod =head2 main |
90
|
|
|
|
|
|
|
#pod |
91
|
|
|
|
|
|
|
#pod Holds a reference to the outermost container instance for the pipeline. |
92
|
|
|
|
|
|
|
#pod |
93
|
|
|
|
|
|
|
#pod =cut |
94
|
|
|
|
|
|
|
|
95
|
|
|
|
|
|
|
has main => ( |
96
|
|
|
|
|
|
|
is => 'lazy', |
97
|
|
|
|
|
|
|
isa => InstanceOf['Piper::Instance'], |
98
|
|
|
|
|
|
|
weak_ref => 1, |
99
|
|
|
|
|
|
|
builder => sub { |
100
|
39
|
|
|
39
|
|
322
|
my ($self) = @_; |
101
|
39
|
|
|
|
|
54
|
my $parent = $self; |
102
|
39
|
|
|
|
|
114
|
while ($parent->has_parent) { |
103
|
35
|
|
|
|
|
68
|
$parent = $parent->parent; |
104
|
|
|
|
|
|
|
} |
105
|
39
|
|
|
|
|
542
|
return $parent; |
106
|
|
|
|
|
|
|
}, |
107
|
|
|
|
|
|
|
); |
108
|
|
|
|
|
|
|
|
109
|
|
|
|
|
|
|
#pod =head2 parent |
110
|
|
|
|
|
|
|
#pod |
111
|
|
|
|
|
|
|
#pod Unless this segment is the outermost container (C), this attribute holds a reference to the segment's immediate container. |
112
|
|
|
|
|
|
|
#pod |
113
|
|
|
|
|
|
|
#pod =cut |
114
|
|
|
|
|
|
|
|
115
|
|
|
|
|
|
|
has parent => ( |
116
|
|
|
|
|
|
|
is => 'rwp', |
117
|
|
|
|
|
|
|
isa => InstanceOf['Piper::Instance'], |
118
|
|
|
|
|
|
|
# Setting a parent will introduce a self-reference |
119
|
|
|
|
|
|
|
weak_ref => 1, |
120
|
|
|
|
|
|
|
required => 0, |
121
|
|
|
|
|
|
|
predicate => 1, |
122
|
|
|
|
|
|
|
); |
123
|
|
|
|
|
|
|
|
124
|
|
|
|
|
|
|
#pod =head2 path |
125
|
|
|
|
|
|
|
#pod |
126
|
|
|
|
|
|
|
#pod The full path to this segment, built as the concatenation of all the parent(s) labels and the segment's label, joined by C>. L objects stringify to this attribute. |
127
|
|
|
|
|
|
|
#pod |
128
|
|
|
|
|
|
|
#pod =cut |
129
|
|
|
|
|
|
|
|
130
|
|
|
|
|
|
|
has path => ( |
131
|
|
|
|
|
|
|
is => 'lazy', |
132
|
|
|
|
|
|
|
isa => InstanceOf['Piper::Path'], |
133
|
|
|
|
|
|
|
builder => sub { |
134
|
35
|
|
|
35
|
|
3337
|
my ($self) = @_; |
135
|
|
|
|
|
|
|
|
136
|
35
|
100
|
|
|
|
524
|
return $self->has_parent |
137
|
|
|
|
|
|
|
? $self->parent->path->child($self->label) |
138
|
|
|
|
|
|
|
: Piper::Path->new($self->label); |
139
|
|
|
|
|
|
|
}, |
140
|
|
|
|
|
|
|
); |
141
|
|
|
|
|
|
|
|
142
|
|
|
|
|
|
|
#pod =head2 verbose |
143
|
|
|
|
|
|
|
#pod |
144
|
|
|
|
|
|
|
#pod Verbosity level for this segment. When accessing, inherits verbosity level of any existing parent(s) if not explicitly set for this segment. |
145
|
|
|
|
|
|
|
#pod |
146
|
|
|
|
|
|
|
#pod To clear a previously-set verbosity level for a segment, simply set it to C or use the C method. |
147
|
|
|
|
|
|
|
#pod |
148
|
|
|
|
|
|
|
#pod $segment->verbose(undef); |
149
|
|
|
|
|
|
|
#pod $segment->clear_verbose; |
150
|
|
|
|
|
|
|
#pod |
151
|
|
|
|
|
|
|
#pod =cut |
152
|
|
|
|
|
|
|
|
153
|
|
|
|
|
|
|
# Inherit parent settings |
154
|
|
|
|
|
|
|
for my $attr (qw(batch_size debug enabled verbose)) { |
155
|
|
|
|
|
|
|
my $clear = "clear_$attr"; |
156
|
|
|
|
|
|
|
my $has = "has_$attr"; |
157
|
|
|
|
|
|
|
|
158
|
|
|
|
|
|
|
around $attr => sub { |
159
|
|
|
|
|
|
|
my ($orig, $self) = splice @_, 0, 2; |
160
|
|
|
|
|
|
|
|
161
|
|
|
|
|
|
|
state $default = { |
162
|
|
|
|
|
|
|
batch_size => $self->main->config->batch_size, |
163
|
|
|
|
|
|
|
debug => 0, |
164
|
|
|
|
|
|
|
enabled => 1, |
165
|
|
|
|
|
|
|
verbose => 0, |
166
|
|
|
|
|
|
|
}; |
167
|
|
|
|
|
|
|
|
168
|
|
|
|
|
|
|
if (@_) { |
169
|
|
|
|
|
|
|
return $self->$clear() if !defined $_[0]; |
170
|
|
|
|
|
|
|
return $self->$orig(@_); |
171
|
|
|
|
|
|
|
} |
172
|
|
|
|
|
|
|
else { |
173
|
|
|
|
|
|
|
return $self->$has() |
174
|
|
|
|
|
|
|
? $self->$orig() |
175
|
|
|
|
|
|
|
: $self->has_parent |
176
|
|
|
|
|
|
|
? $self->parent->$attr() |
177
|
|
|
|
|
|
|
: $default->{$attr}; |
178
|
|
|
|
|
|
|
} |
179
|
|
|
|
|
|
|
}; |
180
|
|
|
|
|
|
|
} |
181
|
|
|
|
|
|
|
|
182
|
|
|
|
|
|
|
#pod =head1 METHODS |
183
|
|
|
|
|
|
|
#pod |
184
|
|
|
|
|
|
|
#pod Methods marked with a (*) should only be called from the outermost instance. |
185
|
|
|
|
|
|
|
#pod |
186
|
|
|
|
|
|
|
#pod =head2 clear_batch_size |
187
|
|
|
|
|
|
|
#pod |
188
|
|
|
|
|
|
|
#pod =head2 clear_debug |
189
|
|
|
|
|
|
|
#pod |
190
|
|
|
|
|
|
|
#pod =head2 clear_enabled |
191
|
|
|
|
|
|
|
#pod |
192
|
|
|
|
|
|
|
#pod =head2 clear_verbose |
193
|
|
|
|
|
|
|
#pod |
194
|
|
|
|
|
|
|
#pod Methods for clearing the corresponding attribute. |
195
|
|
|
|
|
|
|
#pod |
196
|
|
|
|
|
|
|
#pod =head2 has_children |
197
|
|
|
|
|
|
|
#pod |
198
|
|
|
|
|
|
|
#pod A boolean indicating whether the instance has any children (contained instances). Will be true for all segments initialized from a L object and false for all segments initialized from a L object. |
199
|
|
|
|
|
|
|
#pod |
200
|
|
|
|
|
|
|
#pod =head2 has_parent |
201
|
|
|
|
|
|
|
#pod |
202
|
|
|
|
|
|
|
#pod A boolean indicating whether the instance has a parent (container instance). Will be true for all segments except the outermost segment (C). |
203
|
|
|
|
|
|
|
#pod |
204
|
|
|
|
|
|
|
#pod =head2 has_pending |
205
|
|
|
|
|
|
|
#pod |
206
|
|
|
|
|
|
|
#pod Returns a boolean indicating whether there are any items that are queued at some level of the segment but have not completed processing. |
207
|
|
|
|
|
|
|
#pod |
208
|
|
|
|
|
|
|
#pod =cut |
209
|
|
|
|
|
|
|
|
210
|
|
|
|
|
|
|
sub has_pending { |
211
|
398
|
|
|
398
|
1
|
4887
|
my ($self) = @_; |
212
|
|
|
|
|
|
|
|
213
|
398
|
100
|
|
|
|
621
|
if ($self->has_children) { |
214
|
208
|
|
|
|
|
244
|
for my $child (@{$self->children}) { |
|
208
|
|
|
|
|
307
|
|
215
|
267
|
100
|
|
|
|
391
|
return 1 if $child->has_pending; |
216
|
|
|
|
|
|
|
} |
217
|
51
|
|
|
|
|
131
|
return 0; |
218
|
|
|
|
|
|
|
} |
219
|
|
|
|
|
|
|
else { |
220
|
190
|
|
|
|
|
2536
|
return $self->queue->ready; |
221
|
|
|
|
|
|
|
} |
222
|
|
|
|
|
|
|
} |
223
|
|
|
|
|
|
|
|
224
|
|
|
|
|
|
|
#pod =head2 *dequeue([$num]) |
225
|
|
|
|
|
|
|
#pod |
226
|
|
|
|
|
|
|
#pod Remove at most C<$num> S<(default 1)> processed items from the end of the segment. |
227
|
|
|
|
|
|
|
#pod |
228
|
|
|
|
|
|
|
#pod =head2 *enqueue(@data) |
229
|
|
|
|
|
|
|
#pod |
230
|
|
|
|
|
|
|
#pod Queue C<@data> for processing by the pipeline. |
231
|
|
|
|
|
|
|
#pod |
232
|
|
|
|
|
|
|
#pod =cut |
233
|
|
|
|
|
|
|
|
234
|
|
|
|
|
|
|
around enqueue => sub { |
235
|
|
|
|
|
|
|
my ($orig, $self, @args) = @_; |
236
|
|
|
|
|
|
|
|
237
|
|
|
|
|
|
|
if (!$self->enabled) { |
238
|
|
|
|
|
|
|
# Bypass - go straight to drain |
239
|
|
|
|
|
|
|
$self->INFO('Skipping disabled process', @args); |
240
|
|
|
|
|
|
|
$self->drain->enqueue(@args); |
241
|
|
|
|
|
|
|
return; |
242
|
|
|
|
|
|
|
} |
243
|
|
|
|
|
|
|
|
244
|
|
|
|
|
|
|
my @items; |
245
|
|
|
|
|
|
|
if ($self->has_allow) { |
246
|
|
|
|
|
|
|
my ($skip, $queue) = part { |
247
|
|
|
|
|
|
|
$self->allow->($_) ? 1 : 0 |
248
|
|
|
|
|
|
|
} @args; |
249
|
|
|
|
|
|
|
|
250
|
|
|
|
|
|
|
@items = @$queue if defined $queue; |
251
|
|
|
|
|
|
|
|
252
|
|
|
|
|
|
|
if (defined $skip) { |
253
|
|
|
|
|
|
|
$self->INFO('Disallowed items emitted to next handler', @$skip); |
254
|
|
|
|
|
|
|
$self->drain->enqueue(@$skip); |
255
|
|
|
|
|
|
|
} |
256
|
|
|
|
|
|
|
} |
257
|
|
|
|
|
|
|
else { |
258
|
|
|
|
|
|
|
@items = @args; |
259
|
|
|
|
|
|
|
} |
260
|
|
|
|
|
|
|
|
261
|
|
|
|
|
|
|
return unless @items; |
262
|
|
|
|
|
|
|
|
263
|
|
|
|
|
|
|
$self->INFO('Queueing items', @items); |
264
|
|
|
|
|
|
|
$self->$orig(@items); |
265
|
|
|
|
|
|
|
}; |
266
|
|
|
|
|
|
|
|
267
|
|
|
|
|
|
|
#pod =head2 find_segment($location) |
268
|
|
|
|
|
|
|
#pod |
269
|
|
|
|
|
|
|
#pod Find and return the segment instance according to <$location>, which can be a label or a path-like hierarchy of labels. |
270
|
|
|
|
|
|
|
#pod |
271
|
|
|
|
|
|
|
#pod For example, in the following pipeline, a few possible C<$location> values include C, C, or C. |
272
|
|
|
|
|
|
|
#pod |
273
|
|
|
|
|
|
|
#pod my $pipe = Piper->new( |
274
|
|
|
|
|
|
|
#pod { label => 'main' }, |
275
|
|
|
|
|
|
|
#pod subpipe => Piper->new( |
276
|
|
|
|
|
|
|
#pod a => sub { ... }, |
277
|
|
|
|
|
|
|
#pod b => sub { ... }, |
278
|
|
|
|
|
|
|
#pod c => sub { ... }, |
279
|
|
|
|
|
|
|
#pod ), |
280
|
|
|
|
|
|
|
#pod )->init; |
281
|
|
|
|
|
|
|
#pod |
282
|
|
|
|
|
|
|
#pod If a label is unique within the pipeline, no path is required. For non-unique labels, searches are performed in a nearest-neighbor, depth-first manner. |
283
|
|
|
|
|
|
|
#pod |
284
|
|
|
|
|
|
|
#pod For example, in the following pipeline, searching for C from C would find C, not C. So to reach C from C, the appropriate search would be for C. |
285
|
|
|
|
|
|
|
#pod |
286
|
|
|
|
|
|
|
#pod my $pipe = Piper->new( |
287
|
|
|
|
|
|
|
#pod { label => 'main' }, |
288
|
|
|
|
|
|
|
#pod pipeA => Piper->new( |
289
|
|
|
|
|
|
|
#pod processA => sub { ... }, |
290
|
|
|
|
|
|
|
#pod processB => sub { ... }, |
291
|
|
|
|
|
|
|
#pod ), |
292
|
|
|
|
|
|
|
#pod processA => sub { ... }, |
293
|
|
|
|
|
|
|
#pod ); |
294
|
|
|
|
|
|
|
#pod |
295
|
|
|
|
|
|
|
#pod =cut |
296
|
|
|
|
|
|
|
|
297
|
|
|
|
|
|
|
sub find_segment { |
298
|
192
|
|
|
192
|
1
|
58587
|
my ($self, $location) = @_; |
299
|
|
|
|
|
|
|
|
300
|
192
|
|
|
|
|
261
|
state $global_cache = {}; |
301
|
192
|
|
100
|
|
|
3566
|
$global_cache->{$self->main->id}{$self->path} //= {}; |
302
|
192
|
|
|
|
|
2574
|
my $cache = $global_cache->{$self->main->id}{$self->path}; |
303
|
|
|
|
|
|
|
|
304
|
192
|
100
|
|
|
|
497
|
unless (exists $cache->{$location}) { |
305
|
161
|
|
|
|
|
2108
|
$location = Piper::Path->new($location); |
306
|
161
|
100
|
100
|
|
|
8761
|
if ($self->has_children or $self->has_parent) { |
307
|
159
|
100
|
|
|
|
317
|
my $parent = $self->has_children ? $self : $self->parent; |
308
|
159
|
|
|
|
|
273
|
my $segment = $parent->descendant($location); |
309
|
159
|
|
100
|
|
|
432
|
while (!defined $segment and $parent->has_parent) { |
310
|
88
|
|
|
|
|
109
|
my $referrer = $parent; |
311
|
88
|
|
|
|
|
137
|
$parent = $parent->parent; |
312
|
88
|
|
|
|
|
145
|
$segment = $parent->descendant($location, $referrer); |
313
|
|
|
|
|
|
|
} |
314
|
159
|
|
|
|
|
375
|
$cache->{$location} = $segment; |
315
|
|
|
|
|
|
|
} |
316
|
|
|
|
|
|
|
else { |
317
|
|
|
|
|
|
|
# Lonely Process (no parents or children) |
318
|
2
|
100
|
|
|
|
6
|
$cache->{$location} = "$self" eq "$location" ? $self : undef; |
319
|
|
|
|
|
|
|
} |
320
|
161
|
100
|
|
|
|
325
|
weaken($cache->{$location}) if defined $cache->{$location}; |
321
|
|
|
|
|
|
|
} |
322
|
|
|
|
|
|
|
|
323
|
192
|
100
|
|
|
|
434
|
$self->DEBUG("Found label $location: '$cache->{$location}'") if defined $cache->{$location}; |
324
|
192
|
|
|
|
|
5204
|
return $cache->{$location}; |
325
|
|
|
|
|
|
|
} |
326
|
|
|
|
|
|
|
|
327
|
|
|
|
|
|
|
#pod =head2 *flush |
328
|
|
|
|
|
|
|
#pod |
329
|
|
|
|
|
|
|
#pod Process batches until there are no more items pending. |
330
|
|
|
|
|
|
|
#pod |
331
|
|
|
|
|
|
|
#pod =cut |
332
|
|
|
|
|
|
|
|
333
|
|
|
|
|
|
|
sub flush { |
334
|
2
|
|
|
2
|
0
|
6
|
my ($self) = @_; |
335
|
|
|
|
|
|
|
|
336
|
2
|
|
|
|
|
8
|
while ($self->has_pending) { |
337
|
62
|
|
|
|
|
112
|
$self->process_batch; |
338
|
|
|
|
|
|
|
} |
339
|
|
|
|
|
|
|
} |
340
|
|
|
|
|
|
|
|
341
|
|
|
|
|
|
|
#pod =head2 *is_exhausted |
342
|
|
|
|
|
|
|
#pod |
343
|
|
|
|
|
|
|
#pod Returns a boolean indicating whether there are any items left to process or dequeue. |
344
|
|
|
|
|
|
|
#pod |
345
|
|
|
|
|
|
|
#pod =cut |
346
|
|
|
|
|
|
|
|
347
|
|
|
|
|
|
|
sub is_exhausted { |
348
|
24
|
|
|
24
|
0
|
6134
|
my ($self) = @_; |
349
|
|
|
|
|
|
|
|
350
|
24
|
100
|
|
|
|
39
|
return $self->prepare ? 0 : 1; |
351
|
|
|
|
|
|
|
} |
352
|
|
|
|
|
|
|
|
353
|
|
|
|
|
|
|
#pod =head2 *isnt_exhausted |
354
|
|
|
|
|
|
|
#pod |
355
|
|
|
|
|
|
|
#pod Returns the opposite of C. |
356
|
|
|
|
|
|
|
#pod |
357
|
|
|
|
|
|
|
#pod =cut |
358
|
|
|
|
|
|
|
|
359
|
|
|
|
|
|
|
sub isnt_exhausted { |
360
|
15
|
|
|
15
|
0
|
32
|
my ($self) = @_; |
361
|
15
|
|
|
|
|
30
|
return !$self->is_exhausted; |
362
|
|
|
|
|
|
|
} |
363
|
|
|
|
|
|
|
|
364
|
|
|
|
|
|
|
#pod =head2 next_segment |
365
|
|
|
|
|
|
|
#pod |
366
|
|
|
|
|
|
|
#pod Returns the next adjacent segment from the calling segment. Returns C for the outermost container. |
367
|
|
|
|
|
|
|
#pod |
368
|
|
|
|
|
|
|
#pod =cut |
369
|
|
|
|
|
|
|
|
370
|
|
|
|
|
|
|
sub next_segment { |
371
|
12
|
|
|
12
|
1
|
18
|
my ($self) = @_; |
372
|
12
|
50
|
|
|
|
29
|
return unless $self->has_parent; |
373
|
12
|
|
|
|
|
165
|
return $self->parent->follower->{$self}; |
374
|
|
|
|
|
|
|
} |
375
|
|
|
|
|
|
|
|
376
|
|
|
|
|
|
|
#pod =head2 pending |
377
|
|
|
|
|
|
|
#pod |
378
|
|
|
|
|
|
|
#pod Returns the number of items that are queued at some level of the segment but have not completed processing. |
379
|
|
|
|
|
|
|
#pod |
380
|
|
|
|
|
|
|
#pod =cut |
381
|
|
|
|
|
|
|
|
382
|
|
|
|
|
|
|
sub pending { |
383
|
746
|
|
|
746
|
1
|
2981
|
my ($self) = @_; |
384
|
746
|
100
|
|
|
|
1033
|
if ($self->has_children) { |
385
|
14
|
|
|
|
|
15
|
return sum(map { $_->pending } @{$self->children}); |
|
24
|
|
|
|
|
36
|
|
|
14
|
|
|
|
|
32
|
|
386
|
|
|
|
|
|
|
} |
387
|
|
|
|
|
|
|
else { |
388
|
732
|
|
|
|
|
9528
|
return $self->queue->ready; |
389
|
|
|
|
|
|
|
} |
390
|
|
|
|
|
|
|
} |
391
|
|
|
|
|
|
|
|
392
|
|
|
|
|
|
|
#pod =head2 *prepare([$num]) |
393
|
|
|
|
|
|
|
#pod |
394
|
|
|
|
|
|
|
#pod Process batches while data is still C until at least C<$num> S<(default 1)> items are C for C. |
395
|
|
|
|
|
|
|
#pod |
396
|
|
|
|
|
|
|
#pod =cut |
397
|
|
|
|
|
|
|
|
398
|
|
|
|
|
|
|
sub prepare { |
399
|
28
|
|
|
28
|
1
|
49
|
my ($self, $num) = @_; |
400
|
28
|
|
100
|
|
|
92
|
$num //= 1; |
401
|
|
|
|
|
|
|
|
402
|
28
|
|
66
|
|
|
48
|
while ($self->has_pending and $self->ready < $num) { |
403
|
26
|
|
|
|
|
50
|
$self->process_batch; |
404
|
|
|
|
|
|
|
} |
405
|
28
|
|
|
|
|
356
|
return $self->ready; |
406
|
|
|
|
|
|
|
} |
407
|
|
|
|
|
|
|
|
408
|
|
|
|
|
|
|
#pod =head2 ready |
409
|
|
|
|
|
|
|
#pod |
410
|
|
|
|
|
|
|
#pod Returns the number of items that have finished processing and are ready for C from the segment. |
411
|
|
|
|
|
|
|
#pod |
412
|
|
|
|
|
|
|
#pod =cut |
413
|
|
|
|
|
|
|
|
414
|
|
|
|
|
|
|
#pod =head1 FLOW CONTROL METHODS |
415
|
|
|
|
|
|
|
#pod |
416
|
|
|
|
|
|
|
#pod These methods are available for use within process handler subroutines (see L). |
417
|
|
|
|
|
|
|
#pod |
418
|
|
|
|
|
|
|
#pod =head2 eject(@data) |
419
|
|
|
|
|
|
|
#pod |
420
|
|
|
|
|
|
|
#pod If the segment has a parent, send C<@data> to the drain of its parent. Otherwise, enqueues C<@data> to the segment's drain. |
421
|
|
|
|
|
|
|
#pod |
422
|
|
|
|
|
|
|
#pod =cut |
423
|
|
|
|
|
|
|
|
424
|
|
|
|
|
|
|
sub eject { |
425
|
5
|
|
|
5
|
1
|
4849
|
my $self = shift; |
426
|
5
|
100
|
|
|
|
18
|
if ($self->has_parent) { |
427
|
4
|
|
|
|
|
15
|
$self->INFO('Ejecting to drain of parent ('.$self->parent.')', @_); |
428
|
4
|
|
|
|
|
153
|
$self->parent->drain->enqueue(@_); |
429
|
|
|
|
|
|
|
} |
430
|
|
|
|
|
|
|
else { |
431
|
1
|
|
|
|
|
23
|
$self->INFO('Ejecting to drain', @_); |
432
|
1
|
|
|
|
|
37
|
$self->drain->enqueue(@_); |
433
|
|
|
|
|
|
|
} |
434
|
|
|
|
|
|
|
} |
435
|
|
|
|
|
|
|
|
436
|
|
|
|
|
|
|
#pod =head2 emit(@data) |
437
|
|
|
|
|
|
|
#pod |
438
|
|
|
|
|
|
|
#pod Send C<@data> to the next segment in the pipeline. If the segment is the last in the pipeline, emits to the drain, making the C<@data> ready for C. |
439
|
|
|
|
|
|
|
#pod |
440
|
|
|
|
|
|
|
#pod =cut |
441
|
|
|
|
|
|
|
|
442
|
|
|
|
|
|
|
sub emit { |
443
|
130
|
|
|
130
|
1
|
5188
|
my $self = shift; |
444
|
130
|
|
|
|
|
1916
|
$self->INFO('Emitting', @_); |
445
|
|
|
|
|
|
|
# Just collect in the drain |
446
|
130
|
|
|
|
|
4681
|
$self->drain->enqueue(@_); |
447
|
|
|
|
|
|
|
} |
448
|
|
|
|
|
|
|
|
449
|
|
|
|
|
|
|
#pod =head2 inject(@data) |
450
|
|
|
|
|
|
|
#pod |
451
|
|
|
|
|
|
|
#pod If the segment has a parent, enqueues C<@data> to its parent. Otherwise, enqueues <@data> to itself. |
452
|
|
|
|
|
|
|
#pod |
453
|
|
|
|
|
|
|
#pod =cut |
454
|
|
|
|
|
|
|
|
455
|
|
|
|
|
|
|
sub inject { |
456
|
5
|
|
|
5
|
1
|
6948
|
my $self = shift; |
457
|
|
|
|
|
|
|
|
458
|
5
|
100
|
|
|
|
19
|
if ($self->has_parent) { |
459
|
4
|
|
|
|
|
26
|
$self->INFO('Injecting to parent ('.$self->parent.')', @_); |
460
|
4
|
|
|
|
|
156
|
$self->parent->enqueue(@_); |
461
|
|
|
|
|
|
|
} |
462
|
|
|
|
|
|
|
else { |
463
|
1
|
|
|
|
|
5
|
$self->INFO('Injecting to self ('.$self.')', @_); |
464
|
1
|
|
|
|
|
41
|
$self->enqueue(@_); |
465
|
|
|
|
|
|
|
} |
466
|
|
|
|
|
|
|
} |
467
|
|
|
|
|
|
|
|
468
|
|
|
|
|
|
|
#pod =head2 injectAfter($location, @data) |
469
|
|
|
|
|
|
|
#pod |
470
|
|
|
|
|
|
|
#pod Send C<@data> to the segment I the specified C<$location>. See L|/find_segment($location)> for a detailed description of C<$location>. |
471
|
|
|
|
|
|
|
#pod |
472
|
|
|
|
|
|
|
#pod =cut |
473
|
|
|
|
|
|
|
|
474
|
|
|
|
|
|
|
sub injectAfter { |
475
|
5
|
|
|
5
|
1
|
4981
|
my $self = shift; |
476
|
5
|
|
|
|
|
7
|
my $location = shift; |
477
|
5
|
|
|
|
|
12
|
my $segment = $self->find_segment($location); |
478
|
5
|
100
|
|
|
|
46
|
$self->ERROR("Could not find $location to injectAfter", @_) |
479
|
|
|
|
|
|
|
if !defined $segment; |
480
|
3
|
|
|
|
|
52
|
$self->INFO("Injecting to $location", @_); |
481
|
3
|
|
|
|
|
115
|
$segment->drain->enqueue(@_); |
482
|
|
|
|
|
|
|
} |
483
|
|
|
|
|
|
|
|
484
|
|
|
|
|
|
|
#pod =head2 injectAt($location, @data) |
485
|
|
|
|
|
|
|
#pod |
486
|
|
|
|
|
|
|
#pod Send C<@data> to the segment I the specified C<$location>. See L|/find_segment($location)> for a detailed description of C<$location>. |
487
|
|
|
|
|
|
|
#pod |
488
|
|
|
|
|
|
|
#pod =cut |
489
|
|
|
|
|
|
|
|
490
|
|
|
|
|
|
|
sub injectAt { |
491
|
5
|
|
|
5
|
1
|
4273
|
my $self = shift; |
492
|
5
|
|
|
|
|
26
|
my $location = shift; |
493
|
5
|
|
|
|
|
14
|
my $segment = $self->find_segment($location); |
494
|
5
|
100
|
|
|
|
53
|
$self->ERROR("Could not find $location to injectAt", @_) |
495
|
|
|
|
|
|
|
if !defined $segment; |
496
|
3
|
|
|
|
|
55
|
$self->INFO("Injecting to $location", @_); |
497
|
3
|
|
|
|
|
113
|
$segment->enqueue(@_); |
498
|
|
|
|
|
|
|
} |
499
|
|
|
|
|
|
|
|
500
|
|
|
|
|
|
|
#pod =head2 recycle(@data) |
501
|
|
|
|
|
|
|
#pod |
502
|
|
|
|
|
|
|
#pod Re-queue C<@data> to the top of the current segment in an order such that C would subsequently return C<$data[0]> and so forth. |
503
|
|
|
|
|
|
|
#pod |
504
|
|
|
|
|
|
|
#pod =cut |
505
|
|
|
|
|
|
|
|
506
|
|
|
|
|
|
|
sub recycle { |
507
|
5
|
|
|
5
|
1
|
27
|
my $self = shift; |
508
|
5
|
|
|
|
|
97
|
$self->INFO('Recycling', @_); |
509
|
5
|
|
|
|
|
189
|
$self->requeue(@_); |
510
|
|
|
|
|
|
|
} |
511
|
|
|
|
|
|
|
|
512
|
|
|
|
|
|
|
#pod =head1 LOGGING AND DEBUGGING METHODS |
513
|
|
|
|
|
|
|
#pod |
514
|
|
|
|
|
|
|
#pod See L for detailed descriptions. |
515
|
|
|
|
|
|
|
#pod |
516
|
|
|
|
|
|
|
#pod =head2 INFO($message, [@items]) |
517
|
|
|
|
|
|
|
#pod |
518
|
|
|
|
|
|
|
#pod Prints an informational C<$message> to STDERR if either the debug or verbosity level for the segment S<< is > 0 >>. |
519
|
|
|
|
|
|
|
#pod |
520
|
|
|
|
|
|
|
#pod =head2 DEBUG($message, [@items]) |
521
|
|
|
|
|
|
|
#pod |
522
|
|
|
|
|
|
|
#pod Prints a debug C<$message> to STDERR if the debug level for the segment S<< is > 0 >>. |
523
|
|
|
|
|
|
|
#pod |
524
|
|
|
|
|
|
|
#pod =head2 WARN($message, [@items]) |
525
|
|
|
|
|
|
|
#pod |
526
|
|
|
|
|
|
|
#pod Issues a warning with C<$message> via L. |
527
|
|
|
|
|
|
|
#pod |
528
|
|
|
|
|
|
|
#pod =head2 ERROR($message, [@items]) |
529
|
|
|
|
|
|
|
#pod |
530
|
|
|
|
|
|
|
#pod Throws an error with C<$message> via L. |
531
|
|
|
|
|
|
|
#pod |
532
|
|
|
|
|
|
|
#pod =head1 UTILITY ATTRIBUTES |
533
|
|
|
|
|
|
|
#pod |
534
|
|
|
|
|
|
|
#pod None of these should be directly accessed. Documented for contributors and source-code readers. |
535
|
|
|
|
|
|
|
#pod |
536
|
|
|
|
|
|
|
#pod =head2 args |
537
|
|
|
|
|
|
|
#pod |
538
|
|
|
|
|
|
|
#pod The arguments passed to the C method of L. |
539
|
|
|
|
|
|
|
#pod |
540
|
|
|
|
|
|
|
#pod =cut |
541
|
|
|
|
|
|
|
|
542
|
|
|
|
|
|
|
has args => ( |
543
|
|
|
|
|
|
|
is => 'rwp', |
544
|
|
|
|
|
|
|
isa => ArrayRef, |
545
|
|
|
|
|
|
|
lazy => 1, |
546
|
|
|
|
|
|
|
builder => sub { |
547
|
10
|
|
|
10
|
|
91
|
my ($self) = @_; |
548
|
10
|
50
|
|
|
|
27
|
if ($self->has_parent) { |
549
|
10
|
|
|
|
|
126
|
return $self->main->args; |
550
|
|
|
|
|
|
|
} |
551
|
|
|
|
|
|
|
else { |
552
|
0
|
|
|
|
|
0
|
return []; |
553
|
|
|
|
|
|
|
} |
554
|
|
|
|
|
|
|
}, |
555
|
|
|
|
|
|
|
); |
556
|
|
|
|
|
|
|
|
557
|
|
|
|
|
|
|
#pod =head2 directory |
558
|
|
|
|
|
|
|
#pod |
559
|
|
|
|
|
|
|
#pod A hashref of the segment's children, keyed by their labels. Used by C. |
560
|
|
|
|
|
|
|
#pod |
561
|
|
|
|
|
|
|
#pod =cut |
562
|
|
|
|
|
|
|
|
563
|
|
|
|
|
|
|
has directory => ( |
564
|
|
|
|
|
|
|
is => 'lazy', |
565
|
|
|
|
|
|
|
isa => HashRef, |
566
|
|
|
|
|
|
|
builder => sub { |
567
|
7
|
|
|
7
|
|
57
|
my ($self) = @_; |
568
|
7
|
50
|
|
|
|
22
|
return {} unless $self->has_children; |
569
|
7
|
|
|
|
|
11
|
my %dir; |
570
|
7
|
|
|
|
|
10
|
for my $child (@{$self->children}) { |
|
7
|
|
|
|
|
19
|
|
571
|
11
|
|
|
|
|
159
|
$dir{$child->path->name} = $child; |
572
|
|
|
|
|
|
|
} |
573
|
7
|
|
|
|
|
107
|
return \%dir; |
574
|
|
|
|
|
|
|
}, |
575
|
|
|
|
|
|
|
); |
576
|
|
|
|
|
|
|
|
577
|
|
|
|
|
|
|
#pod =head2 drain |
578
|
|
|
|
|
|
|
#pod |
579
|
|
|
|
|
|
|
#pod A reference to the location where the segment's processed items are emitted. |
580
|
|
|
|
|
|
|
#pod |
581
|
|
|
|
|
|
|
#pod =cut |
582
|
|
|
|
|
|
|
|
583
|
|
|
|
|
|
|
BEGIN { # Enables 'with Piper::Role::Queue' |
584
|
|
|
|
|
|
|
has drain => ( |
585
|
|
|
|
|
|
|
is => 'lazy', |
586
|
|
|
|
|
|
|
handles => [qw(dequeue ready)], |
587
|
|
|
|
|
|
|
builder => sub { |
588
|
23
|
|
|
23
|
|
204
|
my ($self) = @_; |
589
|
23
|
100
|
|
|
|
52
|
if ($self->has_parent) { |
590
|
12
|
|
|
|
|
29
|
return $self->next_segment; |
591
|
|
|
|
|
|
|
} |
592
|
|
|
|
|
|
|
else { |
593
|
11
|
|
|
|
|
142
|
return $self->main->config->queue_class->new(); |
594
|
|
|
|
|
|
|
} |
595
|
|
|
|
|
|
|
}, |
596
|
4
|
|
|
4
|
|
6937
|
); |
597
|
|
|
|
|
|
|
} |
598
|
|
|
|
|
|
|
|
599
|
|
|
|
|
|
|
#pod =head2 follower |
600
|
|
|
|
|
|
|
#pod |
601
|
|
|
|
|
|
|
#pod A hashref of children paths to the child's next adjacent segment. Used by C. |
602
|
|
|
|
|
|
|
#pod |
603
|
|
|
|
|
|
|
#pod =cut |
604
|
|
|
|
|
|
|
|
605
|
|
|
|
|
|
|
has follower => ( |
606
|
|
|
|
|
|
|
is => 'lazy', |
607
|
|
|
|
|
|
|
isa => HashRef, |
608
|
|
|
|
|
|
|
builder => sub { |
609
|
8
|
|
|
8
|
|
74
|
my ($self) = @_; |
610
|
8
|
50
|
|
|
|
23
|
return {} unless $self->has_children; |
611
|
8
|
|
|
|
|
10
|
my %follow; |
612
|
8
|
|
|
|
|
13
|
for my $index (0..$#{$self->children}) { |
|
8
|
|
|
|
|
25
|
|
613
|
12
|
100
|
|
|
|
33
|
if (defined $self->children->[$index + 1]) { |
614
|
4
|
|
|
|
|
19
|
$follow{$self->children->[$index]} = |
615
|
|
|
|
|
|
|
$self->children->[$index + 1]; |
616
|
|
|
|
|
|
|
} |
617
|
|
|
|
|
|
|
else { |
618
|
8
|
|
|
|
|
110
|
$follow{$self->children->[$index]} = $self->drain; |
619
|
|
|
|
|
|
|
} |
620
|
|
|
|
|
|
|
} |
621
|
8
|
|
|
|
|
112
|
return \%follow; |
622
|
|
|
|
|
|
|
}, |
623
|
|
|
|
|
|
|
); |
624
|
|
|
|
|
|
|
|
625
|
|
|
|
|
|
|
#pod =head2 logger |
626
|
|
|
|
|
|
|
#pod |
627
|
|
|
|
|
|
|
#pod A reference to the logger for the pipeline. Handles L methods. |
628
|
|
|
|
|
|
|
#pod |
629
|
|
|
|
|
|
|
#pod =cut |
630
|
|
|
|
|
|
|
|
631
|
|
|
|
|
|
|
has logger => ( |
632
|
|
|
|
|
|
|
is => 'lazy', |
633
|
|
|
|
|
|
|
isa => ConsumerOf['Piper::Role::Logger'], |
634
|
|
|
|
|
|
|
handles => 'Piper::Role::Logger', |
635
|
|
|
|
|
|
|
builder => sub { |
636
|
35
|
|
|
35
|
|
744
|
my ($self) = @_; |
637
|
|
|
|
|
|
|
|
638
|
35
|
100
|
|
|
|
90
|
if ($self->has_parent) { |
639
|
22
|
|
|
|
|
270
|
return $self->main->logger; |
640
|
|
|
|
|
|
|
} |
641
|
|
|
|
|
|
|
else { |
642
|
13
|
|
|
|
|
186
|
return $self->main->config->logger_class->new(); |
643
|
|
|
|
|
|
|
} |
644
|
|
|
|
|
|
|
}, |
645
|
|
|
|
|
|
|
); |
646
|
|
|
|
|
|
|
|
647
|
|
|
|
|
|
|
# Cute little trick to auto-insert the instance object |
648
|
|
|
|
|
|
|
# as first argument, since $self will become the logger |
649
|
|
|
|
|
|
|
# object and lose access to paths/labels/etc. |
650
|
|
|
|
|
|
|
around [qw(INFO DEBUG WARN ERROR)] => sub { |
651
|
|
|
|
|
|
|
my ($orig, $self) = splice @_, 0, 2; |
652
|
|
|
|
|
|
|
if (ref $_[0]) { |
653
|
|
|
|
|
|
|
$self->$orig(@_); |
654
|
|
|
|
|
|
|
} |
655
|
|
|
|
|
|
|
else { |
656
|
|
|
|
|
|
|
$self->$orig($self, @_); |
657
|
|
|
|
|
|
|
} |
658
|
|
|
|
|
|
|
}; |
659
|
|
|
|
|
|
|
|
660
|
|
|
|
|
|
|
#pod =head2 queue |
661
|
|
|
|
|
|
|
#pod |
662
|
|
|
|
|
|
|
#pod A reference to the location where data is queued for processing by this segment. |
663
|
|
|
|
|
|
|
#pod |
664
|
|
|
|
|
|
|
#pod =cut |
665
|
|
|
|
|
|
|
|
666
|
|
|
|
|
|
|
BEGIN { # Enables 'with Piper::Role::Queue' |
667
|
|
|
|
|
|
|
has queue => ( |
668
|
|
|
|
|
|
|
is => 'lazy', |
669
|
|
|
|
|
|
|
isa => ConsumerOf['Piper::Role::Queue'], |
670
|
|
|
|
|
|
|
handles => [qw(enqueue requeue)], |
671
|
|
|
|
|
|
|
builder => sub { |
672
|
23
|
|
|
23
|
|
411
|
my ($self) = @_; |
673
|
23
|
100
|
|
|
|
62
|
if ($self->has_children) { |
674
|
8
|
|
|
|
|
109
|
return $self->children->[0]; |
675
|
|
|
|
|
|
|
} |
676
|
|
|
|
|
|
|
else { |
677
|
15
|
|
|
|
|
197
|
return $self->main->config->queue_class->new(); |
678
|
|
|
|
|
|
|
} |
679
|
|
|
|
|
|
|
}, |
680
|
4
|
|
|
4
|
|
10967
|
); |
681
|
|
|
|
|
|
|
} |
682
|
|
|
|
|
|
|
|
683
|
|
|
|
|
|
|
#pod =head2 segment |
684
|
|
|
|
|
|
|
#pod |
685
|
|
|
|
|
|
|
#pod The L or L object from which the instance segment was created. |
686
|
|
|
|
|
|
|
#pod |
687
|
|
|
|
|
|
|
#pod =cut |
688
|
|
|
|
|
|
|
|
689
|
|
|
|
|
|
|
BEGIN { # So we can 'around' on Piper::Role::Segment methods |
690
|
4
|
|
|
4
|
|
19688
|
has segment => ( |
691
|
|
|
|
|
|
|
is => 'ro', |
692
|
|
|
|
|
|
|
isa => ConsumerOf['Piper::Role::Segment'], |
693
|
|
|
|
|
|
|
handles => 'Piper::Role::Segment', |
694
|
|
|
|
|
|
|
required => 1, |
695
|
|
|
|
|
|
|
); |
696
|
|
|
|
|
|
|
} |
697
|
|
|
|
|
|
|
|
698
|
|
|
|
|
|
|
#pod =head1 UTILITY METHODS |
699
|
|
|
|
|
|
|
#pod |
700
|
|
|
|
|
|
|
#pod None of these should be directly accessed. Documented for contributors and source-code readers. |
701
|
|
|
|
|
|
|
#pod |
702
|
|
|
|
|
|
|
#pod =head2 descendant($path, $referrer) |
703
|
|
|
|
|
|
|
#pod |
704
|
|
|
|
|
|
|
#pod Returns a child segment if its path ends with C<$path>. Does not search children with a path of C<$referrer>, as it was presumably already searched by a previous iteration of the search. Used by C. |
705
|
|
|
|
|
|
|
#pod |
706
|
|
|
|
|
|
|
#pod =cut |
707
|
|
|
|
|
|
|
|
708
|
|
|
|
|
|
|
sub descendant { |
709
|
554
|
|
|
554
|
1
|
19525
|
my ($self, $path, $referrer) = @_; |
710
|
554
|
50
|
|
|
|
1025
|
return unless $self->has_children; |
711
|
554
|
|
100
|
|
|
1565
|
$referrer //= ''; |
712
|
|
|
|
|
|
|
|
713
|
554
|
|
|
|
|
4008
|
$self->DEBUG("Searching for location '$path'"); |
714
|
554
|
100
|
|
|
|
14224
|
$self->DEBUG('Referrer', $referrer) if $referrer; |
715
|
|
|
|
|
|
|
|
716
|
|
|
|
|
|
|
# Search immediate children |
717
|
554
|
100
|
66
|
|
|
8351
|
$path = Piper::Path->new($path) if $path and not ref $path; |
718
|
554
|
50
|
|
|
|
10001
|
my @pieces = $path ? $path->split : (); |
719
|
554
|
|
|
|
|
972
|
my $descend = $self; |
720
|
554
|
|
100
|
|
|
1629
|
while (defined $descend and @pieces) { |
721
|
731
|
100
|
|
|
|
10121
|
if (!$descend->has_children) { |
|
|
100
|
|
|
|
|
|
722
|
62
|
|
|
|
|
122
|
$descend = undef; |
723
|
|
|
|
|
|
|
} |
724
|
|
|
|
|
|
|
elsif (exists $descend->directory->{$pieces[0]}) { |
725
|
348
|
|
|
|
|
5947
|
$descend = $descend->directory->{$pieces[0]}; |
726
|
348
|
|
|
|
|
2456
|
shift @pieces; |
727
|
|
|
|
|
|
|
} |
728
|
|
|
|
|
|
|
else { |
729
|
321
|
|
|
|
|
2722
|
$descend = undef; |
730
|
|
|
|
|
|
|
} |
731
|
|
|
|
|
|
|
} |
732
|
|
|
|
|
|
|
|
733
|
|
|
|
|
|
|
# Search grandchildren, |
734
|
|
|
|
|
|
|
# but not when checking whether requested location starts at $self (referrer = $self) |
735
|
554
|
100
|
100
|
|
|
1380
|
if (!defined $descend and $referrer ne $self) { |
736
|
358
|
|
|
|
|
454
|
my @possible; |
737
|
358
|
|
|
|
|
368
|
for my $child (@{$self->children}) { |
|
358
|
|
|
|
|
659
|
|
738
|
570
|
100
|
|
|
|
1029
|
if ($child eq $referrer) { |
739
|
79
|
|
|
|
|
174
|
$self->DEBUG("Skipping search of '$child' referrer"); |
740
|
79
|
|
|
|
|
1986
|
next; |
741
|
|
|
|
|
|
|
} |
742
|
491
|
100
|
|
|
|
1166
|
if ($child->has_children) { |
743
|
133
|
|
|
|
|
225
|
my $potential = $child->descendant($path); |
744
|
133
|
100
|
|
|
|
270
|
push @possible, $potential if defined $potential; |
745
|
|
|
|
|
|
|
} |
746
|
|
|
|
|
|
|
} |
747
|
|
|
|
|
|
|
|
748
|
358
|
100
|
|
|
|
657
|
if (@possible) { |
749
|
24
|
|
|
24
|
|
161
|
$descend = min_by { $_->path->split } @possible; |
|
24
|
|
|
|
|
524
|
|
750
|
|
|
|
|
|
|
} |
751
|
|
|
|
|
|
|
} |
752
|
|
|
|
|
|
|
|
753
|
|
|
|
|
|
|
# If location begins with $self->label, see if requested location starts at $self |
754
|
|
|
|
|
|
|
# but not if already checking that (referrer = $self) |
755
|
554
|
100
|
100
|
|
|
1496
|
if (!defined $descend and $referrer ne $self) { |
756
|
334
|
|
|
|
|
4381
|
my $overlap = $self->label; |
757
|
334
|
100
|
|
|
|
15719
|
if ($path =~ m{^\Q$overlap\E(?:$|/(?.*))}) { |
758
|
4
|
|
100
|
4
|
|
39957
|
$path = $+{path} // ''; |
|
4
|
|
|
|
|
1212
|
|
|
4
|
|
|
|
|
1304
|
|
|
134
|
|
|
|
|
790
|
|
759
|
134
|
100
|
|
|
|
2281
|
$self->DEBUG('Overlapping descendant search', $path ? $path : ()); |
760
|
134
|
100
|
|
|
|
3469
|
$descend = $path ? $self->descendant($path, $self) : $self; |
761
|
|
|
|
|
|
|
} |
762
|
|
|
|
|
|
|
} |
763
|
|
|
|
|
|
|
|
764
|
554
|
|
|
|
|
2110
|
return $descend; |
765
|
|
|
|
|
|
|
} |
766
|
|
|
|
|
|
|
|
767
|
|
|
|
|
|
|
#pod =head2 pressure |
768
|
|
|
|
|
|
|
#pod |
769
|
|
|
|
|
|
|
#pod An integer metric for the "fullness" of the pending queue. For handler instances (initialized from L objects), it is the percentage of pending items vs the batch size of the segment. For container instances (initialized from L objects), is is the maximum C of the contained instances. Used by process_batch for choosing which segment to process. |
770
|
|
|
|
|
|
|
#pod |
771
|
|
|
|
|
|
|
#pod =cut |
772
|
|
|
|
|
|
|
|
773
|
|
|
|
|
|
|
# Metric for "how full" the pending queue is |
774
|
|
|
|
|
|
|
sub pressure { |
775
|
532
|
|
|
532
|
1
|
648
|
my ($self) = @_; |
776
|
532
|
100
|
|
|
|
848
|
if ($self->has_children) { |
777
|
79
|
|
|
|
|
86
|
return max(map { $_->pressure } @{$self->children}); |
|
213
|
|
|
|
|
5316
|
|
|
79
|
|
|
|
|
133
|
|
778
|
|
|
|
|
|
|
} |
779
|
|
|
|
|
|
|
else { |
780
|
453
|
100
|
50
|
|
|
636
|
return $self->pending ? (int(100 * $self->pending / $self->batch_size) || 1) : 0; |
781
|
|
|
|
|
|
|
} |
782
|
|
|
|
|
|
|
} |
783
|
|
|
|
|
|
|
|
784
|
|
|
|
|
|
|
#pod =head2 process_batch |
785
|
|
|
|
|
|
|
#pod |
786
|
|
|
|
|
|
|
#pod Chooses the "best" segment for processing, and processes a batch for that segment. |
787
|
|
|
|
|
|
|
#pod |
788
|
|
|
|
|
|
|
#pod It first attempts to choose the full-batch segment (C<< pending >= batch_size >>) closest to the end of the pipeline. If there are no full-batch segments, it chooses the segment closest to being full. |
789
|
|
|
|
|
|
|
#pod |
790
|
|
|
|
|
|
|
#pod =cut |
791
|
|
|
|
|
|
|
|
792
|
|
|
|
|
|
|
sub process_batch { |
793
|
283
|
|
|
283
|
1
|
6904
|
my ($self) = @_; |
794
|
283
|
100
|
|
|
|
516
|
if ($self->has_children) { |
795
|
166
|
|
|
|
|
173
|
my $best; |
796
|
|
|
|
|
|
|
# Full-batch process closest to drain |
797
|
166
|
100
|
|
282
|
|
473
|
if ($best = last_value { $_->pressure >= 100 } @{$self->children}) { |
|
282
|
|
|
|
|
980
|
|
|
166
|
|
|
|
|
428
|
|
798
|
148
|
|
|
|
|
320
|
$self->DEBUG("Chose batch $best: full-batch process closest to drain"); |
799
|
|
|
|
|
|
|
} |
800
|
|
|
|
|
|
|
# If no full batch, choose the one closest to full |
801
|
|
|
|
|
|
|
else { |
802
|
18
|
|
|
31
|
|
832
|
$best = max_by { $_->pressure } @{$self->children}; |
|
31
|
|
|
|
|
537
|
|
|
18
|
|
|
|
|
71
|
|
803
|
18
|
|
|
|
|
856
|
$self->DEBUG("Chose batch $best: closest to full-batch"); |
804
|
|
|
|
|
|
|
} |
805
|
166
|
|
|
|
|
4465
|
$best->process_batch; |
806
|
|
|
|
|
|
|
} |
807
|
|
|
|
|
|
|
else { |
808
|
117
|
|
|
|
|
1771
|
my $num = $self->batch_size; |
809
|
117
|
|
|
|
|
8725
|
$self->DEBUG('Processing batch with max size', $num); |
810
|
|
|
|
|
|
|
|
811
|
117
|
|
|
|
|
4150
|
my @batch = $self->queue->dequeue($num); |
812
|
117
|
|
|
|
|
1797
|
$self->INFO('Processing batch', @batch); |
813
|
|
|
|
|
|
|
|
814
|
|
|
|
|
|
|
$self->segment->handler->( |
815
|
|
|
|
|
|
|
$self, |
816
|
|
|
|
|
|
|
\@batch, |
817
|
117
|
|
|
|
|
2938
|
@{$self->args} |
|
117
|
|
|
|
|
1566
|
|
818
|
|
|
|
|
|
|
); |
819
|
|
|
|
|
|
|
} |
820
|
|
|
|
|
|
|
} |
821
|
|
|
|
|
|
|
|
822
|
|
|
|
|
|
|
1; |
823
|
|
|
|
|
|
|
|
824
|
|
|
|
|
|
|
__END__ |