line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
1
|
|
|
|
|
|
|
package Thread::Queue; |
2
|
|
|
|
|
|
|
|
3
|
3
|
|
|
3
|
|
51413
|
use strict; |
|
3
|
|
|
|
|
16
|
|
|
3
|
|
|
|
|
65
|
|
4
|
3
|
|
|
3
|
|
9
|
use warnings; |
|
3
|
|
|
|
|
5
|
|
|
3
|
|
|
|
|
118
|
|
5
|
|
|
|
|
|
|
|
6
|
|
|
|
|
|
|
our $VERSION = '3.13'; |
7
|
|
|
|
|
|
|
$VERSION = eval $VERSION; |
8
|
|
|
|
|
|
|
|
9
|
3
|
|
|
3
|
|
1200
|
use threads::shared 1.21; |
|
3
|
|
|
|
|
2568
|
|
|
3
|
|
|
|
|
9
|
|
10
|
3
|
|
|
3
|
|
177
|
use Scalar::Util 1.10 qw(looks_like_number blessed reftype refaddr); |
|
3
|
|
|
|
|
48
|
|
|
3
|
|
|
|
|
1406
|
|
11
|
|
|
|
|
|
|
|
12
|
|
|
|
|
|
|
# Carp errors from threads::shared calls should complain about caller |
13
|
|
|
|
|
|
|
our @CARP_NOT = ("threads::shared"); |
14
|
|
|
|
|
|
|
|
15
|
|
|
|
|
|
|
# Create a new queue possibly pre-populated with items |
16
|
|
|
|
|
|
|
sub new |
17
|
|
|
|
|
|
|
{ |
18
|
2
|
|
|
2
|
1
|
755
|
my $class = shift; |
19
|
2
|
|
|
2
|
|
6
|
my @queue :shared = map { shared_clone($_) } @_; |
|
12
|
|
|
|
|
86
|
|
|
2
|
|
|
|
|
780
|
|
|
2
|
|
|
|
|
1818
|
|
|
2
|
|
|
|
|
2323
|
|
20
|
2
|
|
|
|
|
74
|
my %self :shared = ( 'queue' => \@queue ); |
21
|
2
|
|
|
|
|
35
|
return bless(\%self, $class); |
22
|
|
|
|
|
|
|
} |
23
|
|
|
|
|
|
|
|
24
|
|
|
|
|
|
|
# Add items to the tail of a queue |
25
|
|
|
|
|
|
|
sub enqueue |
26
|
|
|
|
|
|
|
{ |
27
|
2
|
|
|
2
|
1
|
5
|
my $self = shift; |
28
|
2
|
|
|
|
|
3
|
lock(%$self); |
29
|
|
|
|
|
|
|
|
30
|
2
|
50
|
|
|
|
5
|
if ($$self{'ENDED'}) { |
31
|
0
|
|
|
|
|
0
|
require Carp; |
32
|
0
|
|
|
|
|
0
|
Carp::croak("'enqueue' method called on queue that has been 'end'ed"); |
33
|
|
|
|
|
|
|
} |
34
|
|
|
|
|
|
|
|
35
|
|
|
|
|
|
|
# Block if queue size exceeds any specified limit |
36
|
2
|
|
|
|
|
3
|
my $queue = $$self{'queue'}; |
37
|
2
|
|
33
|
|
|
7
|
cond_wait(%$self) while ($$self{'LIMIT'} && (@$queue >= $$self{'LIMIT'})); |
38
|
|
|
|
|
|
|
|
39
|
|
|
|
|
|
|
# Add items to queue, and then signal other threads |
40
|
2
|
50
|
|
|
|
10
|
push(@$queue, map { shared_clone($_) } @_) |
|
5
|
|
|
|
|
77
|
|
41
|
|
|
|
|
|
|
and cond_signal(%$self); |
42
|
|
|
|
|
|
|
} |
43
|
|
|
|
|
|
|
|
44
|
|
|
|
|
|
|
# Set or return the max. size for a queue |
45
|
|
|
|
|
|
|
sub limit : lvalue |
46
|
|
|
|
|
|
|
{ |
47
|
0
|
|
|
0
|
1
|
0
|
my $self = shift; |
48
|
0
|
|
|
|
|
0
|
lock(%$self); |
49
|
0
|
|
|
|
|
0
|
$$self{'LIMIT'}; |
50
|
|
|
|
|
|
|
} |
51
|
|
|
|
|
|
|
|
52
|
|
|
|
|
|
|
# Return a count of the number of items on a queue |
53
|
|
|
|
|
|
|
sub pending |
54
|
|
|
|
|
|
|
{ |
55
|
6
|
|
|
6
|
1
|
739
|
my $self = shift; |
56
|
6
|
|
|
|
|
10
|
lock(%$self); |
57
|
6
|
50
|
33
|
|
|
14
|
return if ($$self{'ENDED'} && ! @{$$self{'queue'}}); |
|
0
|
|
|
|
|
0
|
|
58
|
6
|
|
|
|
|
7
|
return scalar(@{$$self{'queue'}}); |
|
6
|
|
|
|
|
18
|
|
59
|
|
|
|
|
|
|
} |
60
|
|
|
|
|
|
|
|
61
|
|
|
|
|
|
|
# Indicate that no more data will enter the queue |
62
|
|
|
|
|
|
|
sub end |
63
|
|
|
|
|
|
|
{ |
64
|
0
|
|
|
0
|
1
|
0
|
my $self = shift; |
65
|
0
|
|
|
|
|
0
|
lock(%$self); |
66
|
|
|
|
|
|
|
# No more data is coming |
67
|
0
|
|
|
|
|
0
|
$$self{'ENDED'} = 1; |
68
|
|
|
|
|
|
|
|
69
|
0
|
|
|
|
|
0
|
cond_signal(%$self); # Unblock possibly waiting threads |
70
|
|
|
|
|
|
|
} |
71
|
|
|
|
|
|
|
|
72
|
|
|
|
|
|
|
# Return 1 or more items from the head of a queue, blocking if needed |
73
|
|
|
|
|
|
|
sub dequeue |
74
|
|
|
|
|
|
|
{ |
75
|
12
|
|
|
12
|
1
|
9163
|
my $self = shift; |
76
|
12
|
|
|
|
|
16
|
lock(%$self); |
77
|
12
|
|
|
|
|
19
|
my $queue = $$self{'queue'}; |
78
|
|
|
|
|
|
|
|
79
|
12
|
100
|
|
|
|
26
|
my $count = @_ ? $self->_validate_count(shift) : 1; |
80
|
|
|
|
|
|
|
|
81
|
|
|
|
|
|
|
# Wait for requisite number of items |
82
|
7
|
|
33
|
|
|
15
|
cond_wait(%$self) while ((@$queue < $count) && ! $$self{'ENDED'}); |
83
|
|
|
|
|
|
|
|
84
|
|
|
|
|
|
|
# If no longer blocking, try getting whatever is left on the queue |
85
|
7
|
50
|
|
|
|
10
|
return $self->dequeue_nb($count) if ($$self{'ENDED'}); |
86
|
|
|
|
|
|
|
|
87
|
|
|
|
|
|
|
# Return single item |
88
|
7
|
50
|
|
|
|
12
|
if ($count == 1) { |
89
|
7
|
|
|
|
|
7
|
my $item = shift(@$queue); |
90
|
7
|
|
|
|
|
129
|
cond_signal(%$self); # Unblock possibly waiting threads |
91
|
7
|
|
|
|
|
28
|
return $item; |
92
|
|
|
|
|
|
|
} |
93
|
|
|
|
|
|
|
|
94
|
|
|
|
|
|
|
# Return multiple items |
95
|
0
|
|
|
|
|
0
|
my @items; |
96
|
0
|
|
|
|
|
0
|
push(@items, shift(@$queue)) for (1..$count); |
97
|
0
|
|
|
|
|
0
|
cond_signal(%$self); # Unblock possibly waiting threads |
98
|
0
|
|
|
|
|
0
|
return @items; |
99
|
|
|
|
|
|
|
} |
100
|
|
|
|
|
|
|
|
101
|
|
|
|
|
|
|
# Return items from the head of a queue with no blocking |
102
|
|
|
|
|
|
|
sub dequeue_nb |
103
|
|
|
|
|
|
|
{ |
104
|
6
|
|
|
6
|
1
|
2181
|
my $self = shift; |
105
|
6
|
|
|
|
|
11
|
lock(%$self); |
106
|
6
|
|
|
|
|
9
|
my $queue = $$self{'queue'}; |
107
|
|
|
|
|
|
|
|
108
|
6
|
100
|
|
|
|
15
|
my $count = @_ ? $self->_validate_count(shift) : 1; |
109
|
|
|
|
|
|
|
|
110
|
|
|
|
|
|
|
# Return single item |
111
|
1
|
50
|
|
|
|
4
|
if ($count == 1) { |
112
|
1
|
|
|
|
|
2
|
my $item = shift(@$queue); |
113
|
1
|
|
|
|
|
19
|
cond_signal(%$self); # Unblock possibly waiting threads |
114
|
1
|
|
|
|
|
4
|
return $item; |
115
|
|
|
|
|
|
|
} |
116
|
|
|
|
|
|
|
|
117
|
|
|
|
|
|
|
# Return multiple items |
118
|
0
|
|
|
|
|
0
|
my @items; |
119
|
0
|
|
|
|
|
0
|
for (1..$count) { |
120
|
0
|
0
|
|
|
|
0
|
last if (! @$queue); |
121
|
0
|
|
|
|
|
0
|
push(@items, shift(@$queue)); |
122
|
|
|
|
|
|
|
} |
123
|
0
|
|
|
|
|
0
|
cond_signal(%$self); # Unblock possibly waiting threads |
124
|
0
|
|
|
|
|
0
|
return @items; |
125
|
|
|
|
|
|
|
} |
126
|
|
|
|
|
|
|
|
127
|
|
|
|
|
|
|
# Return items from the head of a queue, blocking if needed up to a timeout |
128
|
|
|
|
|
|
|
sub dequeue_timed |
129
|
|
|
|
|
|
|
{ |
130
|
0
|
|
|
0
|
1
|
0
|
my $self = shift; |
131
|
0
|
|
|
|
|
0
|
lock(%$self); |
132
|
0
|
|
|
|
|
0
|
my $queue = $$self{'queue'}; |
133
|
|
|
|
|
|
|
|
134
|
|
|
|
|
|
|
# Timeout may be relative or absolute |
135
|
0
|
0
|
|
|
|
0
|
my $timeout = @_ ? $self->_validate_timeout(shift) : -1; |
136
|
|
|
|
|
|
|
# Convert to an absolute time for use with cond_timedwait() |
137
|
0
|
0
|
|
|
|
0
|
if ($timeout < 32000000) { # More than one year |
138
|
0
|
|
|
|
|
0
|
$timeout += time(); |
139
|
|
|
|
|
|
|
} |
140
|
|
|
|
|
|
|
|
141
|
0
|
0
|
|
|
|
0
|
my $count = @_ ? $self->_validate_count(shift) : 1; |
142
|
|
|
|
|
|
|
|
143
|
|
|
|
|
|
|
# Wait for requisite number of items, or until timeout |
144
|
0
|
|
0
|
|
|
0
|
while ((@$queue < $count) && ! $$self{'ENDED'}) { |
145
|
0
|
0
|
|
|
|
0
|
last if (! cond_timedwait(%$self, $timeout)); |
146
|
|
|
|
|
|
|
} |
147
|
|
|
|
|
|
|
|
148
|
|
|
|
|
|
|
# Get whatever we need off the queue if available |
149
|
0
|
|
|
|
|
0
|
return $self->dequeue_nb($count); |
150
|
|
|
|
|
|
|
} |
151
|
|
|
|
|
|
|
|
152
|
|
|
|
|
|
|
# Return an item without removing it from a queue |
153
|
|
|
|
|
|
|
sub peek |
154
|
|
|
|
|
|
|
{ |
155
|
4
|
|
|
4
|
1
|
1310
|
my $self = shift; |
156
|
4
|
|
|
|
|
6
|
lock(%$self); |
157
|
4
|
50
|
|
|
|
14
|
my $index = @_ ? $self->_validate_index(shift) : 0; |
158
|
1
|
|
|
|
|
3
|
return $$self{'queue'}[$index]; |
159
|
|
|
|
|
|
|
} |
160
|
|
|
|
|
|
|
|
161
|
|
|
|
|
|
|
# Insert items anywhere into a queue |
162
|
|
|
|
|
|
|
sub insert |
163
|
|
|
|
|
|
|
{ |
164
|
4
|
|
|
4
|
1
|
1697
|
my $self = shift; |
165
|
4
|
|
|
|
|
7
|
lock(%$self); |
166
|
|
|
|
|
|
|
|
167
|
4
|
50
|
|
|
|
7
|
if ($$self{'ENDED'}) { |
168
|
0
|
|
|
|
|
0
|
require Carp; |
169
|
0
|
|
|
|
|
0
|
Carp::croak("'insert' method called on queue that has been 'end'ed"); |
170
|
|
|
|
|
|
|
} |
171
|
|
|
|
|
|
|
|
172
|
4
|
|
|
|
|
5
|
my $queue = $$self{'queue'}; |
173
|
|
|
|
|
|
|
|
174
|
4
|
|
|
|
|
7
|
my $index = $self->_validate_index(shift); |
175
|
|
|
|
|
|
|
|
176
|
0
|
0
|
|
|
|
0
|
return if (! @_); # Nothing to insert |
177
|
|
|
|
|
|
|
|
178
|
|
|
|
|
|
|
# Support negative indices |
179
|
0
|
0
|
|
|
|
0
|
if ($index < 0) { |
180
|
0
|
|
|
|
|
0
|
$index += @$queue; |
181
|
0
|
0
|
|
|
|
0
|
if ($index < 0) { |
182
|
0
|
|
|
|
|
0
|
$index = 0; |
183
|
|
|
|
|
|
|
} |
184
|
|
|
|
|
|
|
} |
185
|
|
|
|
|
|
|
|
186
|
|
|
|
|
|
|
# Dequeue items from $index onward |
187
|
0
|
|
|
|
|
0
|
my @tmp; |
188
|
0
|
|
|
|
|
0
|
while (@$queue > $index) { |
189
|
0
|
|
|
|
|
0
|
unshift(@tmp, pop(@$queue)) |
190
|
|
|
|
|
|
|
} |
191
|
|
|
|
|
|
|
|
192
|
|
|
|
|
|
|
# Add new items to the queue |
193
|
0
|
|
|
|
|
0
|
push(@$queue, map { shared_clone($_) } @_); |
|
0
|
|
|
|
|
0
|
|
194
|
|
|
|
|
|
|
|
195
|
|
|
|
|
|
|
# Add previous items back onto the queue |
196
|
0
|
|
|
|
|
0
|
push(@$queue, @tmp); |
197
|
|
|
|
|
|
|
|
198
|
0
|
|
|
|
|
0
|
cond_signal(%$self); # Unblock possibly waiting threads |
199
|
|
|
|
|
|
|
} |
200
|
|
|
|
|
|
|
|
201
|
|
|
|
|
|
|
# Remove items from anywhere in a queue |
202
|
|
|
|
|
|
|
sub extract |
203
|
|
|
|
|
|
|
{ |
204
|
8
|
|
|
8
|
1
|
3484
|
my $self = shift; |
205
|
8
|
|
|
|
|
12
|
lock(%$self); |
206
|
8
|
|
|
|
|
12
|
my $queue = $$self{'queue'}; |
207
|
|
|
|
|
|
|
|
208
|
8
|
50
|
|
|
|
17
|
my $index = @_ ? $self->_validate_index(shift) : 0; |
209
|
5
|
50
|
|
|
|
10
|
my $count = @_ ? $self->_validate_count(shift) : 1; |
210
|
|
|
|
|
|
|
|
211
|
|
|
|
|
|
|
# Support negative indices |
212
|
0
|
0
|
|
|
|
0
|
if ($index < 0) { |
213
|
0
|
|
|
|
|
0
|
$index += @$queue; |
214
|
0
|
0
|
|
|
|
0
|
if ($index < 0) { |
215
|
0
|
|
|
|
|
0
|
$count += $index; |
216
|
0
|
0
|
|
|
|
0
|
return if ($count <= 0); # Beyond the head of the queue |
217
|
0
|
|
|
|
|
0
|
return $self->dequeue_nb($count); # Extract from the head |
218
|
|
|
|
|
|
|
} |
219
|
|
|
|
|
|
|
} |
220
|
|
|
|
|
|
|
|
221
|
|
|
|
|
|
|
# Dequeue items from $index+$count onward |
222
|
0
|
|
|
|
|
0
|
my @tmp; |
223
|
0
|
|
|
|
|
0
|
while (@$queue > ($index+$count)) { |
224
|
0
|
|
|
|
|
0
|
unshift(@tmp, pop(@$queue)) |
225
|
|
|
|
|
|
|
} |
226
|
|
|
|
|
|
|
|
227
|
|
|
|
|
|
|
# Extract desired items |
228
|
0
|
|
|
|
|
0
|
my @items; |
229
|
0
|
|
|
|
|
0
|
unshift(@items, pop(@$queue)) while (@$queue > $index); |
230
|
|
|
|
|
|
|
|
231
|
|
|
|
|
|
|
# Add back any removed items |
232
|
0
|
|
|
|
|
0
|
push(@$queue, @tmp); |
233
|
|
|
|
|
|
|
|
234
|
0
|
|
|
|
|
0
|
cond_signal(%$self); # Unblock possibly waiting threads |
235
|
|
|
|
|
|
|
|
236
|
|
|
|
|
|
|
# Return single item |
237
|
0
|
0
|
|
|
|
0
|
return $items[0] if ($count == 1); |
238
|
|
|
|
|
|
|
|
239
|
|
|
|
|
|
|
# Return multiple items |
240
|
0
|
|
|
|
|
0
|
return @items; |
241
|
|
|
|
|
|
|
} |
242
|
|
|
|
|
|
|
|
243
|
|
|
|
|
|
|
### Internal Methods ### |
244
|
|
|
|
|
|
|
|
245
|
|
|
|
|
|
|
# Check value of the requested index |
246
|
|
|
|
|
|
|
sub _validate_index |
247
|
|
|
|
|
|
|
{ |
248
|
16
|
|
|
16
|
|
15
|
my $self = shift; |
249
|
16
|
|
|
|
|
20
|
my $index = shift; |
250
|
|
|
|
|
|
|
|
251
|
16
|
100
|
100
|
|
|
80
|
if (! defined($index) || |
|
|
|
100
|
|
|
|
|
252
|
|
|
|
|
|
|
! looks_like_number($index) || |
253
|
|
|
|
|
|
|
(int($index) != $index)) |
254
|
|
|
|
|
|
|
{ |
255
|
10
|
|
|
|
|
39
|
require Carp; |
256
|
10
|
|
|
|
|
41
|
my ($method) = (caller(1))[3]; |
257
|
10
|
|
|
|
|
17
|
my $class_name = ref($self); |
258
|
10
|
|
|
|
|
41
|
$method =~ s/$class_name\:://; |
259
|
10
|
100
|
|
|
|
20
|
$index = 'undef' if (! defined($index)); |
260
|
10
|
|
|
|
|
635
|
Carp::croak("Invalid 'index' argument ($index) to '$method' method"); |
261
|
|
|
|
|
|
|
} |
262
|
|
|
|
|
|
|
|
263
|
6
|
|
|
|
|
11
|
return $index; |
264
|
|
|
|
|
|
|
}; |
265
|
|
|
|
|
|
|
|
266
|
|
|
|
|
|
|
# Check value of the requested count |
267
|
|
|
|
|
|
|
sub _validate_count |
268
|
|
|
|
|
|
|
{ |
269
|
15
|
|
|
15
|
|
16
|
my $self = shift; |
270
|
15
|
|
|
|
|
15
|
my $count = shift; |
271
|
|
|
|
|
|
|
|
272
|
15
|
50
|
100
|
|
|
76
|
if (! defined($count) || |
|
|
|
100
|
|
|
|
|
|
|
|
66
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
|
33
|
|
|
|
|
273
|
|
|
|
|
|
|
! looks_like_number($count) || |
274
|
|
|
|
|
|
|
(int($count) != $count) || |
275
|
|
|
|
|
|
|
($count < 1) || |
276
|
|
|
|
|
|
|
($$self{'LIMIT'} && $count > $$self{'LIMIT'})) |
277
|
|
|
|
|
|
|
{ |
278
|
15
|
|
|
|
|
61
|
require Carp; |
279
|
15
|
|
|
|
|
62
|
my ($method) = (caller(1))[3]; |
280
|
15
|
|
|
|
|
26
|
my $class_name = ref($self); |
281
|
15
|
|
|
|
|
61
|
$method =~ s/$class_name\:://; |
282
|
15
|
100
|
|
|
|
24
|
$count = 'undef' if (! defined($count)); |
283
|
15
|
50
|
33
|
|
|
60
|
if ($$self{'LIMIT'} && $count > $$self{'LIMIT'}) { |
284
|
0
|
|
|
|
|
0
|
Carp::croak("'count' argument ($count) to '$method' method exceeds queue size limit ($$self{'LIMIT'})"); |
285
|
|
|
|
|
|
|
} else { |
286
|
15
|
|
|
|
|
1067
|
Carp::croak("Invalid 'count' argument ($count) to '$method' method"); |
287
|
|
|
|
|
|
|
} |
288
|
|
|
|
|
|
|
} |
289
|
|
|
|
|
|
|
|
290
|
0
|
|
|
|
|
0
|
return $count; |
291
|
|
|
|
|
|
|
}; |
292
|
|
|
|
|
|
|
|
293
|
|
|
|
|
|
|
# Check value of the requested timeout |
294
|
|
|
|
|
|
|
sub _validate_timeout |
295
|
|
|
|
|
|
|
{ |
296
|
0
|
|
|
0
|
|
0
|
my $self = shift; |
297
|
0
|
|
|
|
|
0
|
my $timeout = shift; |
298
|
|
|
|
|
|
|
|
299
|
0
|
0
|
0
|
|
|
0
|
if (! defined($timeout) || |
300
|
|
|
|
|
|
|
! looks_like_number($timeout)) |
301
|
|
|
|
|
|
|
{ |
302
|
0
|
|
|
|
|
0
|
require Carp; |
303
|
0
|
|
|
|
|
0
|
my ($method) = (caller(1))[3]; |
304
|
0
|
|
|
|
|
0
|
my $class_name = ref($self); |
305
|
0
|
|
|
|
|
0
|
$method =~ s/$class_name\:://; |
306
|
0
|
0
|
|
|
|
0
|
$timeout = 'undef' if (! defined($timeout)); |
307
|
0
|
|
|
|
|
0
|
Carp::croak("Invalid 'timeout' argument ($timeout) to '$method' method"); |
308
|
|
|
|
|
|
|
} |
309
|
|
|
|
|
|
|
|
310
|
0
|
|
|
|
|
0
|
return $timeout; |
311
|
|
|
|
|
|
|
}; |
312
|
|
|
|
|
|
|
|
313
|
|
|
|
|
|
|
1; |
314
|
|
|
|
|
|
|
|
315
|
|
|
|
|
|
|
=head1 NAME |
316
|
|
|
|
|
|
|
|
317
|
|
|
|
|
|
|
Thread::Queue - Thread-safe queues |
318
|
|
|
|
|
|
|
|
319
|
|
|
|
|
|
|
=head1 VERSION |
320
|
|
|
|
|
|
|
|
321
|
|
|
|
|
|
|
This document describes Thread::Queue version 3.13 |
322
|
|
|
|
|
|
|
|
323
|
|
|
|
|
|
|
=head1 SYNOPSIS |
324
|
|
|
|
|
|
|
|
325
|
|
|
|
|
|
|
use strict; |
326
|
|
|
|
|
|
|
use warnings; |
327
|
|
|
|
|
|
|
|
328
|
|
|
|
|
|
|
use threads; |
329
|
|
|
|
|
|
|
use Thread::Queue; |
330
|
|
|
|
|
|
|
|
331
|
|
|
|
|
|
|
my $q = Thread::Queue->new(); # A new empty queue |
332
|
|
|
|
|
|
|
|
333
|
|
|
|
|
|
|
# Worker thread |
334
|
|
|
|
|
|
|
my $thr = threads->create( |
335
|
|
|
|
|
|
|
sub { |
336
|
|
|
|
|
|
|
# Thread will loop until no more work |
337
|
|
|
|
|
|
|
while (defined(my $item = $q->dequeue())) { |
338
|
|
|
|
|
|
|
# Do work on $item |
339
|
|
|
|
|
|
|
... |
340
|
|
|
|
|
|
|
} |
341
|
|
|
|
|
|
|
} |
342
|
|
|
|
|
|
|
); |
343
|
|
|
|
|
|
|
|
344
|
|
|
|
|
|
|
# Send work to the thread |
345
|
|
|
|
|
|
|
$q->enqueue($item1, ...); |
346
|
|
|
|
|
|
|
# Signal that there is no more work to be sent |
347
|
|
|
|
|
|
|
$q->end(); |
348
|
|
|
|
|
|
|
# Join up with the thread when it finishes |
349
|
|
|
|
|
|
|
$thr->join(); |
350
|
|
|
|
|
|
|
|
351
|
|
|
|
|
|
|
... |
352
|
|
|
|
|
|
|
|
353
|
|
|
|
|
|
|
# Count of items in the queue |
354
|
|
|
|
|
|
|
my $left = $q->pending(); |
355
|
|
|
|
|
|
|
|
356
|
|
|
|
|
|
|
# Non-blocking dequeue |
357
|
|
|
|
|
|
|
if (defined(my $item = $q->dequeue_nb())) { |
358
|
|
|
|
|
|
|
# Work on $item |
359
|
|
|
|
|
|
|
} |
360
|
|
|
|
|
|
|
|
361
|
|
|
|
|
|
|
# Blocking dequeue with 5-second timeout |
362
|
|
|
|
|
|
|
if (defined(my $item = $q->dequeue_timed(5))) { |
363
|
|
|
|
|
|
|
# Work on $item |
364
|
|
|
|
|
|
|
} |
365
|
|
|
|
|
|
|
|
366
|
|
|
|
|
|
|
# Set a size for a queue |
367
|
|
|
|
|
|
|
$q->limit = 5; |
368
|
|
|
|
|
|
|
|
369
|
|
|
|
|
|
|
# Get the second item in the queue without dequeuing anything |
370
|
|
|
|
|
|
|
my $item = $q->peek(1); |
371
|
|
|
|
|
|
|
|
372
|
|
|
|
|
|
|
# Insert two items into the queue just behind the head |
373
|
|
|
|
|
|
|
$q->insert(1, $item1, $item2); |
374
|
|
|
|
|
|
|
|
375
|
|
|
|
|
|
|
# Extract the last two items on the queue |
376
|
|
|
|
|
|
|
my ($item1, $item2) = $q->extract(-2, 2); |
377
|
|
|
|
|
|
|
|
378
|
|
|
|
|
|
|
=head1 DESCRIPTION |
379
|
|
|
|
|
|
|
|
380
|
|
|
|
|
|
|
This module provides thread-safe FIFO queues that can be accessed safely by |
381
|
|
|
|
|
|
|
any number of threads. |
382
|
|
|
|
|
|
|
|
383
|
|
|
|
|
|
|
Any data types supported by L can be passed via queues: |
384
|
|
|
|
|
|
|
|
385
|
|
|
|
|
|
|
=over |
386
|
|
|
|
|
|
|
|
387
|
|
|
|
|
|
|
=item Ordinary scalars |
388
|
|
|
|
|
|
|
|
389
|
|
|
|
|
|
|
=item Array refs |
390
|
|
|
|
|
|
|
|
391
|
|
|
|
|
|
|
=item Hash refs |
392
|
|
|
|
|
|
|
|
393
|
|
|
|
|
|
|
=item Scalar refs |
394
|
|
|
|
|
|
|
|
395
|
|
|
|
|
|
|
=item Objects based on the above |
396
|
|
|
|
|
|
|
|
397
|
|
|
|
|
|
|
=back |
398
|
|
|
|
|
|
|
|
399
|
|
|
|
|
|
|
Ordinary scalars are added to queues as they are. |
400
|
|
|
|
|
|
|
|
401
|
|
|
|
|
|
|
If not already thread-shared, the other complex data types will be cloned |
402
|
|
|
|
|
|
|
(recursively, if needed, and including any Cings and read-only |
403
|
|
|
|
|
|
|
settings) into thread-shared structures before being placed onto a queue. |
404
|
|
|
|
|
|
|
|
405
|
|
|
|
|
|
|
For example, the following would cause L to create a empty, |
406
|
|
|
|
|
|
|
shared array reference via C<&shared([])>, copy the elements 'foo', 'bar' |
407
|
|
|
|
|
|
|
and 'baz' from C<@ary> into it, and then place that shared reference onto |
408
|
|
|
|
|
|
|
the queue: |
409
|
|
|
|
|
|
|
|
410
|
|
|
|
|
|
|
my @ary = qw/foo bar baz/; |
411
|
|
|
|
|
|
|
$q->enqueue(\@ary); |
412
|
|
|
|
|
|
|
|
413
|
|
|
|
|
|
|
However, for the following, the items are already shared, so their references |
414
|
|
|
|
|
|
|
are added directly to the queue, and no cloning takes place: |
415
|
|
|
|
|
|
|
|
416
|
|
|
|
|
|
|
my @ary :shared = qw/foo bar baz/; |
417
|
|
|
|
|
|
|
$q->enqueue(\@ary); |
418
|
|
|
|
|
|
|
|
419
|
|
|
|
|
|
|
my $obj = &shared({}); |
420
|
|
|
|
|
|
|
$$obj{'foo'} = 'bar'; |
421
|
|
|
|
|
|
|
$$obj{'qux'} = 99; |
422
|
|
|
|
|
|
|
bless($obj, 'My::Class'); |
423
|
|
|
|
|
|
|
$q->enqueue($obj); |
424
|
|
|
|
|
|
|
|
425
|
|
|
|
|
|
|
See L"LIMITATIONS"> for caveats related to passing objects via queues. |
426
|
|
|
|
|
|
|
|
427
|
|
|
|
|
|
|
=head1 QUEUE CREATION |
428
|
|
|
|
|
|
|
|
429
|
|
|
|
|
|
|
=over |
430
|
|
|
|
|
|
|
|
431
|
|
|
|
|
|
|
=item ->new() |
432
|
|
|
|
|
|
|
|
433
|
|
|
|
|
|
|
Creates a new empty queue. |
434
|
|
|
|
|
|
|
|
435
|
|
|
|
|
|
|
=item ->new(LIST) |
436
|
|
|
|
|
|
|
|
437
|
|
|
|
|
|
|
Creates a new queue pre-populated with the provided list of items. |
438
|
|
|
|
|
|
|
|
439
|
|
|
|
|
|
|
=back |
440
|
|
|
|
|
|
|
|
441
|
|
|
|
|
|
|
=head1 BASIC METHODS |
442
|
|
|
|
|
|
|
|
443
|
|
|
|
|
|
|
The following methods deal with queues on a FIFO basis. |
444
|
|
|
|
|
|
|
|
445
|
|
|
|
|
|
|
=over |
446
|
|
|
|
|
|
|
|
447
|
|
|
|
|
|
|
=item ->enqueue(LIST) |
448
|
|
|
|
|
|
|
|
449
|
|
|
|
|
|
|
Adds a list of items onto the end of the queue. |
450
|
|
|
|
|
|
|
|
451
|
|
|
|
|
|
|
=item ->dequeue() |
452
|
|
|
|
|
|
|
|
453
|
|
|
|
|
|
|
=item ->dequeue(COUNT) |
454
|
|
|
|
|
|
|
|
455
|
|
|
|
|
|
|
Removes the requested number of items (default is 1) from the head of the |
456
|
|
|
|
|
|
|
queue, and returns them. If the queue contains fewer than the requested |
457
|
|
|
|
|
|
|
number of items, then the thread will be blocked until the requisite number |
458
|
|
|
|
|
|
|
of items are available (i.e., until other threads C more items). |
459
|
|
|
|
|
|
|
|
460
|
|
|
|
|
|
|
=item ->dequeue_nb() |
461
|
|
|
|
|
|
|
|
462
|
|
|
|
|
|
|
=item ->dequeue_nb(COUNT) |
463
|
|
|
|
|
|
|
|
464
|
|
|
|
|
|
|
Removes the requested number of items (default is 1) from the head of the |
465
|
|
|
|
|
|
|
queue, and returns them. If the queue contains fewer than the requested |
466
|
|
|
|
|
|
|
number of items, then it immediately (i.e., non-blocking) returns whatever |
467
|
|
|
|
|
|
|
items there are on the queue. If the queue is empty, then C is |
468
|
|
|
|
|
|
|
returned. |
469
|
|
|
|
|
|
|
|
470
|
|
|
|
|
|
|
=item ->dequeue_timed(TIMEOUT) |
471
|
|
|
|
|
|
|
|
472
|
|
|
|
|
|
|
=item ->dequeue_timed(TIMEOUT, COUNT) |
473
|
|
|
|
|
|
|
|
474
|
|
|
|
|
|
|
Removes the requested number of items (default is 1) from the head of the |
475
|
|
|
|
|
|
|
queue, and returns them. If the queue contains fewer than the requested |
476
|
|
|
|
|
|
|
number of items, then the thread will be blocked until the requisite number of |
477
|
|
|
|
|
|
|
items are available, or until the timeout is reached. If the timeout is |
478
|
|
|
|
|
|
|
reached, it returns whatever items there are on the queue, or C if the |
479
|
|
|
|
|
|
|
queue is empty. |
480
|
|
|
|
|
|
|
|
481
|
|
|
|
|
|
|
The timeout may be a number of seconds relative to the current time (e.g., 5 |
482
|
|
|
|
|
|
|
seconds from when the call is made), or may be an absolute timeout in I |
483
|
|
|
|
|
|
|
seconds the same as would be used with |
484
|
|
|
|
|
|
|
L. |
485
|
|
|
|
|
|
|
Fractional seconds (e.g., 2.5 seconds) are also supported (to the extent of |
486
|
|
|
|
|
|
|
the underlying implementation). |
487
|
|
|
|
|
|
|
|
488
|
|
|
|
|
|
|
If C is missing, C, or less than or equal to 0, then this call |
489
|
|
|
|
|
|
|
behaves the same as C. |
490
|
|
|
|
|
|
|
|
491
|
|
|
|
|
|
|
=item ->pending() |
492
|
|
|
|
|
|
|
|
493
|
|
|
|
|
|
|
Returns the number of items still in the queue. Returns C if the queue |
494
|
|
|
|
|
|
|
has been ended (see below), and there are no more items in the queue. |
495
|
|
|
|
|
|
|
|
496
|
|
|
|
|
|
|
=item ->limit |
497
|
|
|
|
|
|
|
|
498
|
|
|
|
|
|
|
Sets the size of the queue. If set, calls to C will block until |
499
|
|
|
|
|
|
|
the number of pending items in the queue drops below the C. The |
500
|
|
|
|
|
|
|
C does not prevent enqueuing items beyond that count: |
501
|
|
|
|
|
|
|
|
502
|
|
|
|
|
|
|
my $q = Thread::Queue->new(1, 2); |
503
|
|
|
|
|
|
|
$q->limit = 4; |
504
|
|
|
|
|
|
|
$q->enqueue(3, 4, 5); # Does not block |
505
|
|
|
|
|
|
|
$q->enqueue(6); # Blocks until at least 2 items are |
506
|
|
|
|
|
|
|
# dequeued |
507
|
|
|
|
|
|
|
my $size = $q->limit; # Returns the current limit (may return |
508
|
|
|
|
|
|
|
# 'undef') |
509
|
|
|
|
|
|
|
$q->limit = 0; # Queue size is now unlimited |
510
|
|
|
|
|
|
|
|
511
|
|
|
|
|
|
|
Calling any of the dequeue methods with C greater than a queue's |
512
|
|
|
|
|
|
|
C will generate an error. |
513
|
|
|
|
|
|
|
|
514
|
|
|
|
|
|
|
=item ->end() |
515
|
|
|
|
|
|
|
|
516
|
|
|
|
|
|
|
Declares that no more items will be added to the queue. |
517
|
|
|
|
|
|
|
|
518
|
|
|
|
|
|
|
All threads blocking on C calls will be unblocked with any |
519
|
|
|
|
|
|
|
remaining items in the queue and/or C being returned. Any subsequent |
520
|
|
|
|
|
|
|
calls to C will behave like C. |
521
|
|
|
|
|
|
|
|
522
|
|
|
|
|
|
|
Once ended, no more items may be placed in the queue. |
523
|
|
|
|
|
|
|
|
524
|
|
|
|
|
|
|
=back |
525
|
|
|
|
|
|
|
|
526
|
|
|
|
|
|
|
=head1 ADVANCED METHODS |
527
|
|
|
|
|
|
|
|
528
|
|
|
|
|
|
|
The following methods can be used to manipulate items anywhere in a queue. |
529
|
|
|
|
|
|
|
|
530
|
|
|
|
|
|
|
To prevent the contents of a queue from being modified by another thread |
531
|
|
|
|
|
|
|
while it is being examined and/or changed, L
|
532
|
|
|
|
|
|
|
VARIABLE"> the queue inside a local block: |
533
|
|
|
|
|
|
|
|
534
|
|
|
|
|
|
|
{ |
535
|
|
|
|
|
|
|
lock($q); # Keep other threads from changing the queue's contents |
536
|
|
|
|
|
|
|
my $item = $q->peek(); |
537
|
|
|
|
|
|
|
if ($item ...) { |
538
|
|
|
|
|
|
|
... |
539
|
|
|
|
|
|
|
} |
540
|
|
|
|
|
|
|
} |
541
|
|
|
|
|
|
|
# Queue is now unlocked |
542
|
|
|
|
|
|
|
|
543
|
|
|
|
|
|
|
=over |
544
|
|
|
|
|
|
|
|
545
|
|
|
|
|
|
|
=item ->peek() |
546
|
|
|
|
|
|
|
|
547
|
|
|
|
|
|
|
=item ->peek(INDEX) |
548
|
|
|
|
|
|
|
|
549
|
|
|
|
|
|
|
Returns an item from the queue without dequeuing anything. Defaults to the |
550
|
|
|
|
|
|
|
the head of queue (at index position 0) if no index is specified. Negative |
551
|
|
|
|
|
|
|
index values are supported as with L (i.e., -1 |
552
|
|
|
|
|
|
|
is the end of the queue, -2 is next to last, and so on). |
553
|
|
|
|
|
|
|
|
554
|
|
|
|
|
|
|
If no items exists at the specified index (i.e., the queue is empty, or the |
555
|
|
|
|
|
|
|
index is beyond the number of items on the queue), then C is returned. |
556
|
|
|
|
|
|
|
|
557
|
|
|
|
|
|
|
Remember, the returned item is not removed from the queue, so manipulating a |
558
|
|
|
|
|
|
|
Ced at reference affects the item on the queue. |
559
|
|
|
|
|
|
|
|
560
|
|
|
|
|
|
|
=item ->insert(INDEX, LIST) |
561
|
|
|
|
|
|
|
|
562
|
|
|
|
|
|
|
Adds the list of items to the queue at the specified index position (0 |
563
|
|
|
|
|
|
|
is the head of the list). Any existing items at and beyond that position are |
564
|
|
|
|
|
|
|
pushed back past the newly added items: |
565
|
|
|
|
|
|
|
|
566
|
|
|
|
|
|
|
$q->enqueue(1, 2, 3, 4); |
567
|
|
|
|
|
|
|
$q->insert(1, qw/foo bar/); |
568
|
|
|
|
|
|
|
# Queue now contains: 1, foo, bar, 2, 3, 4 |
569
|
|
|
|
|
|
|
|
570
|
|
|
|
|
|
|
Specifying an index position greater than the number of items in the queue |
571
|
|
|
|
|
|
|
just adds the list to the end. |
572
|
|
|
|
|
|
|
|
573
|
|
|
|
|
|
|
Negative index positions are supported: |
574
|
|
|
|
|
|
|
|
575
|
|
|
|
|
|
|
$q->enqueue(1, 2, 3, 4); |
576
|
|
|
|
|
|
|
$q->insert(-2, qw/foo bar/); |
577
|
|
|
|
|
|
|
# Queue now contains: 1, 2, foo, bar, 3, 4 |
578
|
|
|
|
|
|
|
|
579
|
|
|
|
|
|
|
Specifying a negative index position greater than the number of items in the |
580
|
|
|
|
|
|
|
queue adds the list to the head of the queue. |
581
|
|
|
|
|
|
|
|
582
|
|
|
|
|
|
|
=item ->extract() |
583
|
|
|
|
|
|
|
|
584
|
|
|
|
|
|
|
=item ->extract(INDEX) |
585
|
|
|
|
|
|
|
|
586
|
|
|
|
|
|
|
=item ->extract(INDEX, COUNT) |
587
|
|
|
|
|
|
|
|
588
|
|
|
|
|
|
|
Removes and returns the specified number of items (defaults to 1) from the |
589
|
|
|
|
|
|
|
specified index position in the queue (0 is the head of the queue). When |
590
|
|
|
|
|
|
|
called with no arguments, C operates the same as C. |
591
|
|
|
|
|
|
|
|
592
|
|
|
|
|
|
|
This method is non-blocking, and will return only as many items as are |
593
|
|
|
|
|
|
|
available to fulfill the request: |
594
|
|
|
|
|
|
|
|
595
|
|
|
|
|
|
|
$q->enqueue(1, 2, 3, 4); |
596
|
|
|
|
|
|
|
my $item = $q->extract(2) # Returns 3 |
597
|
|
|
|
|
|
|
# Queue now contains: 1, 2, 4 |
598
|
|
|
|
|
|
|
my @items = $q->extract(1, 3) # Returns (2, 4) |
599
|
|
|
|
|
|
|
# Queue now contains: 1 |
600
|
|
|
|
|
|
|
|
601
|
|
|
|
|
|
|
Specifying an index position greater than the number of items in the |
602
|
|
|
|
|
|
|
queue results in C or an empty list being returned. |
603
|
|
|
|
|
|
|
|
604
|
|
|
|
|
|
|
$q->enqueue('foo'); |
605
|
|
|
|
|
|
|
my $nada = $q->extract(3) # Returns undef |
606
|
|
|
|
|
|
|
my @nada = $q->extract(1, 3) # Returns () |
607
|
|
|
|
|
|
|
|
608
|
|
|
|
|
|
|
Negative index positions are supported. Specifying a negative index position |
609
|
|
|
|
|
|
|
greater than the number of items in the queue may return items from the head |
610
|
|
|
|
|
|
|
of the queue (similar to C) if the count overlaps the head of the |
611
|
|
|
|
|
|
|
queue from the specified position (i.e. if queue size + index + count is |
612
|
|
|
|
|
|
|
greater than zero): |
613
|
|
|
|
|
|
|
|
614
|
|
|
|
|
|
|
$q->enqueue(qw/foo bar baz/); |
615
|
|
|
|
|
|
|
my @nada = $q->extract(-6, 2); # Returns () - (3+(-6)+2) <= 0 |
616
|
|
|
|
|
|
|
my @some = $q->extract(-6, 4); # Returns (foo) - (3+(-6)+4) > 0 |
617
|
|
|
|
|
|
|
# Queue now contains: bar, baz |
618
|
|
|
|
|
|
|
my @rest = $q->extract(-3, 4); # Returns (bar, baz) - |
619
|
|
|
|
|
|
|
# (2+(-3)+4) > 0 |
620
|
|
|
|
|
|
|
|
621
|
|
|
|
|
|
|
=back |
622
|
|
|
|
|
|
|
|
623
|
|
|
|
|
|
|
=head1 NOTES |
624
|
|
|
|
|
|
|
|
625
|
|
|
|
|
|
|
Queues created by L can be used in both threaded and |
626
|
|
|
|
|
|
|
non-threaded applications. |
627
|
|
|
|
|
|
|
|
628
|
|
|
|
|
|
|
=head1 LIMITATIONS |
629
|
|
|
|
|
|
|
|
630
|
|
|
|
|
|
|
Passing objects on queues may not work if the objects' classes do not support |
631
|
|
|
|
|
|
|
sharing. See L for more. |
632
|
|
|
|
|
|
|
|
633
|
|
|
|
|
|
|
Passing array/hash refs that contain objects may not work for Perl prior to |
634
|
|
|
|
|
|
|
5.10.0. |
635
|
|
|
|
|
|
|
|
636
|
|
|
|
|
|
|
=head1 SEE ALSO |
637
|
|
|
|
|
|
|
|
638
|
|
|
|
|
|
|
Thread::Queue on MetaCPAN: |
639
|
|
|
|
|
|
|
L |
640
|
|
|
|
|
|
|
|
641
|
|
|
|
|
|
|
Code repository for CPAN distribution: |
642
|
|
|
|
|
|
|
L |
643
|
|
|
|
|
|
|
|
644
|
|
|
|
|
|
|
L, L |
645
|
|
|
|
|
|
|
|
646
|
|
|
|
|
|
|
Sample code in the I directory of this distribution on CPAN. |
647
|
|
|
|
|
|
|
|
648
|
|
|
|
|
|
|
=head1 MAINTAINER |
649
|
|
|
|
|
|
|
|
650
|
|
|
|
|
|
|
Jerry D. Hedden, Sjdhedden AT cpan DOT orgE> |
651
|
|
|
|
|
|
|
|
652
|
|
|
|
|
|
|
=head1 LICENSE |
653
|
|
|
|
|
|
|
|
654
|
|
|
|
|
|
|
This program is free software; you can redistribute it and/or modify it under |
655
|
|
|
|
|
|
|
the same terms as Perl itself. |
656
|
|
|
|
|
|
|
|
657
|
|
|
|
|
|
|
=cut |