File Coverage

blib/lib/Thread/Queue.pm
Criterion Covered Total %
statement 84 155 54.1
branch 22 62 35.4
condition 19 39 48.7
subroutine 15 19 78.9
pod 11 11 100.0
total 151 286 52.8


line stmt bran cond sub pod time code
1             package Thread::Queue;
2              
3 3     3   51413 use strict;
  3         16  
  3         65  
4 3     3   9 use warnings;
  3         5  
  3         118  
5              
6             our $VERSION = '3.13';
7             $VERSION = eval $VERSION;
8              
9 3     3   1200 use threads::shared 1.21;
  3         2568  
  3         9  
10 3     3   177 use Scalar::Util 1.10 qw(looks_like_number blessed reftype refaddr);
  3         48  
  3         1406  
11              
12             # Carp errors from threads::shared calls should complain about caller
13             our @CARP_NOT = ("threads::shared");
14              
15             # Create a new queue possibly pre-populated with items
16             sub new
17             {
18 2     2 1 755 my $class = shift;
19 2     2   6 my @queue :shared = map { shared_clone($_) } @_;
  12         86  
  2         780  
  2         1818  
  2         2323  
20 2         74 my %self :shared = ( 'queue' => \@queue );
21 2         35 return bless(\%self, $class);
22             }
23              
24             # Add items to the tail of a queue
25             sub enqueue
26             {
27 2     2 1 5 my $self = shift;
28 2         3 lock(%$self);
29              
30 2 50       5 if ($$self{'ENDED'}) {
31 0         0 require Carp;
32 0         0 Carp::croak("'enqueue' method called on queue that has been 'end'ed");
33             }
34              
35             # Block if queue size exceeds any specified limit
36 2         3 my $queue = $$self{'queue'};
37 2   33     7 cond_wait(%$self) while ($$self{'LIMIT'} && (@$queue >= $$self{'LIMIT'}));
38              
39             # Add items to queue, and then signal other threads
40 2 50       10 push(@$queue, map { shared_clone($_) } @_)
  5         77  
41             and cond_signal(%$self);
42             }
43              
44             # Set or return the max. size for a queue
45             sub limit : lvalue
46             {
47 0     0 1 0 my $self = shift;
48 0         0 lock(%$self);
49 0         0 $$self{'LIMIT'};
50             }
51              
52             # Return a count of the number of items on a queue
53             sub pending
54             {
55 6     6 1 739 my $self = shift;
56 6         10 lock(%$self);
57 6 50 33     14 return if ($$self{'ENDED'} && ! @{$$self{'queue'}});
  0         0  
58 6         7 return scalar(@{$$self{'queue'}});
  6         18  
59             }
60              
61             # Indicate that no more data will enter the queue
62             sub end
63             {
64 0     0 1 0 my $self = shift;
65 0         0 lock(%$self);
66             # No more data is coming
67 0         0 $$self{'ENDED'} = 1;
68              
69 0         0 cond_signal(%$self); # Unblock possibly waiting threads
70             }
71              
72             # Return 1 or more items from the head of a queue, blocking if needed
73             sub dequeue
74             {
75 12     12 1 9163 my $self = shift;
76 12         16 lock(%$self);
77 12         19 my $queue = $$self{'queue'};
78              
79 12 100       26 my $count = @_ ? $self->_validate_count(shift) : 1;
80              
81             # Wait for requisite number of items
82 7   33     15 cond_wait(%$self) while ((@$queue < $count) && ! $$self{'ENDED'});
83              
84             # If no longer blocking, try getting whatever is left on the queue
85 7 50       10 return $self->dequeue_nb($count) if ($$self{'ENDED'});
86              
87             # Return single item
88 7 50       12 if ($count == 1) {
89 7         7 my $item = shift(@$queue);
90 7         129 cond_signal(%$self); # Unblock possibly waiting threads
91 7         28 return $item;
92             }
93              
94             # Return multiple items
95 0         0 my @items;
96 0         0 push(@items, shift(@$queue)) for (1..$count);
97 0         0 cond_signal(%$self); # Unblock possibly waiting threads
98 0         0 return @items;
99             }
100              
101             # Return items from the head of a queue with no blocking
102             sub dequeue_nb
103             {
104 6     6 1 2181 my $self = shift;
105 6         11 lock(%$self);
106 6         9 my $queue = $$self{'queue'};
107              
108 6 100       15 my $count = @_ ? $self->_validate_count(shift) : 1;
109              
110             # Return single item
111 1 50       4 if ($count == 1) {
112 1         2 my $item = shift(@$queue);
113 1         19 cond_signal(%$self); # Unblock possibly waiting threads
114 1         4 return $item;
115             }
116              
117             # Return multiple items
118 0         0 my @items;
119 0         0 for (1..$count) {
120 0 0       0 last if (! @$queue);
121 0         0 push(@items, shift(@$queue));
122             }
123 0         0 cond_signal(%$self); # Unblock possibly waiting threads
124 0         0 return @items;
125             }
126              
127             # Return items from the head of a queue, blocking if needed up to a timeout
128             sub dequeue_timed
129             {
130 0     0 1 0 my $self = shift;
131 0         0 lock(%$self);
132 0         0 my $queue = $$self{'queue'};
133              
134             # Timeout may be relative or absolute
135 0 0       0 my $timeout = @_ ? $self->_validate_timeout(shift) : -1;
136             # Convert to an absolute time for use with cond_timedwait()
137 0 0       0 if ($timeout < 32000000) { # More than one year
138 0         0 $timeout += time();
139             }
140              
141 0 0       0 my $count = @_ ? $self->_validate_count(shift) : 1;
142              
143             # Wait for requisite number of items, or until timeout
144 0   0     0 while ((@$queue < $count) && ! $$self{'ENDED'}) {
145 0 0       0 last if (! cond_timedwait(%$self, $timeout));
146             }
147              
148             # Get whatever we need off the queue if available
149 0         0 return $self->dequeue_nb($count);
150             }
151              
152             # Return an item without removing it from a queue
153             sub peek
154             {
155 4     4 1 1310 my $self = shift;
156 4         6 lock(%$self);
157 4 50       14 my $index = @_ ? $self->_validate_index(shift) : 0;
158 1         3 return $$self{'queue'}[$index];
159             }
160              
161             # Insert items anywhere into a queue
162             sub insert
163             {
164 4     4 1 1697 my $self = shift;
165 4         7 lock(%$self);
166              
167 4 50       7 if ($$self{'ENDED'}) {
168 0         0 require Carp;
169 0         0 Carp::croak("'insert' method called on queue that has been 'end'ed");
170             }
171              
172 4         5 my $queue = $$self{'queue'};
173              
174 4         7 my $index = $self->_validate_index(shift);
175              
176 0 0       0 return if (! @_); # Nothing to insert
177              
178             # Support negative indices
179 0 0       0 if ($index < 0) {
180 0         0 $index += @$queue;
181 0 0       0 if ($index < 0) {
182 0         0 $index = 0;
183             }
184             }
185              
186             # Dequeue items from $index onward
187 0         0 my @tmp;
188 0         0 while (@$queue > $index) {
189 0         0 unshift(@tmp, pop(@$queue))
190             }
191              
192             # Add new items to the queue
193 0         0 push(@$queue, map { shared_clone($_) } @_);
  0         0  
194              
195             # Add previous items back onto the queue
196 0         0 push(@$queue, @tmp);
197              
198 0         0 cond_signal(%$self); # Unblock possibly waiting threads
199             }
200              
201             # Remove items from anywhere in a queue
202             sub extract
203             {
204 8     8 1 3484 my $self = shift;
205 8         12 lock(%$self);
206 8         12 my $queue = $$self{'queue'};
207              
208 8 50       17 my $index = @_ ? $self->_validate_index(shift) : 0;
209 5 50       10 my $count = @_ ? $self->_validate_count(shift) : 1;
210              
211             # Support negative indices
212 0 0       0 if ($index < 0) {
213 0         0 $index += @$queue;
214 0 0       0 if ($index < 0) {
215 0         0 $count += $index;
216 0 0       0 return if ($count <= 0); # Beyond the head of the queue
217 0         0 return $self->dequeue_nb($count); # Extract from the head
218             }
219             }
220              
221             # Dequeue items from $index+$count onward
222 0         0 my @tmp;
223 0         0 while (@$queue > ($index+$count)) {
224 0         0 unshift(@tmp, pop(@$queue))
225             }
226              
227             # Extract desired items
228 0         0 my @items;
229 0         0 unshift(@items, pop(@$queue)) while (@$queue > $index);
230              
231             # Add back any removed items
232 0         0 push(@$queue, @tmp);
233              
234 0         0 cond_signal(%$self); # Unblock possibly waiting threads
235              
236             # Return single item
237 0 0       0 return $items[0] if ($count == 1);
238              
239             # Return multiple items
240 0         0 return @items;
241             }
242              
243             ### Internal Methods ###
244              
245             # Check value of the requested index
246             sub _validate_index
247             {
248 16     16   15 my $self = shift;
249 16         20 my $index = shift;
250              
251 16 100 100     80 if (! defined($index) ||
      100        
252             ! looks_like_number($index) ||
253             (int($index) != $index))
254             {
255 10         39 require Carp;
256 10         41 my ($method) = (caller(1))[3];
257 10         17 my $class_name = ref($self);
258 10         41 $method =~ s/$class_name\:://;
259 10 100       20 $index = 'undef' if (! defined($index));
260 10         635 Carp::croak("Invalid 'index' argument ($index) to '$method' method");
261             }
262              
263 6         11 return $index;
264             };
265              
266             # Check value of the requested count
267             sub _validate_count
268             {
269 15     15   16 my $self = shift;
270 15         15 my $count = shift;
271              
272 15 50 100     76 if (! defined($count) ||
      100        
      66        
      0        
      33        
273             ! looks_like_number($count) ||
274             (int($count) != $count) ||
275             ($count < 1) ||
276             ($$self{'LIMIT'} && $count > $$self{'LIMIT'}))
277             {
278 15         61 require Carp;
279 15         62 my ($method) = (caller(1))[3];
280 15         26 my $class_name = ref($self);
281 15         61 $method =~ s/$class_name\:://;
282 15 100       24 $count = 'undef' if (! defined($count));
283 15 50 33     60 if ($$self{'LIMIT'} && $count > $$self{'LIMIT'}) {
284 0         0 Carp::croak("'count' argument ($count) to '$method' method exceeds queue size limit ($$self{'LIMIT'})");
285             } else {
286 15         1067 Carp::croak("Invalid 'count' argument ($count) to '$method' method");
287             }
288             }
289              
290 0         0 return $count;
291             };
292              
293             # Check value of the requested timeout
294             sub _validate_timeout
295             {
296 0     0   0 my $self = shift;
297 0         0 my $timeout = shift;
298              
299 0 0 0     0 if (! defined($timeout) ||
300             ! looks_like_number($timeout))
301             {
302 0         0 require Carp;
303 0         0 my ($method) = (caller(1))[3];
304 0         0 my $class_name = ref($self);
305 0         0 $method =~ s/$class_name\:://;
306 0 0       0 $timeout = 'undef' if (! defined($timeout));
307 0         0 Carp::croak("Invalid 'timeout' argument ($timeout) to '$method' method");
308             }
309              
310 0         0 return $timeout;
311             };
312              
313             1;
314              
315             =head1 NAME
316              
317             Thread::Queue - Thread-safe queues
318              
319             =head1 VERSION
320              
321             This document describes Thread::Queue version 3.13
322              
323             =head1 SYNOPSIS
324              
325             use strict;
326             use warnings;
327              
328             use threads;
329             use Thread::Queue;
330              
331             my $q = Thread::Queue->new(); # A new empty queue
332              
333             # Worker thread
334             my $thr = threads->create(
335             sub {
336             # Thread will loop until no more work
337             while (defined(my $item = $q->dequeue())) {
338             # Do work on $item
339             ...
340             }
341             }
342             );
343              
344             # Send work to the thread
345             $q->enqueue($item1, ...);
346             # Signal that there is no more work to be sent
347             $q->end();
348             # Join up with the thread when it finishes
349             $thr->join();
350              
351             ...
352              
353             # Count of items in the queue
354             my $left = $q->pending();
355              
356             # Non-blocking dequeue
357             if (defined(my $item = $q->dequeue_nb())) {
358             # Work on $item
359             }
360              
361             # Blocking dequeue with 5-second timeout
362             if (defined(my $item = $q->dequeue_timed(5))) {
363             # Work on $item
364             }
365              
366             # Set a size for a queue
367             $q->limit = 5;
368              
369             # Get the second item in the queue without dequeuing anything
370             my $item = $q->peek(1);
371              
372             # Insert two items into the queue just behind the head
373             $q->insert(1, $item1, $item2);
374              
375             # Extract the last two items on the queue
376             my ($item1, $item2) = $q->extract(-2, 2);
377              
378             =head1 DESCRIPTION
379              
380             This module provides thread-safe FIFO queues that can be accessed safely by
381             any number of threads.
382              
383             Any data types supported by L<threads::shared> can be passed via queues:
384              
385             =over
386              
387             =item Ordinary scalars
388              
389             =item Array refs
390              
391             =item Hash refs
392              
393             =item Scalar refs
394              
395             =item Objects based on the above
396              
397             =back
398              
399             Ordinary scalars are added to queues as they are.
400              
401             If not already thread-shared, the other complex data types will be cloned
402             (recursively, if needed, and including any C<bless>ings and read-only
403             settings) into thread-shared structures before being placed onto a queue.
404              
405             For example, the following would cause L<Thread::Queue> to create an empty,
406             shared array reference via C<&shared([])>, copy the elements 'foo', 'bar'
407             and 'baz' from C<@ary> into it, and then place that shared reference onto
408             the queue:
409              
410             my @ary = qw/foo bar baz/;
411             $q->enqueue(\@ary);
412              
413             However, for the following, the items are already shared, so their references
414             are added directly to the queue, and no cloning takes place:
415              
416             my @ary :shared = qw/foo bar baz/;
417             $q->enqueue(\@ary);
418              
419             my $obj = &shared({});
420             $$obj{'foo'} = 'bar';
421             $$obj{'qux'} = 99;
422             bless($obj, 'My::Class');
423             $q->enqueue($obj);
424              
425             See L</"LIMITATIONS"> for caveats related to passing objects via queues.
426              
427             =head1 QUEUE CREATION
428              
429             =over
430              
431             =item ->new()
432              
433             Creates a new empty queue.
434              
435             =item ->new(LIST)
436              
437             Creates a new queue pre-populated with the provided list of items.
438              
439             =back
440              
441             =head1 BASIC METHODS
442              
443             The following methods deal with queues on a FIFO basis.
444              
445             =over
446              
447             =item ->enqueue(LIST)
448              
449             Adds a list of items onto the end of the queue.
450              
451             =item ->dequeue()
452              
453             =item ->dequeue(COUNT)
454              
455             Removes the requested number of items (default is 1) from the head of the
456             queue, and returns them. If the queue contains fewer than the requested
457             number of items, then the thread will be blocked until the requisite number
458             of items are available (i.e., until other threads C<enqueue> more items).
459              
460             =item ->dequeue_nb()
461              
462             =item ->dequeue_nb(COUNT)
463              
464             Removes the requested number of items (default is 1) from the head of the
465             queue, and returns them. If the queue contains fewer than the requested
466             number of items, then it immediately (i.e., non-blocking) returns whatever
467             items there are on the queue. If the queue is empty, then C<undef> is
468             returned.
469              
470             =item ->dequeue_timed(TIMEOUT)
471              
472             =item ->dequeue_timed(TIMEOUT, COUNT)
473              
474             Removes the requested number of items (default is 1) from the head of the
475             queue, and returns them. If the queue contains fewer than the requested
476             number of items, then the thread will be blocked until the requisite number of
477             items are available, or until the timeout is reached. If the timeout is
478             reached, it returns whatever items there are on the queue, or C<undef> if the
479             queue is empty.
480              
481             The timeout may be a number of seconds relative to the current time (e.g., 5
482             seconds from when the call is made), or may be an absolute timeout in I<epoch>
483             seconds the same as would be used with
484             L<cond_timedwait()|threads::shared/"cond_timedwait VARIABLE, ABS_TIMEOUT">.
485             Fractional seconds (e.g., 2.5 seconds) are also supported (to the extent of
486             the underlying implementation).
487              
488             If C<TIMEOUT> is missing, C<undef>, or less than or equal to 0, then this call
489             behaves the same as C<dequeue_nb>.
490              
491             =item ->pending()
492              
493             Returns the number of items still in the queue. Returns C<undef> if the queue
494             has been ended (see below), and there are no more items in the queue.
495              
496             =item ->limit
497              
498             Sets the size of the queue. If set, calls to C<enqueue()> will block until
499             the number of pending items in the queue drops below the C<limit>. The
500             C<limit> does not prevent enqueuing items beyond that count:
501              
502             my $q = Thread::Queue->new(1, 2);
503             $q->limit = 4;
504             $q->enqueue(3, 4, 5); # Does not block
505             $q->enqueue(6); # Blocks until at least 2 items are
506             # dequeued
507             my $size = $q->limit; # Returns the current limit (may return
508             # 'undef')
509             $q->limit = 0; # Queue size is now unlimited
510              
511             Calling any of the dequeue methods with C<COUNT> greater than a queue's
512             C<limit> will generate an error.
513              
514             =item ->end()
515              
516             Declares that no more items will be added to the queue.
517              
518             All threads blocking on C<dequeue()> calls will be unblocked with any
519             remaining items in the queue and/or C<undef> being returned. Any subsequent
520             calls to C<dequeue()> will behave like C<dequeue_nb()>.
521              
522             Once ended, no more items may be placed in the queue.
523              
524             =back
525              
526             =head1 ADVANCED METHODS
527              
528             The following methods can be used to manipulate items anywhere in a queue.
529              
530             To prevent the contents of a queue from being modified by another thread
531             while it is being examined and/or changed, L<lock|threads::shared/"lock
532             VARIABLE"> the queue inside a local block:
533              
534             {
535             lock($q); # Keep other threads from changing the queue's contents
536             my $item = $q->peek();
537             if ($item ...) {
538             ...
539             }
540             }
541             # Queue is now unlocked
542              
543             =over
544              
545             =item ->peek()
546              
547             =item ->peek(INDEX)
548              
549             Returns an item from the queue without dequeuing anything. Defaults to the
550             head of the queue (at index position 0) if no index is specified. Negative
551             index values are supported as with L<arrays|perldata/"Subscripts"> (i.e., -1
552             is the end of the queue, -2 is next to last, and so on).
553              
554             If no item exists at the specified index (i.e., the queue is empty, or the
555             index is beyond the number of items on the queue), then C<undef> is returned.
556              
557             Remember, the returned item is not removed from the queue, so manipulating a
558             C<peek>ed at reference affects the item on the queue.
559              
560             =item ->insert(INDEX, LIST)
561              
562             Adds the list of items to the queue at the specified index position (0
563             is the head of the list). Any existing items at and beyond that position are
564             pushed back past the newly added items:
565              
566             $q->enqueue(1, 2, 3, 4);
567             $q->insert(1, qw/foo bar/);
568             # Queue now contains: 1, foo, bar, 2, 3, 4
569              
570             Specifying an index position greater than the number of items in the queue
571             just adds the list to the end.
572              
573             Negative index positions are supported:
574              
575             $q->enqueue(1, 2, 3, 4);
576             $q->insert(-2, qw/foo bar/);
577             # Queue now contains: 1, 2, foo, bar, 3, 4
578              
579             Specifying a negative index position greater than the number of items in the
580             queue adds the list to the head of the queue.
581              
582             =item ->extract()
583              
584             =item ->extract(INDEX)
585              
586             =item ->extract(INDEX, COUNT)
587              
588             Removes and returns the specified number of items (defaults to 1) from the
589             specified index position in the queue (0 is the head of the queue). When
590             called with no arguments, C<extract> operates the same as C<dequeue_nb>.
591              
592             This method is non-blocking, and will return only as many items as are
593             available to fulfill the request:
594              
595             $q->enqueue(1, 2, 3, 4);
596             my $item = $q->extract(2) # Returns 3
597             # Queue now contains: 1, 2, 4
598             my @items = $q->extract(1, 3) # Returns (2, 4)
599             # Queue now contains: 1
600              
601             Specifying an index position greater than the number of items in the
602             queue results in C<undef> or an empty list being returned.
603              
604             $q->enqueue('foo');
605             my $nada = $q->extract(3) # Returns undef
606             my @nada = $q->extract(1, 3) # Returns ()
607              
608             Negative index positions are supported. Specifying a negative index position
609             greater than the number of items in the queue may return items from the head
610             of the queue (similar to C<dequeue_nb>) if the count overlaps the head of the
611             queue from the specified position (i.e. if queue size + index + count is
612             greater than zero):
613              
614             $q->enqueue(qw/foo bar baz/);
615             my @nada = $q->extract(-6, 2); # Returns () - (3+(-6)+2) <= 0
616             my @some = $q->extract(-6, 4); # Returns (foo) - (3+(-6)+4) > 0
617             # Queue now contains: bar, baz
618             my @rest = $q->extract(-3, 4); # Returns (bar, baz) -
619             # (2+(-3)+4) > 0
620              
621             =back
622              
623             =head1 NOTES
624              
625             Queues created by L<Thread::Queue> can be used in both threaded and
626             non-threaded applications.
627              
628             =head1 LIMITATIONS
629              
630             Passing objects on queues may not work if the objects' classes do not support
631             sharing. See L<threads::shared/"BUGS AND LIMITATIONS"> for more.
632              
633             Passing array/hash refs that contain objects may not work for Perl prior to
634             5.10.0.
635              
636             =head1 SEE ALSO
637              
638             Thread::Queue on MetaCPAN:
639             L<https://metacpan.org/release/Thread-Queue>
640              
641             Code repository for CPAN distribution:
642             L<https://github.com/Dual-Life/Thread-Queue>
643              
644             L<threads>, L<threads::shared>
645              
646             Sample code in the I<examples> directory of this distribution on CPAN.
647              
648             =head1 MAINTAINER
649              
650             Jerry D. Hedden, S<E<lt>jdhedden AT cpan DOT orgE<gt>>
651              
652             =head1 LICENSE
653              
654             This program is free software; you can redistribute it and/or modify it under
655             the same terms as Perl itself.
656              
657             =cut