File Coverage

blib/lib/Forks/Queue.pm
Criterion Covered Total %
statement 100 153 65.3
branch 39 64 60.9
condition 19 19 100.0
subroutine 27 46 58.7
pod 23 26 88.4
total 208 308 67.5


line stmt bran cond sub pod time code
1             package Forks::Queue;
2 122     122   2985262 use strict;
  122         708  
  122         3734  
3 122     122   1250 use warnings;
  122         281  
  122         3910  
4 122     122   662 use Scalar::Util 'looks_like_number';
  122         291  
  122         6331  
5 122     122   744 use Carp;
  122         359  
  122         6996  
6 122     122   25800 use Time::HiRes;
  122         64352  
  122         593  
7 122     122   11249 use Config;
  122         557  
  122         20050  
8              
9             our $VERSION = '0.15';
10             our $DEBUG = $ENV{FORKS_QUEUE_DEBUG} || 0;
11              
12             our $NOTIFY_OK = $ENV{FORKS_QUEUE_NOTIFY} // do {
13             $Config::Config{sig_name} =~ /\bIO\b/;
14             };
15              
16             our $SLEEP_INTERVAL = $NOTIFY_OK ? 2 : 1;
17             our $SLEEP_INTERVALX = 2;
18              
19             # default values to apply to all new Forks::Queue implementations
20             our %OPTS = (limit => -1, on_limit => 'fail', style => 'fifo',
21             impl => $ENV{FORKS_QUEUE_IMPL} || 'File' );
22              
23             sub new {
24 171     171 1 89430684 my $class = shift;
25 122     122   845 no warnings 'once';
  122         768  
  122         201701  
26 171         6084 my %opts = (%OPTS, @_);
27              
28 171 100       1971 if ($opts{impl}) {
29 170         1059 my $pkg = delete $opts{impl};
30 170 50       2483 if ($pkg =~ /[^\w:]/) {
31 0         0 croak "Forks::Queue cannot be instantiated. Invalid 'impl' $pkg";
32             }
33 170         26965 my $e1 = eval "require Forks::Queue::$pkg; 1";
34 170 50       1475 if ($e1) {
35 170         855 $pkg = "Forks::Queue::" . $pkg;
36 170         3020 return $pkg->new(%opts);
37             }
38 0         0 my $err1 = $@;
39 0 0       0 if (eval "require $pkg; 1") {
40 0         0 return $pkg->new(%opts);
41             } else {
42 0 0       0 warn $err1 if $err1;
43 0         0 croak "Forks::Queue cannot be instantiated. ",
44             "Did not recognize 'impl' option '$opts{impl}'";
45             }
46             }
47 1         190 croak "Forks::Queue cannot be instantiated. ",
48             "Use an implementation or pass the 'impl' option to the constructor";
49             }
50              
51             sub import {
52 246     246   3562 my ($class, @args) = @_;
53 246         6036 for (my $i=0; $i<@args; $i++) {
54 0         0 foreach my $key (qw(limit on_limit impl style)) {
55 0 0       0 if ($args[$i] eq $key) {
56 0         0 $OPTS{$args[$i]} = $args[$i+1];
57 0         0 $i++;
58 0         0 next;
59             }
60             }
61             }
62             }
63              
64             sub put {
65 524     524 1 36165055 my $self = shift;
66 524         6183 return $self->push(@_);
67             }
68              
69             sub enqueue {
70 0     0 1 0 my $self = shift;
71 0         0 return $self->push(@_);
72             }
73              
74             sub get {
75 161     161 1 30065017 my $self = shift;
76 161 100       1254 _validate_input($_[0], 'count', 1) if @_;
77 161 100       971 if ($self->{style} eq 'fifo') {
78 133 100       2048 return @_ ? $self->shift(@_) : $self->shift;
79             } else {
80 28 100       203 my @gotten = @_ ? reverse($self->pop(@_)) : $self->pop;
81 28 100       193 return @_ ? @gotten : $gotten[0];
82             }
83             }
84              
85 0     0 1 0 sub dequeue { goto &get; }
86              
87             sub dequeue_timed {
88 18     18 1 69 my $self = shift;
89 18         43 my $timeout = shift;
90 18         86 _validate_input($timeout, 'timeout', 0, 1);
91 18         78 local $self->{_expire} = Time::HiRes::time + $timeout;
92 18         52 local $SLEEP_INTERVAL = $Forks::Queue::SLEEP_INTERVAL;
93 18         105 $SLEEP_INTERVAL /= 2 while $SLEEP_INTERVAL > 0.25 * $timeout;
94 18 50       160 return @_ ? $self->dequeue(@_) : $self->dequeue;
95             }
96              
97             sub get_timed {
98 12     12 1 42 my $self = shift;
99 12         28 my $timeout = shift;
100 12         95 _validate_input($timeout, 'timeout', 0, 1);
101 12         107 local $self->{_expire} = Time::HiRes::time + $timeout;
102 12         84 local $SLEEP_INTERVAL = $Forks::Queue::SLEEP_INTERVAL;
103 12         88 $SLEEP_INTERVAL /= 2 while $SLEEP_INTERVAL > 0.25 * $timeout;
104 12 100       165 return @_ ? $self->get(@_) : $self->get;
105             }
106              
107             sub shift_timed {
108 0     0 1 0 my $self = shift;
109 0         0 my $timeout = shift;
110 0         0 _validate_input($timeout, 'timeout', 0, 1);
111 0         0 local $self->{_expire} = Time::HiRes::time + $timeout;
112 0         0 local $SLEEP_INTERVAL = $Forks::Queue::SLEEP_INTERVAL;
113 0         0 $SLEEP_INTERVAL /= 2 while $SLEEP_INTERVAL > 0.25 * $timeout;
114 0 0       0 return @_ ? $self->shift(@_) : $self->shift;
115             }
116              
117             sub pop_timed {
118 0     0 1 0 my $self = shift;
119 0         0 my $timeout = shift;
120 0         0 _validate_input($timeout, 'timeout', 0, 1);
121 0         0 local $self->{_expire} = Time::HiRes::time + $timeout;
122 0         0 local $SLEEP_INTERVAL = $Forks::Queue::SLEEP_INTERVAL;
123 0         0 $SLEEP_INTERVAL /= 2 while $SLEEP_INTERVAL > 0.25 * $timeout;
124 0 0       0 return @_ ? $self->pop(@_) : $self->pop;
125             }
126              
127             sub _expired {
128 35105     35105   74631 my ($self) = @_;
129 35105 100       246478 $self->{_expire} && Time::HiRes::time >= $self->{_expire};
130             }
131              
132             sub get_nb {
133 18     18 1 75 my $self = shift;
134 18 100       112 _validate_input($_[0], 'count', 1) if @_;
135 3 50       47 if ($self->{style} eq 'fifo') {
136 0 0       0 return @_ ? $self->shift_nb(@_) : $self->shift_nb;
137             } else {
138 3 50       147 return @_ ? reverse($self->pop_nb(@_)) : $self->pop_nb;
139             }
140             }
141 15     15 1 9617 sub dequeue_nb { goto &get_nb; }
142              
143             sub peek {
144 129     129 1 14819 my ($self,$index) = @_;
145 129 100       507 _validate_input($index, 'index') if @_ > 1;
146 120 100       330 if ($self->{style} eq 'lifo') {
147 48   100     236 return $self->peek_back($index || 0);
148             } else {
149 72   100     350 return $self->peek_front($index || 0);
150             }
151             }
152              
153             sub pending {
154 269     269 1 139522 my $self = shift;
155 269         1880 my $s = $self->status;
156 269 50       3207 return $s->{avail} ? $s->{avail} : $s->{end} ? undef : 0;
    100          
157             }
158              
159             sub _unimpl {
160 0     0   0 my $func = shift;
161 0         0 croak "Forks::Queue: $func not implemented in abstract base class";
162             }
163              
164             sub limit :lvalue {
165 0     0 1 0 my $self = shift;
166 0 0       0 if (@_) {
167 0         0 $self->{limit} = shift @_;
168 0 0       0 if (@_) {
169 0         0 $self->{on_limit} = shift @_;
170             }
171             }
172 0         0 $self->{limit};
173             }
174              
175             sub _validate_input {
176 430     430   1598 my ($value,$name,$ge,$float_ok) = @_;
177 430 100 100     12621 croak "Invalid '$name'"
      100        
      100        
      100        
      100        
178             if !defined($value) ||
179             !looks_like_number($value) ||
180             (!$float_ok && $value != int($value)) ||
181             (defined($ge) && $value < $ge);
182 355         776 return $value;
183             }
184              
185 0     0 1 0 sub push { _unimpl("push/put") }
186 0     0 0 0 sub peek_front { _unimpl("peek") }
187 0     0 0 0 sub peek_back { _unimpl("peek") }
188 0     0 1 0 sub shift :method { _unimpl("shift/get") }
189 0     0 1 0 sub unshift { _unimpl("unshift") }
190 0     0 1 0 sub pop { _unimpl("pop/get") }
191 0     0 1 0 sub shift_nb { _unimpl("shift/get") }
192 0     0 1 0 sub pop_nb { _unimpl("pop/get") }
193 0     0 1 0 sub status { _unimpl("pending/status") }
194 0     0 1 0 sub clear { _unimpl("clear") }
195 0     0 1 0 sub end { _unimpl("end") }
196 0     0 0 0 sub lock :method { _unimpl("lock") }
197              
198              
199              
200              
201              
202             sub Forks::Queue::Util::__is_nfs {
203 198     198   34395 my ($dir) = @_;
204 198 50       1583 if ($^O ne 'linux') {
205 0         0 return;
206             }
207 198         202958 my $pid = fork();
208 198 100       28818 if ($pid == 0) {
209 48         4339 close STDOUT;
210 48         1218 close STDERR;
211             # http://superuser.com/q/422061
212 48         0 exec("df",$dir,"-t","nfs");
213 0         0 die;
214             }
215 150         8748 local $?;
216 150         147081248 waitpid $pid,0;
217 150         16782 return $? == 0;
218             }
219              
220             sub Forks::Queue::Util::QID {
221 122     122   1135 no warnings 'uninitialized';
  122         297  
  122         16097  
222 136 100   136   1161 if ($Forks::Queue::Util::PID != $$) {
223 86         406 $Forks::Queue::Util::PID = $$;
224 86         324 $Forks::Queue::Util::QID = 0;
225             }
226 136         526 $Forks::Queue::Util::QID++;
227 136         2254 join("-", $Forks::Queue::Util::PID, $Forks::Queue::Util::QID);
228             }
229              
230              
231             # manage global destruction phase
232             BEGIN {
233 122 50   122   961 if (defined(${^GLOBAL_PHASE})) {
234 122 50   14429   11598 eval 'sub __inGD(){${^GLOBAL_PHASE} eq q{DESTRUCT} && __END()};1'
  14429         77351  
235             } else {
236 0         0 require B;
237 0         0 eval 'sub __inGD(){${B::main_cv()}==0 && __END()};1'
238             }
239             }
240 74     74   1082989 END { &__END }
241             sub __END {
242 122     122   909 no warnings 'redefine';
  122         372  
  122         25293  
243 74     0   2708 *DB::DB = sub {};
        74      
244 74         8469 *__inGD = sub () { 1 };
245             }
246              
247              
248              
249             1;
250              
251             =head1 NAME
252              
253             Forks::Queue - queue that can be shared safely across processes
254              
255             =head1 VERSION
256              
257             0.15
258              
259             =head1 SYNOPSIS
260              
261             use Forks::Queue;
262             $q = Forks::Queue->new( impl => ..., style => 'lifo' );
263              
264             # put items on queue
265             $q->put("a scalar item");
266             $q->put(["an","arrayref","item"]);
267             $q->put({"a"=>"hash","reference"=>"item"});
268             $q->put("list","of","multiple",["items"]);
269             $q->end; # no more jobs will be added to queue
270              
271             # retrieve items from queue, possibly after a fork
272             $item = $q->get;
273             $item = $q->peek; # get item without removing it
274             @up_to_10_items = $q->get(10);
275             $remaining_items = $q->pending;
276              
277             =head1 DESCRIPTION
278              
279             Interface for a queue object that can be shared across processes
280             and threads.
281             Available implementations are L<Forks::Queue::File>,
282             L<Forks::Queue::Shmem>,
283             L<Forks::Queue::SQLite>.
284              
285             =head1 METHODS
286              
287             Many of these methods pass or return "items". For this distribution,
288             an "item" is any scalar or reference that can be serialized and
289             shared across processes.
290              
291             This will include scalars and most unblessed references
292              
293             "42"
294             [1,2,3,"forty-two"]
295             { name=>"a job", timestamp=>time, input=>{foo=>[19],bar=>\%bardata} }
296              
297             but will generally preclude data with blessed references and code references
298              
299             { name => "bad job", callback => \&my_callback_routine }
300             [ 46, $url13, File::Temp->new ]
301              
302             Many of the methods of C<Forks::Queue> have analogues in the
303             L<Thread::Queue> class, and many scripts using L<Thread::Queue>
304             can be easily transformed to use C<Forks::Queue>.
305              
306             =head2 new
307              
308             $queue = Forks::Queue->new( %opts )
309              
310             Instantiates a new queue object with the given configuration.
311              
312             If one of the options is C<impl>, the constructor from that
313             C<Forks::Queue> subclass will be invoked.
314              
315             Other options that should be supported on all implementations include
316              
317             =over 4
318              
319             =item * style
320              
321             =item * C<< style => 'fifo' | 'lifo' >>
322              
323             Indicates whether the L<"get"> method will return items in
324             first-in-first-out order or last-in-first-out order (and which
325             end of the queue the L<"peek"> method will examine)
326              
327             =item * limit
328              
329             =item * C<< limit => int >>
330              
331             A maximum size for the queue. Set to a non-positive value to
332             specify an unlimited size queue.
333              
334             =item * on_limit
335              
336             =item * C<< on_limit => 'block' | 'fail' | 'tq-compat' >>
337              
338             Dictates what the queue should do when an attempt is made to
339             add items beyond the queue's limit. If C<block>, the queue
340             will block and wait until items are removed from the queue.
341             If C<fail>, the queue will warn and return immediately without
342             changing the queue.
343              
344             The setting C<tq-compat> is similar to C<block>, but has the
345             additional effect where the L<"insert"> method operates without
346             regard to the queue limit. This behavior is compatible with
347             the way queue limits and the insert method work in the
348             L<Thread::Queue> package.
349              
350             See the L<"enqueue">, L<"put">, L<"push">, L<"unshift">,
351             and L<"insert"> methods, which increase the length
352             of the queue and may be affected by this setting.
353              
354             =item * join
355              
356             =item * C<< join => bool >>
357              
358             If true, expects that the queue referred to by this constructor
359             has already been created in another process, and that the current
360             process should access the existing queue. This allows a queue to
361             be shared across unrelated processes (i.e., processes that do not
362             have a parent-child relationship).
363              
364             # my_daemon.pl - may run "all the time" in the background
365             $q = Forks::Queue::File->new(file=>'/var/spool/foo/q17');
366             # creates new queue object
367             ...
368              
369             # worker.pl - may run periodically for a short time, launched from
370             # cron or from command line, but not from the daemon
371             $q = Forks::Queue->new( impl => 'File', join => 1,
372             file => '/var/spool/foo/q17',
373             # the new queue attaches to existing file at /var/spool/foo/q17
374             ...
375              
376             C<join> is not necessary for child processes forked from a process with
377             an existing queue:
378              
379             $q = Forks::Queue->new(...)
380             ...
381             if (fork() == 0) {
382             # $q already exists and the child process can begin using it,
383             # no need for a Forks::Queue constructor with join
384             ...
385             }
386              
387             =item * persist
388              
389             =item * C<< persist => bool >>
390              
391             Active C<Forks::Queue> objects affect your system, writing to disk or
392             writing to memory, and in general they clean themselves up when they
393             detect that no more processes are using the queue. The C<persist> option,
394             if set to true, instructs the queue object to leave its state intact
395             after destruction.
396              
397             An obvious use case for this option is debugging, to examine the
398             state of the queue after abnormal termination of your program.
399              
400             A second use case is to create persistent queues -- queues that are
401             shared not only among different processes, but among different
402             processes that are running at different times. The persistent queue
403             can be used by supplying both the C<persist> and the C<join> options
404             to the C<Forks::Queue> constructor.
405              
406             $queue_file = "/tmp/persistent.job.queue";
407             $join = -f $queue_file;
408             $q = Forks::Queue->new( impl => 'File', file => $queue_file,
409             join => $join, persist => 1 );
410             ... work with the queue ...
411             # the queue remains intact if this program exits or aborts
412              
413             =item * C<< list => ARRAYREF >>
414              
415             Initializes the contents of the queue with the argument to the
416             C<list> option. The argument must be an array reference.
417              
418             If the C<join> option is specified, the contents of the list
419             could be added to an already existing queue.
420              
421             =back
422              
423             See the global L<"%OPTS"> variable for information about
424             default values for many of these settings.
425              
426             =head2 put
427              
428             =head2 enqueue
429              
430             $count = $queue->put(@items);
431             $count = $queue->enqueue(@items)
432              
433             Place one or more "items" on the queue, and returns the number of
434             items successfully added to the queue.
435              
436             Adding items to the queue will fail if the L<"end"> method of
437             the queue had previously been called from any process.
438              
439             See the L<"limit"> method to see how the C<put> method behaves
440             when adding items would cause the queue to exceed its maximum size.
441              
442             The C<enqueue> method name is provided for compatibility with
443             L<Thread::Queue>.
444              
445              
446              
447             =head2 push
448              
449             $count = $queue->push(@items)
450              
451             Equivalent to L<"put">, adding items to the end of the queue and
452             returning the number of items successfully added. The most recent
453             items appended to the queue by C<push> or C<put> will be the first
454             items taken from the queue by L<"pop"> or by L<"get"> with LIFO
455             style queues, and the last items removed by L<"shift"> or L<"get">
456             with FIFO style queues.
457              
458             If the items added to the queue would cause the queue to exceed
459             its queue size limit (as determined by the L<"limit"> attribute),
460             this method will either block until queue capacity is available,
461             or issue a warning about the uninserted items and return the
462             number of items added, depending on the queue's setting for
463             L<"on_limit"|Forks::Queue/"new">.
464              
465              
466             =head2 unshift
467              
468             $count = $queue->unshift(@items)
469              
470             Equivalent to C<insert(0,@items)>, adding items to the front
471             of the queue, and returning the number of items successfully
472             added. In FIFO queues, items added to the queue with C<unshift>
473             will be the last items taken from the queue by L<"get">,
474             and in LIFO queues, they will be the first items taken from the
475             queue by L<"get">.
476              
477             This method is inefficient for some queue implementations.
478              
479             =head2 end
480              
481             $queue->end
482              
483             Indicates that no more items are to be put on the queue,
484             so that when a process tries to retrieve an item from an empty queue,
485             it will not block and wait for new items to be added. Causes any
486             processes blocking on a L<"get">/L<"dequeue">/L<"shift">/L<"pop">
487             call to become unblocked and return C<undef>.
488             This method may be called from any process that has access to the queue.
489              
490             Calling C<end> on a queue more than once will generate a warning
491             message, even if the caller is not the same process/thread that
492             made the original C<end> call.
493              
494              
495             =head2 get
496              
497             =head2 dequeue
498              
499             $item = $queue->get;
500             $item = $queue->dequeue;
501              
502             @items = $queue->get($count);
503             @items = $queue->dequeue($count);
504              
505             Attempt to retrieve one or more "items" on the queue. If the
506             queue is empty, and if L<"end"> has not been called on the queue,
507             this call blocks until an item is available or until the L<"end">
508             method has been called from some other process. If the queue is
509             empty and L<"end"> has been called, this method returns an
510             empty list in list context or C<undef> in scalar context.
511              
512             If a C<$count> argument is supplied, returns up to C<$count> items
513             or however many items are currently available on the queue, whichever
514             is fewer. But the call still blocks if L<"end"> has not been called
515             until there is at least one item available. See L<"get_nb"> for a
516             non-blocking version of this method. The return value of this
517             function when a C<$count> argument is supplied is always a list,
518             so if you evaluate it in scalar context you will get the number of items
519             retrieved from the queue, not the items themselves.
520              
521             $job = $q->get; # $job is an item from the queue
522             $job = $q->get(1); # returns # of items retrieved, not an actual item!
523             ($job) = $q->get(1); # $job is an item from the queue
524              
525             The only important difference between C<get> and C<dequeue> is what
526             happens when there is a C<$count> argument, and the queue currently has
527             more than zero but fewer than C<$count> items available. In this case,
528             the C<get> call will return all of the available items. The C<dequeue>
529             method will block until at least C<$count> items are available on the
530             queue, or until the L<"end"> method has been called on the queue.
531             This C<dequeue> behavior is consistent with the behavior of the
532             L<"dequeue" method in Thread::Queue|Thread::Queue/"dequeue">.
533              
534              
535             =head2 pop
536              
537             $item = $queue->pop
538             @items = $queue->pop($count)
539              
540             Retrieves one or more items from the "back" of the queue.
541             For LIFO style queues, the L<"get"> method is equivalent to this method.
542             Like C<"get">, this method blocks while the queue is empty and the
543             L<"end"> method has not been called on the queue.
544              
545             If a C<$count> argument is supplied, returns up to C<$count> items or however
546             many items are currently available on the queue, whichever is fewer.
547             (Like the L<"get"> call, this method blocks when waiting for input. See
548             L<"pop_nb"> for a non-blocking version of the method. Also like
549             L<"get">, you should be wary of using this method in scalar context
550             if you provide a C<$count> argument).
551              
552             =head2 shift
553              
554             $item = $queue->shift
555             @items = $queue->shift($count)
556              
557             Retrieves one or more items from the "front" of the queue.
558             For FIFO style queues, the L<"get"> method is equivalent to this method.
559             Like C<"get">, this method blocks while the queue is empty and the
560             L<"end"> method has not been called on the queue.
561              
562             If a C<$count> argument is supplied, returns up to C<$count> items or however
563             many items are currently available on the queue, whichever is fewer. (Like the
564             L<"get"> call, this method blocks when waiting for input. See
565             L<"shift_nb"> for a non-blocking version of the method. Also like
566             L<"get">, you should be wary of using this method in scalar context
567             if you provide a C<$count> argument).
568              
569              
570             =head2 get_nb
571              
572             =head2 dequeue_nb
573              
574             =head2 pop_nb
575              
576             =head2 shift_nb
577              
578             $item = $queue->XXX_nb
579             @items = $queue->XXX_nb($count)
580              
581             Non-blocking versions of the L<"get">, L<"dequeue">, L<"pop">,
582             and L<"shift"> methods. These functions return immediately if
583             there are no items in the queue to retrieve, returning C<undef>
584             in the case with no arguments and an empty list when a
585             C<$count> argument is supplied.
586              
587             =head2 get_timed
588              
589             =head2 dequeue_timed
590              
591             =head2 shift_timed
592              
593             =head2 pop_timed
594              
595             $item = $queue->XXX_timed($timeout)
596             @item = $queue->XXX_timed($timeout,$count)
597              
598             Timed versions of L<"get">, L<"dequeue">, L<"shift">, and L<"pop">
599             that take a C<$timeout> argument and will stop blocking after
600             C<$timeout> seconds have elapsed.
601              
602             If a C<$count> argument is supplied to C<dequeue_timed>, the function
603             will wait up to C<$timeout> seconds for at least C<$count> items to
604             be available on the queue. After C<$timeout> seconds have passed,
605             the function will return up to C<$count> available items.
606              
607             For other timed methods, supplying a C<$count> argument for a
608             queue with more than zero but fewer than C<$count> items available
609             will return all available items without blocking.
610              
611              
612             =head2 peek
613              
614             $item = $queue->peek
615             $item = $queue->peek($index)
616             $item = $queue->peek_front
617             $item = $queue->peek_back
618              
619             Returns an item from the queue without removing it. The C<peek_front>
620             and C<peek_back> methods inspect the item at the front and the back of
621             the queue, respectively. The generic C<peek> method is equivalent to
622             C<peek_front> for FIFO style queues and C<peek_back> for LIFO style
623             queues. If an index is specified, returns the item at that position
624             in the queue (where position 0 is the head of the queue). Negative
625             indices are supported, so a call to C<< $queue->peek(-2) >>,
626             for example, would return the second to last item in the queue.
627              
628             If the queue is empty or if the specified index is larger than the
629             number of elements currently in the queue, these methods will
630             return C<undef> without blocking.
631              
632             Note that unlike the
633             L<< "peek" method in C<Thread::Queue>|Thread::Queue/"peek" >>,
634             C<Forks::Queue::peek> returns a copy of the item on the queue,
635             so manipulating a reference returned from C<peek> will B<not>
636             affect the item on the queue.
637              
638             =head2 extract
639              
640             $item = $queue->extract
641             $item = $queue->extract($index)
642             @items = $queue->extract($index,$count)
643              
644             Removes and returns the specified number of items from the queue
645             at the specified index position, to provide random access to the
646             queue. The method is non-blocking and may return fewer than the
647             number of items requested (or zero items) if there are not enough
648             items in the queue to satisfy the request.
649              
650             If the C<$count> argument is not provided, the method will return
651             (if available) a single item. If the C<$index> argument is also
652             not provided, it will return the first item on the queue exactly
653             like the L<"get_nb"> method with no arguments.
654              
655             Negative C<$index> values are supported, in which case this
656             method will extract the corresponding items at the back of the
657             queue.
658              
659             As with the L<"get"> method, the return value is always a
660             scalar when no C<$count> argument is provided, and always a list
661             when it is.
662              
663              
664             =head2 insert
665              
666             $count = $queue->insert($index, @list)
667              
668             Provides random access to the queue, inserting the items specified
669             in C<@list> into the queue after index position C<$index>.
670             Negative C<$index> values are supported, which indicate that the
671             items should be inserted after that position relative to the
672             back of the queue. Returns the number of items that were
673             inserted into the queue.
674              
675             If the queue has a L<"limit"> set, and inserting all the items on
676             the list would cause the queue size to exceed the limit, the setting
677             of the queue's C<"on_limit">
678             parameter will govern how this method will behave.
679             See the L<"on_limit"|Forks::Queue/"new"> setting for details.
680              
681             This method is inefficient for some queue implementations.
682              
683              
684             =head2 pending
685              
686             $num_items_avail = $queue->pending
687              
688             Returns the total number of items available on the queue. There is no
689             guarantee that the number of available items will not change between a
690             call to C<pending> and a subsequent call to L<"get">.
691              
692             =head2 clear
693              
694             $queue->clear
695              
696             Removes all items from the queue.
697              
698              
699             =head2 status
700              
701             $status = $queue->status
702              
703             Returns a hash reference with meta information about the queue.
704             The information should at least include the number of items remaining in
705             the queue. Other implementations may provide additional information
706             in this return value.
707              
708              
709             =head2 limit
710              
711             $max = $queue->limit
712             $queue->limit($new_limit)
713             $queue->limit($new_limit,$on_limit)
714             $queue->limit = $new_limit # limit as lvalue requires Perl >=v5.14
715              
716             Returns or updates the maximum size of the queue. With no args, returns
717             the existing maximum queue size, with a non-positive value indicating
718             that the queue does not have a maximum size.
719              
720             The return value also acts as an lvalue through which the maximum
721             queue size can be set, and allows the C<limit> method to be used
722             in the same way as
723             L<< the C<limit> method in Thread::Queue|Thread::Queue/"limit" >>.
724             I<< Note: lvalue feature requires Perl v5.14 or better. >>
725              
726             If arguments are provided, the first argument is used to set the
727             maximum queue size. A non-positive queue size can be specified to
728             indicate that the queue does not have a maximum size.
729            
730             The second argument, if provided, updates the behavior of the queue
731             when an attempt is made to add items beyond the maximum size.
732             See L<"on_limit"|Forks::Queue/"new"> for the recognized values
733             of this argument and how they affect the behavior of the
734             L<"put">/L<"push">/L<"enqueue">, L<"unshift">,
735             L<"insert">, and L<"dequeue"> methods.
736              
737              
738             =head1 VARIABLES
739              
740             =head2 %OPTS
741              
742             Global hash containing the set of default options for all
743             C<Forks::Queue> constructors. Initially this hash contains the
744             key-value pairs
745              
746             impl "File"
747             style "fifo"
748             limit -1
749             on_limit "fail"
750              
751             but they may be changed at any time to affect all subsequently
752             constructed C<Forks::Queue> objects. The global options can also
753             be set at import time with additional arguments for the C<use>
754             statement.
755              
756             use Forks::Queue impl => 'SQLite'; # use SQLite queues by default
757             $Forks::Queue::OPTS{impl} = 'SQLite'; # equivalent run-time call
758            
759             use Forks::Queue
760             on_limit => 'block', limit => 10; # finite, blocking queues by default
761             $Forks::Queue::OPTS{limit} = 10;
762             $Forks::Queue::OPTS{on_limit} = 'block'; # equivalent run-time calls
763              
764             =head1 ENVIRONMENT
765              
766             Some environment variable settings that can affect this module:
767              
768             =over 4
769              
770             =item * FORKS_QUEUE_IMPL
771              
772             Specifies a default implementation to use, overriding the initial setting
773             of C<$Forks::Queue::OPTS{"impl"}>, in cases where the C<Forks::Queue>
774             constructor is invoked without passing an C<impl> option.
775              
776             =item * FORKS_QUEUE_DEBUG
777              
778             If set to a true value, outputs information about the activity of
779             the queues to standard error.
780              
781             =item * FORKS_QUEUE_NOTIFY
782              
783             If set to a false value, disables use of signals on POSIX-y platforms
784             that may help improve queue performance.
785              
786             =item * FORKS_QUEUE_DIR
787              
788             Specifies a directory to use for temporary queue files in the
789             L<File|Forks::Queue::File> and
790             L<SQLite|Forks::Queue::SQLite> implementations.
791             If this directory is not specified, the implementations will try to make
792             a reasonable choice based on your platform and other environment settings.
793              
794             =back
795              
796             =head1 DEPENDENCIES
797              
798             The C<Forks::Queue> module and all its current implementations require
799             the L<JSON> module.
800              
801             =head1 SEE ALSO
802              
803             L<Thread::Queue>, L<File::Queue>,
804             L<Queue::Q>, L<MCE::Queue>,
805             L<Queue::DBI>, L<Directory::Queue>.
806              
807             =head1 SUPPORT
808              
809             You can find documentation for this module with the perldoc command.
810              
811             perldoc Forks::Queue
812              
813              
814             You can also look for information at:
815              
816             =over 4
817              
818             =item * CPAN Ratings
819              
820             L<http://cpanratings.perl.org/d/Forks-Queue>
821              
822             =item * Search CPAN
823              
824             L<http://search.cpan.org/dist/Forks-Queue/>
825              
826             =item * E<lt>mob@cpan.orgE<gt>
827              
828             CPAN's request tracker at L<http://rt.cpan.org> is being decommissioned.
829             You can report bugs and make feature requests for this distribution
830             directly to the author's email, E<lt>mob@cpan.orgE<gt>.
831              
832             =back
833              
834              
835             =head1 LICENSE AND COPYRIGHT
836              
837             Copyright (c) 2017-2020, Marty O'Brien.
838              
839             This library is free software; you can redistribute it and/or modify
840             it under the same terms as Perl itself, either Perl version 5.10.1 or,
841             at your option, any later version of Perl 5 you may have available.
842              
843             See http://dev.perl.org/licenses/ for more information.
844              
845             =cut
846              
847             # TODO:
848             #
849             # priorities
850             # Directory implementation (see Queue::Dir)
851             # Distinguish enqueue and put. enqueue should behave like
852             # Thread::Queue and only check the limit once