File Coverage

blib/lib/Thread/Queue/MaxSize.pm
Criterion Covered Total %
statement 91 111 81.9
branch 31 46 67.3
condition 24 41 58.5
subroutine 10 10 100.0
pod 3 3 100.0
total 159 211 75.3


line stmt bran cond sub pod time code
1             package Thread::Queue::MaxSize;
2              
3 8     8   184073 use strict;
  8         19  
  8         440  
4 8     8   52 use warnings;
  8         14  
  8         575  
5              
6             our $VERSION = '1.03';
7             $VERSION = eval $VERSION;
8              
9 8     8   5339 use parent qw(Thread::Queue);
  8         3276  
  8         48  
10              
11 8     8   78566 use threads::shared 1.21;
  8         267  
  8         72  
12 8     8   831 use Scalar::Util 1.10 qw(looks_like_number);
  8         188  
  8         14041  
13              
14             sub new {
15 17     17 1 14781 my ($class, $config) = @_;
16 17         153 my $self = $class->SUPER::new();
17              
18 17 50 33     1449 if ($config && (!ref($config) || ref($config) ne "HASH")) {
      33        
19 0         0 require Carp;
20 0         0 Carp::croak("invalid first argument to constructor -- must be a hashref with any configuration options");
21             }
22              
23             # make sure that maxsize is actually a number
24 17 50       85 my $maxsize = ($config) ? $config->{'maxsize'} : undef;
25 17         77 $self->{'MAXSIZE'} = $self->_validate_maxsize($maxsize);
26              
27             # determine what type of action we'll take on exceeding our max size
28             # 1. raise an exception (die)
29             # 2. warn and reject entire addition/insertion
30             # 3. silently reject entire addition/insertion
31             # 4. warn, process addition/insertion and then truncate to max size
32             # 5. silently process addition/insertion and then truncate to max size
33 17 50       94 my $on_maxsize = ($config) ? $config->{'on_maxsize'} : undef;
34 17   100     98 $self->{'ON_MAXSIZE'} = $self->_validate_on_maxsize($on_maxsize || 'silent_truncate');
35              
36 17         66 return $self;
37             }
38              
39             # add items to the tail of a queue
40             sub enqueue {
41 84     84 1 28447 my $self = shift;
42 84         131 lock(%$self);
43              
44 84 50       2509 if ($self->{'ENDED'}) {
45 0         0 require Carp;
46 0         0 Carp::croak("'enqueue' method called on queue that has been 'end'ed");
47             }
48              
49 84         119 my $queue = $self->{'queue'};
50              
51             # queue can't be too big so shift the oldest things off if necessary
52 84 100 66     3303 if (defined($self->{'MAXSIZE'}) && $self->{'MAXSIZE'} > 0) {
53 81 100 100     79 if ((scalar(@{$queue}) + scalar(@_)) > $self->{'MAXSIZE'} &&
  81         11488  
54             $self->{'ON_MAXSIZE'} =~ /^(die|warn_and_reject|silent_reject|warn_and_truncate)$/ix) {
55 35 100       173 if ($1 =~ /^warn_and_truncate$/ix) {
    100          
    100          
    50          
56 11         81 warn "queue exceeding its maximum size: truncating\n";
57             } elsif ($1 =~ /^silent_reject$/ix) {
58 11         16 return;
59             } elsif ($1 =~ /^warn_and_reject$/ix) {
60 11         100 warn "not enqueuing new items: queue would exceed its maximum size\n";
61 11         40 return;
62             } elsif ($1 =~ /^die$/ix) {
63 2         22 die "not enqueuing new items: queue would exceed its maximum size\n";
64             }
65             }
66              
67             # remove things already on the queue
68 57   100     89 while (scalar(@{$queue}) && (scalar(@{$queue}) + scalar(@_)) > $self->{'MAXSIZE'}) {
  79         215  
  64         239  
69 22         25 shift(@{$queue});
  22         71  
70             }
71              
72             # if we've already removed everything off of the queue and we're still
73             # over maxsize then take things off of the list of new items
74 57   33     324 while (scalar(@_) && (scalar(@_)) > $self->{'MAXSIZE'}) {
75 0         0 shift(@_);
76             }
77             }
78              
79 60 50       73 push(@{$queue}, map { shared_clone($_) } @_) and cond_signal(%$self);
  60         126  
  179         1205  
80             }
81              
82             # insert items anywhere into a queue
83             sub insert {
84 9     9 1 11687 my $self = shift;
85 9         24 lock(%$self);
86              
87 9 50       47 if ($self->{'ENDED'}) {
88 0         0 require Carp;
89 0         0 Carp::croak("'insert' method called on queue that has been 'end'ed");
90             }
91              
92 9         21 my $queue = $self->{'queue'};
93              
94 9         82 my $index = $self->_validate_index(shift);
95              
96             # make sure we have something to insert
97 5 50       81 return unless @_;
98              
99             # support negative indices
100 5 50       27 if ($index < 0) {
101 0         0 $index += @{$queue};
  0         0  
102 0 0       0 $index = 0 if ($index < 0);
103             }
104              
105             # dequeue items from $index onward
106 5         41 my @tmp = ();
107 5         34 while (@{$queue} > $index) {
  30         137  
108 25         27 unshift(@tmp, pop(@{$queue}))
  25         52  
109             }
110              
111             # queue can't be too big so shift the oldest things off if necessary
112 5 50 33     48 if (defined($self->{'MAXSIZE'}) && $self->{'MAXSIZE'} > 0) {
113 5 100 66     10 if ((scalar(@{$queue}) + scalar(@_) + scalar(@tmp)) > $self->{'MAXSIZE'} &&
  5         83  
114             $self->{'ON_MAXSIZE'} =~ /^(die|warn_and_reject|silent_reject|warn_and_truncate)$/ix) {
115 4 100       46 if ($1 =~ /^warn_and_truncate$/ix) {
    100          
    100          
    50          
116 1         12 warn "queue exceeding its maximum size: truncating\n";
117             } elsif ($1 =~ /^silent_reject$/ix) {
118             # reset queue before dying
119 1         1 push(@{$queue}, @tmp);
  1         3  
120 1         5 return;
121             } elsif ($1 =~ /^warn_and_reject$/ix) {
122             # reset queue before dying
123 1         2 push(@{$queue}, @tmp);
  1         3  
124 1         10 warn "not inserting new items: queue would exceed its maximum size\n";
125 1         5 return;
126             } elsif ($1 =~ /^die$/ix) {
127             # reset queue before dying
128 1         2 push(@{$queue}, @tmp);
  1         3  
129 1         11 die "not inserting new items: queue would exceed its maximum size\n";
130             }
131             }
132              
133             # remove things already on the queue
134 2   66     6 while (scalar(@{$queue}) && (scalar(@{$queue}) + scalar(@_) + scalar(@tmp)) > $self->{'MAXSIZE'}) {
  8         25  
  6         25  
135 6         8 shift(@{$queue});
  6         9  
136             }
137              
138             # if we've already removed everything off of the queue and we're still
139             # over maxsize then take things off of the list of new items
140 2   66     22 while (scalar(@_) && (scalar(@_) + scalar(@tmp)) > $self->{'MAXSIZE'}) {
141 2         12 shift(@_);
142             }
143             }
144              
145             # add new items to the queue
146 2         3 push(@{$queue}, map { shared_clone($_) } @_);
  2         7  
  10         73  
147              
148             # add previous items back onto the queue
149 2         18 push(@{$queue}, @tmp);
  2         36  
150              
151             # soup's up
152 2         84 cond_signal(%$self);
153             }
154              
155             sub _validate_maxsize {
156 17     17   36 my ($self, $maxsize) = @_;
157              
158 17 50 33     254 if (defined($maxsize) && (!looks_like_number($maxsize) || (int($maxsize) != $maxsize) || ($maxsize < 1))) {
      66        
159 0         0 require Carp;
160 0         0 my ($method) = (caller(1))[3];
161 0         0 my $class_name = ref($self);
162 0         0 $method =~ s/${class_name}:://;
163 0         0 Carp::croak("Invalid 'maxsize' argument ($maxsize) to '$method' method");
164             }
165              
166 17         96 return $maxsize;
167             }
168              
169             sub _validate_on_maxsize {
170 17     17   38 my ($self, $on_maxsize) = @_;
171              
172 17 50 33     331 if (defined($on_maxsize) && ($on_maxsize !~ /^(?:die|warn_and_reject|silent_reject|warn_and_truncate|silent_truncate)$/ix)) {
173 0         0 require Carp;
174 0         0 my ($method) = (caller(1))[3];
175 0         0 my $class_name = ref($self);
176 0         0 $method =~ s/${class_name}:://;
177 0         0 Carp::croak("Invalid 'on_maxsize' argument ($on_maxsize) to '$method' method");
178             }
179              
180 17         65 return $on_maxsize;
181             }
182              
183             1;
184              
185             =head1 NAME
186              
187             Thread::Queue::MaxSize - Thread-safe queues with an upper bound
188              
189             =head1 VERSION
190              
191             This document describes Thread::Queue::MaxSize version 1.03
192              
193             =head1 SYNOPSIS
194              
195             use strict;
196             use warnings;
197              
198             use threads;
199             use Thread::Queue::MaxSize;
200              
201             # create a new empty queue with no max limit
202             my $q = Thread::Queue::MaxSize->new();
203              
204             # create a new empty queue that will only ever store 1000 items
205             my $q = Thread::Queue::MaxSize->new({ maxsize => 1000 });
206              
207             # create a queue that will die when too many items are enqueued
208             my $q = Thread::Queue::MaxSize->new({ maxsize => 1000, on_maxsize => 'die' });
209              
210             =head1 DESCRIPTION
211              
212             This is a subclass to L<Thread::Queue> that will enforce an upper bound on the
213             number of items in a queue. This can be used to prevent memory use from
214             exploding on a queue that might never empty.
215              
216             =head1 QUEUE CREATION
217              
218             =over
219              
220             =item ->new()
221              
222             Creates a new empty queue. This queue will have no items to start.
223              
224             =item ->new(OPTIONS)
225              
226             Creates a new empty queue with some options. The two configurable options are:
227              
228             =over
229              
230             =item maxsize
231              
232             Defines the maximum size that the queue can ever be.
233              
234             =item on_maxsize
235              
236             Defines the action that will be taken when a queue reaches its maximum size.
237             There are five actions that can be taken when the list of items to enqueue or
238             insert would cause the queue to go over its maximum size. In all cases either
239             all of the items are enqueued/inserted or none of the items are enqueued/inserted.
240              
241             =over
242              
243             =item die
244              
245             No items will be enqueued/inserted and the queue will throw an exception.
246              
247             =item warn_and_reject
248              
249             No items will be enqueued/inserted and the queue will issue a warning.
250              
251             =item silent_reject
252              
253             No items will be enqueued/inserted and no indication will be given as to why.
254              
255             =item warn_and_truncate
256              
257             All items will be enqueued/inserted, the oldest items on the list will be
258             truncated off the end, and the queue will issue a warning.
259              
260             =item silent_truncate
261              
262             All items will be enqueued/inserted, the oldest items on the list will be
263             truncated off the end, and no indication will be given as to why. This is the
264             default action.
265              
266             =back

                =back

                =back
267              
268             =head1 SEE ALSO
269              
270             L<Thread::Queue>, L<threads>, L<threads::shared>
271              
272             =head1 MAINTAINER
273              
274             Paul Lockaby S<E<lt>plockaby AT cpan DOT orgE<gt>>
275              
276             =head1 CREDIT
277              
278             Significant portions of this module are directly from L<Thread::Queue> which is
279             maintained by Jerry D. Hedden.
280              
281             =head1 LICENSE
282              
283             This program is free software; you can redistribute it and/or modify it under
284             the same terms as Perl itself.
285              
286             =cut