| line | stmt | bran | cond | sub | pod | time | code | 
| 1 |  |  |  |  |  |  | # | 
| 2 |  |  |  |  |  |  | # Copyright 2007-2010 David Snopek <dsnopek@gmail.com> | 
| 3 |  |  |  |  |  |  | # | 
| 4 |  |  |  |  |  |  | # This program is free software: you can redistribute it and/or modify | 
| 5 |  |  |  |  |  |  | # it under the terms of the GNU General Public License as published by | 
| 6 |  |  |  |  |  |  | # the Free Software Foundation, either version 2 of the License, or | 
| 7 |  |  |  |  |  |  | # (at your option) any later version. | 
| 8 |  |  |  |  |  |  | # | 
| 9 |  |  |  |  |  |  | # This program is distributed in the hope that it will be useful, | 
| 10 |  |  |  |  |  |  | # but WITHOUT ANY WARRANTY; without even the implied warranty of | 
| 11 |  |  |  |  |  |  | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the | 
| 12 |  |  |  |  |  |  | # GNU General Public License for more details. | 
| 13 |  |  |  |  |  |  | # | 
| 14 |  |  |  |  |  |  | # You should have received a copy of the GNU General Public License | 
| 15 |  |  |  |  |  |  | # along with this program.  If not, see <http://www.gnu.org/licenses/>. | 
| 16 |  |  |  |  |  |  | # | 
| 17 |  |  |  |  |  |  |  | 
| 18 |  |  |  |  |  |  | package POE::Component::MessageQueue::Storage::Complex::IdleElement; | 
| 19 | 12 |  |  | 12 |  | 3644 | use Heap::Elem; | 
|  | 12 |  |  |  |  | 432 |  | 
|  | 12 |  |  |  |  | 386 |  | 
| 20 | 12 |  |  | 12 |  | 63 | use base qw(Heap::Elem); | 
|  | 12 |  |  |  |  | 29 |  | 
|  | 12 |  |  |  |  | 995 |  | 
| 21 | 12 |  |  | 12 |  | 742 | BEGIN {eval q(use Time::HiRes qw(time))} | 
|  | 12 |  |  | 12 |  | 76 |  | 
|  | 12 |  |  |  |  | 50 |  | 
|  | 12 |  |  |  |  | 118 |  | 
| 22 |  |  |  |  |  |  |  | 
| 23 |  |  |  |  |  |  | sub new | 
| 24 |  |  |  |  |  |  | { | 
| 25 | 267 |  |  | 267 |  | 671 | my ($class, $id) = @_; | 
| 26 | 267 |  |  |  |  | 539 | my $self = bless([ @{ $class->SUPER::new($id) }, time() ], $class); | 
|  | 267 |  |  |  |  | 1059 |  | 
| 27 |  |  |  |  |  |  | } | 
| 28 |  |  |  |  |  |  |  | 
| 29 |  |  |  |  |  |  | sub cmp | 
| 30 |  |  |  |  |  |  | { | 
| 31 | 1434 |  |  | 1434 |  | 54209 | my ($self, $other) = @_; | 
| 32 | 1434 |  |  |  |  | 3632 | return $self->[2] <=> $other->[2]; | 
| 33 |  |  |  |  |  |  | } | 
| 34 |  |  |  |  |  |  |  | 
| 35 |  |  |  |  |  |  | 1; | 
| 36 |  |  |  |  |  |  |  | 
| 37 |  |  |  |  |  |  | package POE::Component::MessageQueue::Storage::Complex; | 
| 38 | 12 |  |  | 12 |  | 3038 | use Moose; | 
|  | 12 |  |  |  |  | 29 |  | 
|  | 12 |  |  |  |  | 92 |  | 
| 39 |  |  |  |  |  |  | with qw(POE::Component::MessageQueue::Storage::Double); | 
| 40 |  |  |  |  |  |  |  | 
| 41 | 12 |  |  | 12 |  | 76564 | use POE; | 
|  | 12 |  |  |  |  | 39 |  | 
|  | 12 |  |  |  |  | 103 |  | 
| 42 | 12 |  |  | 12 |  | 4914 | use Heap::Fibonacci; | 
|  | 12 |  |  |  |  | 2964 |  | 
|  | 12 |  |  |  |  | 323 |  | 
| 43 | 12 |  |  | 12 |  | 753 | BEGIN {eval q(use Time::HiRes qw(time))} | 
|  | 12 |  |  | 12 |  | 77 |  | 
|  | 12 |  |  |  |  | 25 |  | 
|  | 12 |  |  |  |  | 73 |  | 
| 44 |  |  |  |  |  |  |  | 
| 45 |  |  |  |  |  |  | has timeout => ( | 
| 46 |  |  |  |  |  |  | is       => 'ro', | 
| 47 |  |  |  |  |  |  | isa      => 'Int', | 
| 48 |  |  |  |  |  |  | required => 1, | 
| 49 |  |  |  |  |  |  | ); | 
| 50 |  |  |  |  |  |  |  | 
| 51 |  |  |  |  |  |  | has granularity => ( | 
| 52 |  |  |  |  |  |  | is       => 'ro', | 
| 53 |  |  |  |  |  |  | isa      => 'Int', | 
| 54 |  |  |  |  |  |  | lazy     => 1, | 
| 55 |  |  |  |  |  |  | default  => sub { $_[0]->timeout / 2 }, | 
| 56 |  |  |  |  |  |  | ); | 
| 57 |  |  |  |  |  |  |  | 
| 58 |  |  |  |  |  |  | has alias => ( | 
| 59 |  |  |  |  |  |  | is       => 'ro', | 
| 60 |  |  |  |  |  |  | default  => 'MQ-Expire-Timer', | 
| 61 |  |  |  |  |  |  | required => 1, | 
| 62 |  |  |  |  |  |  | ); | 
| 63 |  |  |  |  |  |  |  | 
| 64 |  |  |  |  |  |  | has front_size => ( | 
| 65 |  |  |  |  |  |  | is      => 'rw', | 
| 66 |  |  |  |  |  |  | isa     => 'Int', | 
| 67 |  |  |  |  |  |  | default => 0, | 
| 68 |  |  |  |  |  |  | traits  => ['Number'], | 
| 69 |  |  |  |  |  |  | handles => { | 
| 70 |  |  |  |  |  |  | 'more_front' => 'add', | 
| 71 |  |  |  |  |  |  | 'less_front' => 'sub', | 
| 72 |  |  |  |  |  |  | }, | 
| 73 |  |  |  |  |  |  | ); | 
| 74 |  |  |  |  |  |  |  | 
| 75 |  |  |  |  |  |  | has front_max => ( | 
| 76 |  |  |  |  |  |  | is       => 'ro', | 
| 77 |  |  |  |  |  |  | isa      => 'Int', | 
| 78 |  |  |  |  |  |  | required => 1, | 
| 79 |  |  |  |  |  |  | ); | 
| 80 |  |  |  |  |  |  |  | 
| 81 |  |  |  |  |  |  | has front_expirations => ( | 
| 82 |  |  |  |  |  |  | is      => 'ro', | 
| 83 |  |  |  |  |  |  | isa     => 'HashRef[Num]', | 
| 84 |  |  |  |  |  |  | default => sub { {} }, | 
| 85 |  |  |  |  |  |  | traits  => ['Hash'], | 
| 86 |  |  |  |  |  |  | handles => { | 
| 87 |  |  |  |  |  |  | 'expire_from_front'       => 'set', | 
| 88 |  |  |  |  |  |  | 'delete_front_expiration' => 'delete', | 
| 89 |  |  |  |  |  |  | 'clear_front_expirations' => 'clear', | 
| 90 |  |  |  |  |  |  | 'count_front_expirations' => 'count', | 
| 91 |  |  |  |  |  |  | 'front_expiration_pairs'  => 'kv', | 
| 92 |  |  |  |  |  |  | }, | 
| 93 |  |  |  |  |  |  | ); | 
| 94 |  |  |  |  |  |  |  | 
| 95 |  |  |  |  |  |  | has nonpersistent_expirations => ( | 
| 96 |  |  |  |  |  |  | is => 'ro', | 
| 97 |  |  |  |  |  |  | isa => 'HashRef', | 
| 98 |  |  |  |  |  |  | default => sub { {} }, | 
| 99 |  |  |  |  |  |  | traits  => ['Hash'], | 
| 100 |  |  |  |  |  |  | handles => { | 
| 101 |  |  |  |  |  |  | 'expire_nonpersistent'            => 'set', | 
| 102 |  |  |  |  |  |  | 'delete_nonpersistent_expiration' => 'delete', | 
| 103 |  |  |  |  |  |  | 'clear_nonpersistent_expirations' => 'clear', | 
| 104 |  |  |  |  |  |  | 'count_nonpersistent_expirations' => 'count', | 
| 105 |  |  |  |  |  |  | 'nonpersistent_expiration_pairs'  => 'kv', | 
| 106 |  |  |  |  |  |  | }, | 
| 107 |  |  |  |  |  |  | ); | 
| 108 |  |  |  |  |  |  |  | 
| 109 |  |  |  |  |  |  | sub count_expirations | 
| 110 |  |  |  |  |  |  | { | 
| 111 | 272 |  |  | 272 | 0 | 511 | my $self = $_[0]; | 
| 112 | 272 |  |  |  |  | 11350 | return $self->count_nonpersistent_expirations + | 
| 113 |  |  |  |  |  |  | $self->count_front_expirations; | 
| 114 |  |  |  |  |  |  | } | 
| 115 |  |  |  |  |  |  |  | 
| 116 |  |  |  |  |  |  | has idle_hash => ( | 
| 117 |  |  |  |  |  |  | is => 'ro', | 
| 118 |  |  |  |  |  |  | isa => 'HashRef', | 
| 119 |  |  |  |  |  |  | default   => sub { {} }, | 
| 120 |  |  |  |  |  |  | traits  => ['Hash'], | 
| 121 |  |  |  |  |  |  | handles => { | 
| 122 |  |  |  |  |  |  | '_hashset_idle' => 'set', | 
| 123 |  |  |  |  |  |  | 'get_idle'      => 'get', | 
| 124 |  |  |  |  |  |  | 'delete_idle'   => 'delete', | 
| 125 |  |  |  |  |  |  | 'clear_idle'    => 'clear', | 
| 126 |  |  |  |  |  |  | }, | 
| 127 |  |  |  |  |  |  | ); | 
| 128 |  |  |  |  |  |  |  | 
| 129 |  |  |  |  |  |  | has idle_heap => ( | 
| 130 |  |  |  |  |  |  | is => 'ro', | 
| 131 |  |  |  |  |  |  | isa => 'Heap::Fibonacci', | 
| 132 |  |  |  |  |  |  | lazy => 1, | 
| 133 |  |  |  |  |  |  | default => sub { Heap::Fibonacci->new }, | 
| 134 |  |  |  |  |  |  | clearer => 'reset_idle_heap', | 
| 135 |  |  |  |  |  |  | ); | 
| 136 |  |  |  |  |  |  |  | 
| 137 |  |  |  |  |  |  | sub set_idle | 
| 138 |  |  |  |  |  |  | { | 
| 139 | 267 |  |  | 267 | 0 | 807 | my ($self, @ids) = @_; | 
| 140 |  |  |  |  |  |  |  | 
| 141 | 267 |  |  |  |  | 677 | my %idles = map {($_ => | 
|  | 267 |  |  |  |  | 1011 |  | 
| 142 |  |  |  |  |  |  | POE::Component::MessageQueue::Storage::Complex::IdleElement->new($_) | 
| 143 |  |  |  |  |  |  | )} @ids; | 
| 144 |  |  |  |  |  |  |  | 
| 145 | 267 |  |  |  |  | 15531 | $self->_hashset_idle(%idles); | 
| 146 | 267 |  |  |  |  | 9162 | $self->idle_heap->add($_) foreach (values %idles); | 
| 147 |  |  |  |  |  |  | } | 
| 148 |  |  |  |  |  |  |  | 
| 149 |  |  |  |  |  |  | around delete_idle => sub { | 
| 150 |  |  |  |  |  |  | my $original = shift; | 
| 151 |  |  |  |  |  |  | $_[0]->idle_heap->delete($_) foreach ($original->(@_)); | 
| 152 |  |  |  |  |  |  | }; | 
| 153 |  |  |  |  |  |  |  | 
| 154 |  |  |  |  |  |  | after clear_idle => sub {$_[0]->reset_idle_heap()}; | 
| 155 |  |  |  |  |  |  |  | 
| 156 |  |  |  |  |  |  | has shutting_down => ( | 
| 157 |  |  |  |  |  |  | is       => 'rw', | 
| 158 |  |  |  |  |  |  | default  => 0, | 
| 159 |  |  |  |  |  |  | ); | 
| 160 |  |  |  |  |  |  |  | 
| 161 |  |  |  |  |  |  | after remove => sub { | 
| 162 |  |  |  |  |  |  | my ($self, $arg, $callback) = @_; | 
| 163 |  |  |  |  |  |  | my $aref = (ref $arg eq 'ARRAY') ? $arg : [$arg]; | 
| 164 |  |  |  |  |  |  | my @ids = (grep $self->in_front($_), @$aref) or return; | 
| 165 |  |  |  |  |  |  |  | 
| 166 |  |  |  |  |  |  | $self->delete_idle(@ids); | 
| 167 |  |  |  |  |  |  | $self->delete_front_expiration(@ids); | 
| 168 |  |  |  |  |  |  | $self->delete_nonpersistent_expiration(@ids); | 
| 169 |  |  |  |  |  |  |  | 
| 170 |  |  |  |  |  |  | my $sum = 0; | 
| 171 |  |  |  |  |  |  | foreach my $info ($self->delete_front(@ids)) | 
| 172 |  |  |  |  |  |  | { | 
| 173 |  |  |  |  |  |  | $sum += $info->{size} if $info; | 
| 174 |  |  |  |  |  |  | } | 
| 175 |  |  |  |  |  |  | $self->less_front($sum); | 
| 176 |  |  |  |  |  |  | }; | 
| 177 |  |  |  |  |  |  |  | 
| 178 |  |  |  |  |  |  | after empty => sub { | 
| 179 |  |  |  |  |  |  | my ($self) = @_; | 
| 180 |  |  |  |  |  |  | $self->clear_front(); | 
| 181 |  |  |  |  |  |  | $self->clear_idle(); | 
| 182 |  |  |  |  |  |  | $self->clear_front_expirations(); | 
| 183 |  |  |  |  |  |  | $self->clear_nonpersistent_expirations(); | 
| 184 |  |  |  |  |  |  | $self->front_size(0); | 
| 185 |  |  |  |  |  |  | }; | 
| 186 |  |  |  |  |  |  |  | 
| 187 |  |  |  |  |  |  | after $_ => sub {$_[0]->_activity($_[1])} foreach qw(claim get); | 
| 188 |  |  |  |  |  |  |  | 
| 189 |  |  |  |  |  |  | around claim_and_retrieve => sub { | 
| 190 |  |  |  |  |  |  | my $original = shift; | 
| 191 |  |  |  |  |  |  | my $self = $_[0]; | 
| 192 |  |  |  |  |  |  | my $callback = pop; | 
| 193 |  |  |  |  |  |  | $original->(@_, sub { | 
| 194 |  |  |  |  |  |  | if (my $msg = $_[0]) | 
| 195 |  |  |  |  |  |  | { | 
| 196 |  |  |  |  |  |  | $self->_activity($msg->id); | 
| 197 |  |  |  |  |  |  | } | 
| 198 |  |  |  |  |  |  | goto $callback; | 
| 199 |  |  |  |  |  |  | }); | 
| 200 |  |  |  |  |  |  | }; | 
| 201 |  |  |  |  |  |  |  | 
| 202 |  |  |  |  |  |  | sub _activity | 
| 203 |  |  |  |  |  |  | { | 
| 204 | 404 |  |  | 404 |  | 1240 | my ($self, $arg) = @_; | 
| 205 | 404 | 50 |  |  |  | 1458 | my $aref = (ref $arg eq 'ARRAY' ? $arg : [$arg]); | 
| 206 |  |  |  |  |  |  |  | 
| 207 | 404 |  |  |  |  | 1426 | my $time = time(); | 
| 208 | 404 |  |  |  |  | 16201 | foreach my $elem (grep {$_} $self->get_idle(@$aref)) | 
|  | 404 |  |  |  |  | 1553 |  | 
| 209 |  |  |  |  |  |  | { | 
| 210 |  |  |  |  |  |  | # we can't just decrease_key, the values get bigger as we go. | 
| 211 | 25 |  |  |  |  | 897 | $self->idle_heap->delete($elem); | 
| 212 | 25 |  |  |  |  | 351 | $elem->[2] = $time; | 
| 213 | 25 |  |  |  |  | 874 | $self->idle_heap->add($elem); | 
| 214 |  |  |  |  |  |  | } | 
| 215 |  |  |  |  |  |  | } | 
| 216 |  |  |  |  |  |  |  | 
| 217 |  |  |  |  |  |  | sub BUILD | 
| 218 |  |  |  |  |  |  | { | 
| 219 | 3 |  |  | 3 | 0 | 8170 | my $self = shift; | 
| 220 |  |  |  |  |  |  | POE::Session->create( | 
| 221 |  |  |  |  |  |  | object_states => [ $self => [qw(_expire)] ], | 
| 222 |  |  |  |  |  |  | inline_states => { | 
| 223 |  |  |  |  |  |  | _start => sub { | 
| 224 | 3 |  |  | 3 |  | 1013 | $poe_kernel->alias_set($self->alias); | 
| 225 |  |  |  |  |  |  | }, | 
| 226 |  |  |  |  |  |  | _check => sub { | 
| 227 | 6 |  |  | 6 |  | 2857 | $poe_kernel->delay(_expire => $self->granularity); | 
| 228 |  |  |  |  |  |  | }, | 
| 229 |  |  |  |  |  |  | }, | 
| 230 | 3 |  |  |  |  | 74 | ); | 
| 231 | 3 |  |  |  |  | 799 | $self->children({FRONT => $self->front, BACK => $self->back}); | 
| 232 | 3 |  |  |  |  | 16 | $self->add_names('COMPLEX'); | 
| 233 |  |  |  |  |  |  | } | 
| 234 |  |  |  |  |  |  |  | 
| 235 |  |  |  |  |  |  | sub store | 
| 236 |  |  |  |  |  |  | { | 
| 237 | 267 |  |  | 267 | 0 | 131181 | my ($self, $message, $callback) = @_; | 
| 238 | 267 |  |  |  |  | 9702 | my $id = $message->id; | 
| 239 |  |  |  |  |  |  |  | 
| 240 | 267 |  |  |  |  | 7846 | $self->more_front($message->size); | 
| 241 | 267 |  |  |  |  | 7726 | $self->set_front($id => {persisted => 0, size => $message->size}); | 
| 242 | 267 |  |  |  |  | 1001 | $self->set_idle($id); | 
| 243 |  |  |  |  |  |  |  | 
| 244 |  |  |  |  |  |  | # Move a bunch of messages to the backstore to keep size respectable | 
| 245 | 267 |  |  |  |  | 1514 | my (@bump, %need_persist); | 
| 246 | 267 |  |  |  |  | 9115 | while($self->front_size > $self->front_max) | 
| 247 |  |  |  |  |  |  | { | 
| 248 | 246 | 50 |  |  |  | 7592 | my $top = $self->idle_heap->extract_top or last; | 
| 249 | 246 |  |  |  |  | 3932 | my $id = $top->val; | 
| 250 | 246 | 50 |  |  |  | 1714 | $need_persist{$id} = 1 unless $self->in_back($id); | 
| 251 | 246 |  |  |  |  | 9445 | $self->less_front($self->delete_front($id)->{size}); | 
| 252 | 246 |  |  |  |  | 8393 | push(@bump, $id); | 
| 253 |  |  |  |  |  |  | } | 
| 254 |  |  |  |  |  |  |  | 
| 255 | 267 | 100 |  |  |  | 802 | if(@bump) | 
| 256 |  |  |  |  |  |  | { | 
| 257 | 246 |  |  |  |  | 819 | my $idstr = join(', ', @bump); | 
| 258 | 246 |  |  |  |  | 1408 | $self->log(info => "Bumping ($idstr) off the frontstore."); | 
| 259 | 246 |  |  |  |  | 1396 | $self->delete_idle(@bump); | 
| 260 | 246 |  |  |  |  | 12539 | $self->delete_front_expiration(@bump); | 
| 261 |  |  |  |  |  |  | $self->front->get(\@bump, sub { | 
| 262 | 246 |  |  | 246 |  | 833 | my $now = time(); | 
| 263 | 246 |  |  |  |  | 7532 | $self->front->remove(\@bump); | 
| 264 | 246 |  |  |  |  | 516 | $self->back->store($_) foreach | 
| 265 | 218 |  |  |  |  | 6158 | grep { $need_persist{$_->id} } | 
| 266 | 218 | 100 |  |  |  | 7222 | grep { !$_->has_expiration or $now < $_->expire_at } | 
| 267 | 246 | 100 |  |  |  | 7564 | grep { $_->persistent || $_->has_expiration } | 
| 268 | 246 |  |  |  |  | 580 | @{ $_[0] }; | 
| 269 | 246 |  |  |  |  | 7654 | }); | 
| 270 |  |  |  |  |  |  | } | 
| 271 |  |  |  |  |  |  |  | 
| 272 | 267 | 100 |  |  |  | 9316 | if ($message->persistent) | 
|  |  | 100 |  |  |  |  |  | 
| 273 |  |  |  |  |  |  | { | 
| 274 | 233 |  |  |  |  | 7823 | $self->expire_from_front($id, time() + $self->timeout); | 
| 275 |  |  |  |  |  |  | } | 
| 276 |  |  |  |  |  |  | elsif ($message->has_expiration) | 
| 277 |  |  |  |  |  |  | { | 
| 278 | 2 |  |  |  |  | 76 | $self->expire_nonpersistent($id, $message->expire_at); | 
| 279 |  |  |  |  |  |  | } | 
| 280 |  |  |  |  |  |  |  | 
| 281 | 267 |  |  |  |  | 8965 | $self->front->store($message, $callback); | 
| 282 | 267 | 100 |  |  |  | 9116 | $poe_kernel->post($self->alias, '_check') if ($self->count_expirations == 1); | 
| 283 |  |  |  |  |  |  | } | 
| 284 |  |  |  |  |  |  |  | 
| 285 |  |  |  |  |  |  | sub _is_expired | 
| 286 |  |  |  |  |  |  | { | 
| 287 | 10 |  |  | 10 |  | 47 | my $now = time(); | 
| 288 | 6 |  |  |  |  | 31 | map  {$_->[0]} | 
| 289 | 10 |  |  |  |  | 44 | grep {$_->[1] <= $now} | 
|  | 18 |  |  |  |  | 99 |  | 
| 290 |  |  |  |  |  |  | @_; | 
| 291 |  |  |  |  |  |  | } | 
| 292 |  |  |  |  |  |  |  | 
| 293 |  |  |  |  |  |  | sub _expire | 
| 294 |  |  |  |  |  |  | { | 
| 295 | 6 |  |  | 6 |  | 4448488 | my ($self, $kernel) = @_[OBJECT, KERNEL]; | 
| 296 |  |  |  |  |  |  |  | 
| 297 | 6 | 100 |  |  |  | 551 | return if $self->shutting_down; | 
| 298 |  |  |  |  |  |  |  | 
| 299 | 5 | 100 |  |  |  | 364 | if (my @front_exp = _is_expired($self->front_expiration_pairs)) | 
| 300 |  |  |  |  |  |  | { | 
| 301 | 1 |  |  |  |  | 7 | my $idstr = join(', ', @front_exp); | 
| 302 | 1 |  |  |  |  | 14 | $self->log(info => "Pushing expired messages ($idstr) to backstore."); | 
| 303 | 1 |  |  |  |  | 56 | $_->{persisted} = 1 foreach $self->get_front(@front_exp); | 
| 304 | 1 |  |  |  |  | 45 | $self->delete_front_expiration(@front_exp); | 
| 305 |  |  |  |  |  |  |  | 
| 306 |  |  |  |  |  |  | $self->front->get(\@front_exp, sub { | 
| 307 |  |  |  |  |  |  | # Messages in two places is dangerous, so we are careful! | 
| 308 | 1 |  |  | 1 |  | 5 | $self->back->store($_->clone) foreach (@{$_[0]}); | 
|  | 1 |  |  |  |  | 54 |  | 
| 309 | 1 |  |  |  |  | 39 | }); | 
| 310 |  |  |  |  |  |  | } | 
| 311 |  |  |  |  |  |  |  | 
| 312 | 5 | 100 |  |  |  | 375 | if (my @np_exp = _is_expired($self->nonpersistent_expiration_pairs)) | 
| 313 |  |  |  |  |  |  | { | 
| 314 | 2 |  |  |  |  | 17 | my $idstr = join(', ', @np_exp); | 
| 315 | 2 |  |  |  |  | 29 | $self->log(info => "Nonpersistent messages ($idstr) have expired."); | 
| 316 | 2 |  |  |  |  | 18 | my @remove = grep { $self->in_back($_) } @np_exp; | 
|  | 2 |  |  |  |  | 18 |  | 
| 317 | 2 | 50 |  |  |  | 101 | $self->back->remove(\@remove) if (@remove); | 
| 318 | 2 |  |  |  |  | 125 | $self->delete_nonpersistent_expiration(@np_exp); | 
| 319 |  |  |  |  |  |  | } | 
| 320 |  |  |  |  |  |  |  | 
| 321 | 5 | 100 |  |  |  | 32 | $kernel->yield('_check') if ($self->count_expirations); | 
| 322 |  |  |  |  |  |  | } | 
| 323 |  |  |  |  |  |  |  | 
| 324 |  |  |  |  |  |  | sub storage_shutdown | 
| 325 |  |  |  |  |  |  | { | 
| 326 | 3 |  |  | 3 | 0 | 2725 | my ($self, $complete) = @_; | 
| 327 |  |  |  |  |  |  |  | 
| 328 | 3 |  |  |  |  | 148 | $self->shutting_down(1); | 
| 329 |  |  |  |  |  |  |  | 
| 330 |  |  |  |  |  |  | # shutdown our check messages session | 
| 331 | 3 |  |  |  |  | 128 | $poe_kernel->alias_remove($self->alias); | 
| 332 |  |  |  |  |  |  |  | 
| 333 |  |  |  |  |  |  | $self->front->get_all(sub { | 
| 334 | 3 |  |  | 3 |  | 12 | my $message_aref = $_[0]; | 
| 335 |  |  |  |  |  |  |  | 
| 336 | 3 | 100 |  |  |  | 13 | my @messages = grep {$_->persistent && !$self->in_back($_)} | 
|  | 9 |  |  |  |  | 466 |  | 
| 337 |  |  |  |  |  |  | @$message_aref; | 
| 338 |  |  |  |  |  |  |  | 
| 339 | 3 |  |  |  |  | 32 | $self->log(info => 'Moving all messages into backstore.'); | 
| 340 | 3 |  |  |  |  | 18 | $self->back->store($_) foreach @messages; | 
| 341 |  |  |  |  |  |  | $self->front->empty(sub { | 
| 342 |  |  |  |  |  |  | $self->front->storage_shutdown(sub { | 
| 343 | 3 |  |  |  |  | 126 | $self->back->storage_shutdown($complete); | 
| 344 | 3 |  |  |  |  | 130 | }); | 
| 345 | 3 |  |  |  |  | 122 | }); | 
| 346 | 3 |  |  |  |  | 267 | }); | 
| 347 |  |  |  |  |  |  | } | 
| 348 |  |  |  |  |  |  |  | 
| 349 |  |  |  |  |  |  | 1; | 
| 350 |  |  |  |  |  |  |  | 
| 351 |  |  |  |  |  |  | __END__ | 
| 352 |  |  |  |  |  |  |  | 
| 353 |  |  |  |  |  |  | =pod | 
| 354 |  |  |  |  |  |  |  | 
| 355 |  |  |  |  |  |  | =head1 NAME | 
| 356 |  |  |  |  |  |  |  | 
| 357 |  |  |  |  |  |  | POE::Component::MessageQueue::Storage::Complex -- A configurable storage | 
| 358 |  |  |  |  |  |  | engine that keeps a front-store (something fast) and a back-store (something | 
| 359 |  |  |  |  |  |  | persistent), only storing messages in the back-store after a configurable | 
| 360 |  |  |  |  |  |  | timeout period. | 
| 361 |  |  |  |  |  |  |  | 
| 362 |  |  |  |  |  |  | =head1 SYNOPSIS | 
| 363 |  |  |  |  |  |  |  | 
| 364 |  |  |  |  |  |  | use POE; | 
| 365 |  |  |  |  |  |  | use POE::Component::MessageQueue; | 
| 366 |  |  |  |  |  |  | use POE::Component::MessageQueue::Storage::Complex; | 
| 367 |  |  |  |  |  |  | use strict; | 
| 368 |  |  |  |  |  |  |  | 
| 369 |  |  |  |  |  |  | POE::Component::MessageQueue->new({ | 
| 370 |  |  |  |  |  |  | storage => POE::Component::MessageQueue::Storage::Complex->new({ | 
| 371 |  |  |  |  |  |  | timeout      => 4, | 
| 372 |  |  |  |  |  |  | granularity  => 2, | 
| 373 |  |  |  |  |  |  |  | 
| 374 |  |  |  |  |  |  | # Only allow the front store to grow to 64MB | 
| 375 |  |  |  |  |  |  | front_max => 64 * 1024 * 1024, | 
| 376 |  |  |  |  |  |  |  | 
| 377 |  |  |  |  |  |  | front => POE::Component::MessageQueue::Storage::Memory->new(), | 
| 378 |  |  |  |  |  |  | # Or, an alternative memory store is available! | 
| 379 |  |  |  |  |  |  | #front => POE::Component::MessageQueue::Storage::BigMemory->new(), | 
| 380 |  |  |  |  |  |  |  | 
| 381 |  |  |  |  |  |  | back => POE::Component::MessageQueue::Storage::Throttled->new({ | 
| 382 |  |  |  |  |  |  | storage => My::Persistent::But::Slow::Datastore->new() | 
| 383 |  |  |  |  |  |  |  | 
| 384 |  |  |  |  |  |  | # Examples include: | 
| 385 |  |  |  |  |  |  | #storage => POE::Component::MessageQueue::Storage::DBI->new({ ... }); | 
| 386 |  |  |  |  |  |  | #storage => POE::Component::MessageQueue::Storage::FileSystem->new({ ... }); | 
| 387 |  |  |  |  |  |  | }) | 
| 388 |  |  |  |  |  |  | }) | 
| 389 |  |  |  |  |  |  | }); | 
| 390 |  |  |  |  |  |  |  | 
| 391 |  |  |  |  |  |  | POE::Kernel->run(); | 
| 392 |  |  |  |  |  |  | exit; | 
| 393 |  |  |  |  |  |  |  | 
| 394 |  |  |  |  |  |  | =head1 DESCRIPTION | 
| 395 |  |  |  |  |  |  |  | 
| 396 |  |  |  |  |  |  | The idea of having a front store (something quick) and a back store (something | 
| 397 |  |  |  |  |  |  | persistent) is common and recommended, so this class exists as a helper to | 
| 398 |  |  |  |  |  |  | implementing that pattern. | 
| 399 |  |  |  |  |  |  |  | 
| 400 |  |  |  |  |  |  | The front store acts as a cache whose maximum size is specified by front_max. | 
| 401 |  |  |  |  |  |  | All messages that come in are added to the front store.  Messages are only | 
| 402 |  |  |  |  |  |  | removed after having been successfully delivered or when pushed out of the | 
| 403 |  |  |  |  |  |  | cache by newer messages. | 
| 404 |  |  |  |  |  |  |  | 
| 405 |  |  |  |  |  |  | Persistent messages that are not removed after the number of seconds specified | 
| 406 |  |  |  |  |  |  | by timeout are added to the back store (but not removed from the front store). | 
| 407 |  |  |  |  |  |  | This optimization allows for the possibility that messages will be handled | 
| 408 |  |  |  |  |  |  | before having been persisted, reducing the load on the back store. | 
| 409 |  |  |  |  |  |  |  | 
| 410 |  |  |  |  |  |  | Non-persistent messages will be discarded when eventually pushed off the front | 
| 411 |  |  |  |  |  |  | store, unless the I<expire-after> header is specified, in which case they may | 
| 412 |  |  |  |  |  |  | be stored on the back store in order to keep them around long enough. | 
| 413 |  |  |  |  |  |  | Non-persistent messages on the back store which are past their expiration | 
| 414 |  |  |  |  |  |  | date will be periodically cleaned up. | 
| 415 |  |  |  |  |  |  |  | 
| 416 |  |  |  |  |  |  | =head1 CONSTRUCTOR PARAMETERS | 
| 417 |  |  |  |  |  |  |  | 
| 418 |  |  |  |  |  |  | =over 2 | 
| 419 |  |  |  |  |  |  |  | 
| 420 |  |  |  |  |  |  | =item timeout => SCALAR | 
| 421 |  |  |  |  |  |  |  | 
| 422 |  |  |  |  |  |  | The number of seconds after a message enters the front-store before it | 
| 423 |  |  |  |  |  |  | expires.  After this time, if the message hasn't been removed, it will be | 
| 424 |  |  |  |  |  |  | copied into the back store (without being removed from the front store). | 
| 425 |  |  |  |  |  |  |  | 
| 426 |  |  |  |  |  |  | =item granularity => SCALAR | 
| 427 |  |  |  |  |  |  |  | 
| 428 |  |  |  |  |  |  | The number of seconds to wait between checks for timeout expiration. | 
| 429 |  |  |  |  |  |  |  | 
| 430 |  |  |  |  |  |  | =item front_max => SCALAR | 
| 431 |  |  |  |  |  |  |  | 
| 432 |  |  |  |  |  |  | The maximum number of bytes to allow the front store to grow to.  If the front | 
| 433 |  |  |  |  |  |  | store grows too big, old messages will be "pushed off" to make room for new | 
| 434 |  |  |  |  |  |  | messages. | 
| 435 |  |  |  |  |  |  |  | 
| 436 |  |  |  |  |  |  | =item front => SCALAR | 
| 437 |  |  |  |  |  |  |  | 
| 438 |  |  |  |  |  |  | An optional reference to a storage engine to use as the front store instead of | 
| 439 |  |  |  |  |  |  | L<POE::Component::MessageQueue::Storage::BigMemory>. | 
| 440 |  |  |  |  |  |  |  | 
| 441 |  |  |  |  |  |  | =item back => SCALAR | 
| 442 |  |  |  |  |  |  |  | 
| 443 |  |  |  |  |  |  | Takes a reference to a storage engine to use as the back store. | 
| 444 |  |  |  |  |  |  |  | 
| 445 |  |  |  |  |  |  | Using L<POE::Component::MessageQueue::Storage::Throttled> to wrap your main | 
| 446 |  |  |  |  |  |  | storage engine is highly recommended for the reasons explained in its specific | 
| 447 |  |  |  |  |  |  | documentation. | 
| 448 |  |  |  |  |  |  |  | 
| 449 |  |  |  |  |  |  | =back | 
| 450 |  |  |  |  |  |  |  | 
| 451 |  |  |  |  |  |  | =head1 SUPPORTED STOMP HEADERS | 
| 452 |  |  |  |  |  |  |  | 
| 453 |  |  |  |  |  |  | =over 4 | 
| 454 |  |  |  |  |  |  |  | 
| 455 |  |  |  |  |  |  | =item B<persistent> | 
| 456 |  |  |  |  |  |  |  | 
| 457 |  |  |  |  |  |  | I<Fully supported>. | 
| 458 |  |  |  |  |  |  |  | 
| 459 |  |  |  |  |  |  | =item B<expire-after> | 
| 460 |  |  |  |  |  |  |  | 
| 461 |  |  |  |  |  |  | I<Fully Supported>. | 
| 462 |  |  |  |  |  |  |  | 
| 463 |  |  |  |  |  |  | =item B<deliver-after> | 
| 464 |  |  |  |  |  |  |  | 
| 465 |  |  |  |  |  |  | I<Fully Supported>. | 
| 466 |  |  |  |  |  |  |  | 
| 467 |  |  |  |  |  |  | =back | 
| 468 |  |  |  |  |  |  |  | 
| 469 |  |  |  |  |  |  | =head1 SEE ALSO | 
| 470 |  |  |  |  |  |  |  | 
| 471 |  |  |  |  |  |  | L<POE::Component::MessageQueue::Storage::Complex::Default> - The most common case.  Based on this storage engine. | 
| 472 |  |  |  |  |  |  |  | 
| 473 |  |  |  |  |  |  | L<POE::Component::MessageQueue>, | 
| 474 |  |  |  |  |  |  | L<POE::Component::MessageQueue::Storage>, | 
| 475 |  |  |  |  |  |  | L<POE::Component::MessageQueue::Storage::Double> | 
| 476 |  |  |  |  |  |  |  | 
| 477 |  |  |  |  |  |  | I<Other storage engines:> | 
| 478 |  |  |  |  |  |  |  | 
| 479 |  |  |  |  |  |  | L<POE::Component::MessageQueue::Storage::Default>, | 
| 480 |  |  |  |  |  |  | L<POE::Component::MessageQueue::Storage::Memory>, | 
| 481 |  |  |  |  |  |  | L<POE::Component::MessageQueue::Storage::BigMemory>, | 
| 482 |  |  |  |  |  |  | L<POE::Component::MessageQueue::Storage::FileSystem>, | 
| 483 |  |  |  |  |  |  | L<POE::Component::MessageQueue::Storage::DBI>, | 
| 484 |  |  |  |  |  |  | L<POE::Component::MessageQueue::Storage::Generic>, | 
| 485 |  |  |  |  |  |  | L<POE::Component::MessageQueue::Storage::Generic::DBI>, | 
| 486 |  |  |  |  |  |  | L<POE::Component::MessageQueue::Storage::Throttled>, | 
| 487 |  |  |  |  |  |  | L<POE::Component::MessageQueue::Storage::Default> | 
| 488 |  |  |  |  |  |  |  | 
| 489 |  |  |  |  |  |  | =cut |