File Coverage

blib/lib/POE/Component/MessageQueue/Storage/Complex.pm
Criterion Covered Total %
statement 113 113 100.0
branch 26 30 86.6
condition n/a
subroutine 24 24 100.0
pod 0 5 0.0
total 163 172 94.7


line stmt bran cond sub pod time code
1             #
2             # Copyright 2007-2010 David Snopek <dsnopek@gmail.com>
3             #
4             # This program is free software: you can redistribute it and/or modify
5             # it under the terms of the GNU General Public License as published by
6             # the Free Software Foundation, either version 2 of the License, or
7             # (at your option) any later version.
8             #
9             # This program is distributed in the hope that it will be useful,
10             # but WITHOUT ANY WARRANTY; without even the implied warranty of
11             # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12             # GNU General Public License for more details.
13             #
14             # You should have received a copy of the GNU General Public License
15             # along with this program. If not, see <http://www.gnu.org/licenses/>.
16             #
17              
18             package POE::Component::MessageQueue::Storage::Complex::IdleElement;
19 12     12   3644 use Heap::Elem;
  12         432  
  12         386  
20 12     12   63 use base qw(Heap::Elem);
  12         29  
  12         995  
21 12     12   742 BEGIN {eval q(use Time::HiRes qw(time))}
  12     12   76  
  12         50  
  12         118  
22              
23             sub new
24             {
25 267     267   671 my ($class, $id) = @_;
26 267         539 my $self = bless([ @{ $class->SUPER::new($id) }, time() ], $class);
  267         1059  
27             }
28              
29             sub cmp
30             {
31 1434     1434   54209 my ($self, $other) = @_;
32 1434         3632 return $self->[2] <=> $other->[2];
33             }
34              
35             1;
36              
37             package POE::Component::MessageQueue::Storage::Complex;
38 12     12   3038 use Moose;
  12         29  
  12         92  
39             with qw(POE::Component::MessageQueue::Storage::Double);
40              
41 12     12   76564 use POE;
  12         39  
  12         103  
42 12     12   4914 use Heap::Fibonacci;
  12         2964  
  12         323  
43 12     12   753 BEGIN {eval q(use Time::HiRes qw(time))}
  12     12   77  
  12         25  
  12         73  
