File Coverage

blib/lib/Argon/Queue.pm
Criterion Covered Total %
statement 61 61 100.0
branch 14 14 100.0
condition 4 5 80.0
subroutine 13 13 100.0
pod 4 5 80.0
total 96 98 97.9


line stmt bran cond sub pod time code
1             package Argon::Queue;
2             # ABSTRACT: Bounded, prioritized queue class
3             $Argon::Queue::VERSION = '0.18';
4              
5 1     1   243678 use strict;
  1         12  
  1         35  
6 1     1   9 use warnings;
  1         3  
  1         36  
7 1     1   8 use Carp;
  1         3  
  1         92  
8 1     1   473 use Moose;
  1         680344  
  1         9  
9 1     1   8435 use Argon::Constants qw(:priorities);
  1         2  
  1         143  
10 1     1   284 use Argon::Tracker;
  1         4  
  1         63  
11 1     1   13 use Argon::Log;
  1         2  
  1         710  
12              
13              
14             has max => (
15             is => 'rw',
16             isa => 'Int',
17             default => 0,
18             );
19              
20             has tracker => (
21             is => 'ro',
22             isa => 'Argon::Tracker',
23             lazy => 1,
24             builder => '_build_tracker',
25             handles => {
26             },
27             );
28              
29             sub _build_tracker {
30 2     2   7 my $self = shift;
31 2         30 Argon::Tracker->new(
32             capacity => $self->max,
33             length => $self->max * $self->max,
34             );
35             }
36              
37             has msgs => (
38             is => 'ro',
39             isa => 'ArrayRef[ArrayRef[Argon::Message]]',
40             default => sub { [[], [], []] },
41             );
42              
43             has count => (
44             is => 'rw',
45             isa => 'Int',
46             default => 0,
47             traits => ['Counter'],
48             handles => {
49             inc_count => 'inc',
50             dec_count => 'dec',
51             }
52             );
53              
54             has balanced => (
55             is => 'rw',
56             isa => 'Int',
57             default => sub { time },
58             );
59              
60             after max => sub {
61             my $self = shift;
62             if (@_) {
63             $self->tracker->capacity($self->max);
64             $self->tracker->length($self->max * $self->max);
65             }
66             };
67              
68              
69 8     8 1 492 sub is_empty { $_[0]->count == 0 }
70 9     9 1 340 sub is_full { $_[0]->count >= $_[0]->max }
71              
72              
73             sub put {
74 7     7 1 1394 my ($self, $msg) = @_;
75              
76 7 100 100     170 croak 'usage: $queue->put($msg)'
      66        
77             unless defined $msg
78             && (ref $msg || '') eq 'Argon::Message';
79              
80 6         30 $self->promote;
81              
82 6 100       218 croak 'queue full' if $self->is_full;
83              
84 5         55 push @{$self->msgs->[$msg->pri]}, $msg;
  5         174  
85              
86 5         263 $self->tracker->start($msg);
87 5         221 $self->inc_count;
88 5         166 $self->count;
89             }
90              
91              
92             sub get {
93 5     5 1 498 my $self = shift;
94 5 100       18 return if $self->is_empty;
95              
96 4         16 foreach my $pri ($HIGH, $NORMAL, $LOW) {
97 8         264 my $queue = $self->msgs->[$pri];
98              
99 8 100       86 if (@$queue) {
100 4         13 my $msg = shift @$queue;
101 4         130 $self->tracker->finish($msg);
102 4         179 $self->dec_count;
103              
104 4         28 return $msg;
105             }
106             }
107             }
108              
109             sub promote {
110 9     9 0 33 my $self = shift;
111 9         548 my $avg = $self->tracker->avg_time;
112 9         34 my $max = $avg * 1.5;
113 9 100       360 return 0 unless time - $self->balanced >= $max;
114              
115 4         11 my $moved = 0;
116              
117 4         14 foreach my $pri ($LOW, $NORMAL) {
118 8         16 while (my $msg = shift @{$self->msgs->[$pri]}) {
  9         281  
119 3 100       137 if ($self->tracker->age($msg) > $max) {
120 1         2 push @{$self->msgs->[$pri - 1]}, $msg;
  1         34  
121 1         140 $self->tracker->touch($msg);
122 1         3 ++$moved;
123             } else {
124 2         5 unshift @{$self->msgs->[$pri]}, $msg;
  2         83  
125 2         10 last;
126             }
127             }
128             }
129              
130 4 100       18 log_trace 'promoted %d msgs', $moved
131             if $moved;
132              
133 4         181 return $moved;
134             }
135              
136             __PACKAGE__->meta->make_immutable;
137              
138             1;
139              
140             __END__
141              
142             =pod
143              
144             =encoding UTF-8
145              
146             =head1 NAME
147              
148             Argon::Queue - Bounded, prioritized queue class
149              
150             =head1 VERSION
151              
152             version 0.18
153              
154             =head1 SYNOPSIS
155              
156             use Argon::Queue;
157              
158             my $q = Argon::Queue->new(max => 32);
159              
160             unless ($q->is_full) {
161             $q->put(...);
162             }
163              
164             unless ($q->is_empty) {
165             my $msg = $q->get;
166             }
167              
168             =head1 DESCRIPTION
169              
170             The bounded priority queue used by the L<Argon::Manager> for L<Argon::Message>s
171             submitted to the Ar network.
172              
173             =head1 ATTRIBUTES
174              
175             =head2 max
176              
177             The maximum number of messages supported by the queue.
178              
179             =head1 METHODS
180              
181             =head2 is_empty
182              
183             Returns true if the queue is empty.
184              
185             =head2 is_full
186              
187             Returns true if there is at least one L<Argon::Message> in the queue.
188              
189             =head2 put
190              
191             Adds a message to the queue. Croaks if the supplied message is not an
192             L<Argon::Message> or if the queue is full.
193              
194             Adding a message to the queue has the side effect of promoting any previously
195             added messages that have been stuck at a lower level priority for an overly
196             long time.
197              
198             =head2 get
199              
200             Removes and returns the next L<Argon::Message> on the queue. Returns C<undef>
201             if the queue is empty.
202              
203             =head1 AUTHOR
204              
205             Jeff Ober <sysread@fastmail.fm>
206              
207             =head1 COPYRIGHT AND LICENSE
208              
209             This software is copyright (c) 2017 by Jeff Ober.
210              
211             This is free software; you can redistribute it and/or modify it under
212             the same terms as the Perl 5 programming language system itself.
213              
214             =cut