44              
45             has timeout => (
46             is => 'ro',
47             isa => 'Int',
48             required => 1,
49             );
50              
51             has granularity => (
52             is => 'ro',
53             isa => 'Int',
54             lazy => 1,
55             default => sub { $_[0]->timeout / 2 },
56             );
57              
58             has alias => (
59             is => 'ro',
60             default => 'MQ-Expire-Timer',
61             required => 1,
62             );
63              
64             has front_size => (
65             is => 'rw',
66             isa => 'Int',
67             default => 0,
68             traits => ['Number'],
69             handles => {
70             'more_front' => 'add',
71             'less_front' => 'sub',
72             },
73             );
74              
75             has front_max => (
76             is => 'ro',
77             isa => 'Int',
78             required => 1,
79             );
80              
81             has front_expirations => (
82             is => 'ro',
83             isa => 'HashRef[Num]',
84             default => sub { {} },
85             traits => ['Hash'],
86             handles => {
87             'expire_from_front' => 'set',
88             'delete_front_expiration' => 'delete',
89             'clear_front_expirations' => 'clear',
90             'count_front_expirations' => 'count',
91             'front_expiration_pairs' => 'kv',
92             },
93             );
94              
95             has nonpersistent_expirations => (
96             is => 'ro',
97             isa => 'HashRef',
98             default => sub { {} },
99             traits => ['Hash'],
100             handles => {
101             'expire_nonpersistent' => 'set',
102             'delete_nonpersistent_expiration' => 'delete',
103             'clear_nonpersistent_expirations' => 'clear',
104             'count_nonpersistent_expirations' => 'count',
105             'nonpersistent_expiration_pairs' => 'kv',
106             },
107             );
108              
109             sub count_expirations
110             {
111 272     272 0 511 my $self = $_[0];
112 272         11350 return $self->count_nonpersistent_expirations +
113             $self->count_front_expirations;
114             }
115              
116             has idle_hash => (
117             is => 'ro',
118             isa => 'HashRef',
119             default => sub { {} },
120             traits => ['Hash'],
121             handles => {
122             '_hashset_idle' => 'set',
123             'get_idle' => 'get',
124             'delete_idle' => 'delete',
125             'clear_idle' => 'clear',
126             },
127             );
128              
129             has idle_heap => (
130             is => 'ro',
131             isa => 'Heap::Fibonacci',
132             lazy => 1,
133             default => sub { Heap::Fibonacci->new },
134             clearer => 'reset_idle_heap',
135             );
136              
137             sub set_idle
138             {
139 267     267 0 807 my ($self, @ids) = @_;
140              
141 267         677 my %idles = map {($_ =>
  267         1011  
142             POE::Component::MessageQueue::Storage::Complex::IdleElement->new($_)
143             )} @ids;
144              
145 267         15531 $self->_hashset_idle(%idles);
146 267         9162 $self->idle_heap->add($_) foreach (values %idles);
147             }
148              
149             around delete_idle => sub {
150             my $original = shift;
151             $_[0]->idle_heap->delete($_) foreach ($original->(@_));
152             };
153              
154             after clear_idle => sub {$_[0]->reset_idle_heap()};
155              
156             has shutting_down => (
157             is => 'rw',
158             default => 0,
159             );
160              
161             after remove => sub {
162             my ($self, $arg, $callback) = @_;
163             my $aref = (ref $arg eq 'ARRAY') ? $arg : [$arg];
164             my @ids = (grep $self->in_front($_), @$aref) or return;
165              
166             $self->delete_idle(@ids);
167             $self->delete_front_expiration(@ids);
168             $self->delete_nonpersistent_expiration(@ids);
169              
170             my $sum = 0;
171             foreach my $info ($self->delete_front(@ids))
172             {
173             $sum += $info->{size} if $info;
174             }
175             $self->less_front($sum);
176             };
177              
178             after empty => sub {
179             my ($self) = @_;
180             $self->clear_front();
181             $self->clear_idle();
182             $self->clear_front_expirations();
183             $self->clear_nonpersistent_expirations();
184             $self->front_size(0);
185             };
186              
187             after $_ => sub {$_[0]->_activity($_[1])} foreach qw(claim get);
188              
189             around claim_and_retrieve => sub {
190             my $original = shift;
191             my $self = $_[0];
192             my $callback = pop;
193             $original->(@_, sub {
194             if (my $msg = $_[0])
195             {
196             $self->_activity($msg->id);
197             }
198             goto $callback;
199             });
200             };
201              
202             sub _activity
203             {
204 404     404   1240 my ($self, $arg) = @_;
205 404 50       1458 my $aref = (ref $arg eq 'ARRAY' ? $arg : [$arg]);
206            
207 404         1426 my $time = time();
208 404         16201 foreach my $elem (grep {$_} $self->get_idle(@$aref))
  404         1553  
209             {
210             # we can't just decrease_key, the values get bigger as we go.
211 25         897 $self->idle_heap->delete($elem);
212 25         351 $elem->[2] = $time;
213 25         874 $self->idle_heap->add($elem);
214             }
215             }
216              
217             sub BUILD
218             {
219 3     3 0 8170 my $self = shift;
220             POE::Session->create(
221             object_states => [ $self => [qw(_expire)] ],
222             inline_states => {
223             _start => sub {
224 3     3   1013 $poe_kernel->alias_set($self->alias);
225             },
226             _check => sub {
227 6     6   2857 $poe_kernel->delay(_expire => $self->granularity);
228             },
229             },
230 3         74 );
231 3         799 $self->children({FRONT => $self->front, BACK => $self->back});
232 3         16 $self->add_names('COMPLEX');
233             }
234              
235             sub store
236             {
237 267     267 0 131181 my ($self, $message, $callback) = @_;
238 267         9702 my $id = $message->id;
239              
240 267         7846 $self->more_front($message->size);
241 267         7726 $self->set_front($id => {persisted => 0, size => $message->size});
242 267         1001 $self->set_idle($id);
243              
244             # Move a bunch of messages to the backstore to keep size respectable
245 267         1514 my (@bump, %need_persist);
246 267         9115 while($self->front_size > $self->front_max)
247             {
248 246 50       7592 my $top = $self->idle_heap->extract_top or last;
249 246         3932 my $id = $top->val;
250 246 50       1714 $need_persist{$id} = 1 unless $self->in_back($id);
251 246         9445 $self->less_front($self->delete_front($id)->{size});
252 246         8393 push(@bump, $id);
253             }
254              
255 267 100       802 if(@bump)
256             {
257 246         819 my $idstr = join(', ', @bump);
258 246         1408 $self->log(info => "Bumping ($idstr) off the frontstore.");
259 246         1396 $self->delete_idle(@bump);
260 246         12539 $self->delete_front_expiration(@bump);
261             $self->front->get(\@bump, sub {
262 246     246   833 my $now = time();
263 246         7532 $self->front->remove(\@bump);
264 246         516 $self->back->store($_) foreach
265 218         6158 grep { $need_persist{$_->id} }
266 218 100       7222 grep { !$_->has_expiration or $now < $_->expire_at }
267 246 100       7564 grep { $_->persistent || $_->has_expiration }
268 246         580 @{ $_[0] };
269 246         7654 });
270             }
271              
272 267 100       9316 if ($message->persistent)
    100          
273             {
274 233         7823 $self->expire_from_front($id, time() + $self->timeout);
275             }
276             elsif ($message->has_expiration)
277             {
278 2         76 $self->expire_nonpersistent($id, $message->expire_at);
279             }
280              
281 267         8965 $self->front->store($message, $callback);
282 267 100       9116 $poe_kernel->post($self->alias, '_check') if ($self->count_expirations == 1);
283             }
284              
285             sub _is_expired
286             {
287 10     10   47 my $now = time();
288 6         31 map {$_->[0]}
289 10         44 grep {$_->[1] <= $now}
  18         99  
290             @_;
291             }
292              
293             sub _expire
294             {
295 6     6   4448488 my ($self, $kernel) = @_[OBJECT, KERNEL];
296              
297 6 100       551 return if $self->shutting_down;
298              
299 5 100       364 if (my @front_exp = _is_expired($self->front_expiration_pairs))
300             {
301 1         7 my $idstr = join(', ', @front_exp);
302 1         14 $self->log(info => "Pushing expired messages ($idstr) to backstore.");
303 1         56 $_->{persisted} = 1 foreach $self->get_front(@front_exp);
304 1         45 $self->delete_front_expiration(@front_exp);
305              
306             $self->front->get(\@front_exp, sub {
307             # Messages in two places is dangerous, so we are careful!
308 1     1   5 $self->back->store($_->clone) foreach (@{$_[0]});
  1         54  
309 1         39 });
310             }
311              
312 5 100       375 if (my @np_exp = _is_expired($self->nonpersistent_expiration_pairs))
313             {
314 2         17 my $idstr = join(', ', @np_exp);
315 2         29 $self->log(info => "Nonpersistent messages ($idstr) have expired.");
316 2         18 my @remove = grep { $self->in_back($_) } @np_exp;
  2         18  
317 2 50       101 $self->back->remove(\@remove) if (@remove);
318 2         125 $self->delete_nonpersistent_expiration(@np_exp);
319             }
320              
321 5 100       32 $kernel->yield('_check') if ($self->count_expirations);
322             }
323              
324             sub storage_shutdown
325             {
326 3     3 0 2725 my ($self, $complete) = @_;
327              
328 3         148 $self->shutting_down(1);
329              
330             # shutdown our check messages session
331 3         128 $poe_kernel->alias_remove($self->alias);
332              
333             $self->front->get_all(sub {
334 3     3   12 my $message_aref = $_[0];
335              
336 3 100       13 my @messages = grep {$_->persistent && !$self->in_back($_)}
  9         466  
337             @$message_aref;
338            
339 3         32 $self->log(info => 'Moving all messages into backstore.');
340 3         18 $self->back->store($_) foreach @messages;
341             $self->front->empty(sub {
342             $self->front->storage_shutdown(sub {
343 3         126 $self->back->storage_shutdown($complete);
344 3         130 });
345 3         122 });
346 3         267 });
347             }
348              
349             1;
350              
351             __END__
352              
353             =pod
354              
355             =head1 NAME
356              
357             POE::Component::MessageQueue::Storage::Complex -- A configurable storage
358             engine that keeps a front-store (something fast) and a back-store (something
359             persistent), only storing messages in the back-store after a configurable
360             timeout period.
361              
362             =head1 SYNOPSIS
363              
364             use POE;
365             use POE::Component::MessageQueue;
366             use POE::Component::MessageQueue::Storage::Complex;
367             use strict;
368              
369             POE::Component::MessageQueue->new({
370             storage => POE::Component::MessageQueue::Storage::Complex->new({
371             timeout => 4,
372             granularity => 2,
373              
374             # Only allow the front store to grow to 64Mb
375             front_max => 64 * 1024 * 1024,
376              
377             front => POE::Component::MessageQueue::Storage::Memory->new(),
378             # Or, an alternative memory store is available!
379             #front => POE::Component::MessageQueue::Storage::BigMemory->new(),
380              
381             back => POE::Component::MessageQueue::Storage::Throttled->new({
382             storage => My::Persistent::But::Slow::Datastore->new()
383              
384             # Examples include:
385             #storage => POE::Component::MessageQueue::Storage::DBI->new({ ... });
386             #storage => POE::Component::MessageQueue::Storage::FileSystem->new({ ... });
387             })
388             })
389             });
390              
391             POE::Kernel->run();
392             exit;
393              
394             =head1 DESCRIPTION
395              
396             The idea of having a front store (something quick) and a back store (something
397             persistent) is common and recommended, so this class exists as a helper to
398             implementing that pattern.
399              
400             The front store acts as a cache who's max size is specified by front_max.
401             All messages that come in are added to the front store. Messages are only
402             removed after having been successfully delivered or when pushed out of the
403             cache by newer messages.
404              
405             Persistent messages that are not removed after the number of seconds specified
406             by timeout are added to the back store (but not removed from the front store).
407             This optimization allows for the possibility that messages will be handled
408             before having been persisted, reducing the load on the back store.
409              
410             Non-persistent messages will be discarded when eventually pushed off the front
411             store, unless the I<expire-after> header is specified, in which case they may
412             be stored on the back store inorder to keep around them long enough.
413             Non-persistent messages on the back store which are passed their expiration
414             date will be periodically cleaned up.
415              
416             =head1 CONSTRUCTOR PARAMETERS
417              
418             =over 2
419              
420             =item timeout => SCALAR
421              
422             The number of seconds after a message enters the front-store before it
423             expires. After this time, if the message hasn't been removed, it will be
424             moved into the backstore.
425              
426             =item granularity => SCALAR
427              
428             The number of seconds to wait between checks for timeout expiration.
429              
430             =item front_max => SCALAR
431              
432             The maximum number of bytes to allow the front store to grow to. If the front
433             store grows to big, old messages will be "pushed off" to make room for new
434             messages.
435              
436             =item front => SCALAR
437              
438             An optional reference to a storage engine to use as the front store instead of
439             L<POE::Component::MessageQueue::Storage::BigMemory>.
440              
441             =item back => SCALAR
442              
443             Takes a reference to a storage engine to use as the back store.
444              
445             Using L<POE::Component::MessageQueue::Storage::Throttled> to wrap your main
446             storage engine is highly recommended for the reasons explained in its specific
447             documentation.
448              
449             =back
450              
451             =head1 SUPPORTED STOMP HEADERS
452              
453             =over 4
454              
455             =item B<persistent>
456              
457             I<Fully supported>.
458              
459             =item B<expire-after>
460              
461             I<Fully Supported>.
462              
463             =item B<deliver-after>
464              
465             I<Fully Supported>.
466              
467             =back
468              
469             =head1 SEE ALSO
470              
471             L<POE::Component::MessageQueue::Storage::Complex::Default> - The most common case. Based on this storage engine.
472              
473             L<POE::Component::MessageQueue>,
474             L<POE::Component::MessageQueue::Storage>,
475             L<POE::Component::MessageQueue::Storage::Double>
476              
477             I<Other storage engines:>
478              
479             L<POE::Component::MessageQueue::Storage::Default>,
480             L<POE::Component::MessageQueue::Storage::Memory>,
481             L<POE::Component::MessageQueue::Storage::BigMemory>,
482             L<POE::Component::MessageQueue::Storage::FileSystem>,
483             L<POE::Component::MessageQueue::Storage::DBI>,
484             L<POE::Component::MessageQueue::Storage::Generic>,
485             L<POE::Component::MessageQueue::Storage::Generic::DBI>,
486             L<POE::Component::MessageQueue::Storage::Throttled>
487             L<POE::Component::MessageQueue::Storage::Default>
488              
489             =cut