File Coverage

blib/lib/POE/Component/MessageQueue/Storage/Complex.pm
Criterion Covered Total %
statement 1 3 33.3
branch n/a
condition n/a
subroutine 1 1 100.0
pod n/a
total 2 4 50.0


line stmt bran cond sub pod time code
1             #
2             # Copyright 2007-2010 David Snopek <dsnopek@gmail.com>
3             #
4             # This program is free software: you can redistribute it and/or modify
5             # it under the terms of the GNU General Public License as published by
6             # the Free Software Foundation, either version 2 of the License, or
7             # (at your option) any later version.
8             #
9             # This program is distributed in the hope that it will be useful,
10             # but WITHOUT ANY WARRANTY; without even the implied warranty of
11             # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12             # GNU General Public License for more details.
13             #
14             # You should have received a copy of the GNU General Public License
15             # along with this program. If not, see <http://www.gnu.org/licenses/>.
16             #
17              
18             package POE::Component::MessageQueue::Storage::Complex::IdleElement;
19 1     1   2363 use Heap::Elem;
  0            
  0            
20             use base qw(Heap::Elem);
21             BEGIN {eval q(use Time::HiRes qw(time))}
22              
23             sub new
24             {
25             my ($class, $id) = @_;
26             my $self = bless([ @{ $class->SUPER::new($id) }, time() ], $class);
27             }
28              
29             sub cmp
30             {
31             my ($self, $other) = @_;
32             return $self->[2] <=> $other->[2];
33             }
34              
35             1;
36              
37             package POE::Component::MessageQueue::Storage::Complex;
38             use Moose;
39             with qw(POE::Component::MessageQueue::Storage::Double);
40              
41             use POE;
42             use Heap::Fibonacci;
43             BEGIN {eval q(use Time::HiRes qw(time))}
44              
45             has timeout => (
46             is => 'ro',
47             isa => 'Int',
48             required => 1,
49             );
50              
51             has granularity => (
52             is => 'ro',
53             isa => 'Int',
54             lazy => 1,
55             default => sub { $_[0]->timeout / 2 },
56             );
57              
58             has alias => (
59             is => 'ro',
60             default => 'MQ-Expire-Timer',
61             required => 1,
62             );
63              
64             has front_size => (
65             is => 'rw',
66             isa => 'Int',
67             default => 0,
68             traits => ['Number'],
69             handles => {
70             'more_front' => 'add',
71             'less_front' => 'sub',
72             },
73             );
74              
75             has front_max => (
76             is => 'ro',
77             isa => 'Int',
78             required => 1,
79             );
80              
81             has front_expirations => (
82             is => 'ro',
83             isa => 'HashRef[Num]',
84             default => sub { {} },
85             traits => ['Hash'],
86             handles => {
87             'expire_from_front' => 'set',
88             'delete_front_expiration' => 'delete',
89             'clear_front_expirations' => 'clear',
90             'count_front_expirations' => 'count',
91             'front_expiration_pairs' => 'kv',
92             },
93             );
94              
95             has nonpersistent_expirations => (
96             is => 'ro',
97             isa => 'HashRef',
98             default => sub { {} },
99             traits => ['Hash'],
100             handles => {
101             'expire_nonpersistent' => 'set',
102             'delete_nonpersistent_expiration' => 'delete',
103             'clear_nonpersistent_expirations' => 'clear',
104             'count_nonpersistent_expirations' => 'count',
105             'nonpersistent_expiration_pairs' => 'kv',
106             },
107             );
108              
109             sub count_expirations
110             {
111             my $self = $_[0];
112             return $self->count_nonpersistent_expirations +
113             $self->count_front_expirations;
114             }
115              
116             has idle_hash => (
117             is => 'ro',
118             isa => 'HashRef',
119             default => sub { {} },
120             traits => ['Hash'],
121             handles => {
122             '_hashset_idle' => 'set',
123             'get_idle' => 'get',
124             'delete_idle' => 'delete',
125             'clear_idle' => 'clear',
126             },
127             );
128              
129             has idle_heap => (
130             is => 'ro',
131             isa => 'Heap::Fibonacci',
132             lazy => 1,
133             default => sub { Heap::Fibonacci->new },
134             clearer => 'reset_idle_heap',
135             );
136              
137             sub set_idle
138             {
139             my ($self, @ids) = @_;
140              
141             my %idles = map {($_ =>
142             POE::Component::MessageQueue::Storage::Complex::IdleElement->new($_)
143             )} @ids;
144              
145             $self->_hashset_idle(%idles);
146             $self->idle_heap->add($_) foreach (values %idles);
147             }
148              
149             around delete_idle => sub {
150             my $original = shift;
151             $_[0]->idle_heap->delete($_) foreach ($original->(@_));
152             };
153              
154             after clear_idle => sub {$_[0]->reset_idle_heap()};
155              
156             has shutting_down => (
157             is => 'rw',
158             default => 0,
159             );
160              
161             after remove => sub {
162             my ($self, $arg, $callback) = @_;
163             my $aref = (ref $arg eq 'ARRAY') ? $arg : [$arg];
164             my @ids = (grep $self->in_front($_), @$aref) or return;
165              
166             $self->delete_idle(@ids);
167             $self->delete_front_expiration(@ids);
168             $self->delete_nonpersistent_expiration(@ids);
169              
170             my $sum = 0;
171             foreach my $info ($self->delete_front(@ids))
172             {
173             $sum += $info->{size} if $info;
174             }
175             $self->less_front($sum);
176             };
177              
178             after empty => sub {
179             my ($self) = @_;
180             $self->clear_front();
181             $self->clear_idle();
182             $self->clear_front_expirations();
183             $self->clear_nonpersistent_expirations();
184             $self->front_size(0);
185             };
186              
187             after $_ => sub {$_[0]->_activity($_[1])} foreach qw(claim get);
188              
189             around claim_and_retrieve => sub {
190             my $original = shift;
191             my $self = $_[0];
192             my $callback = pop;
193             $original->(@_, sub {
194             if (my $msg = $_[0])
195             {
196             $self->_activity($msg->id);
197             }
198             goto $callback;
199             });
200             };
201              
202             sub _activity
203             {
204             my ($self, $arg) = @_;
205             my $aref = (ref $arg eq 'ARRAY' ? $arg : [$arg]);
206            
207             my $time = time();
208             foreach my $elem (grep {$_} $self->get_idle(@$aref))
209             {
210             # we can't just decrease_key, the values get bigger as we go.
211             $self->idle_heap->delete($elem);
212             $elem->[2] = $time;
213             $self->idle_heap->add($elem);
214             }
215             }
216              
217             sub BUILD
218             {
219             my $self = shift;
220             POE::Session->create(
221             object_states => [ $self => [qw(_expire)] ],
222             inline_states => {
223             _start => sub {
224             $poe_kernel->alias_set($self->alias);
225             },
226             _check => sub {
227             $poe_kernel->delay(_expire => $self->granularity);
228             },
229             },
230             );
231             $self->children({FRONT => $self->front, BACK => $self->back});
232             $self->add_names('COMPLEX');
233             }
234              
235             sub store
236             {
237             my ($self, $message, $callback) = @_;
238             my $id = $message->id;
239              
240             $self->more_front($message->size);
241             $self->set_front($id => {persisted => 0, size => $message->size});
242             $self->set_idle($id);
243              
244             # Move a bunch of messages to the backstore to keep size respectable
245             my (@bump, %need_persist);
246             while($self->front_size > $self->front_max)
247             {
248             my $top = $self->idle_heap->extract_top or last;
249             my $id = $top->val;
250             $need_persist{$id} = 1 unless $self->in_back($id);
251             $self->less_front($self->delete_front($id)->{size});
252             push(@bump, $id);
253             }
254              
255             if(@bump)
256             {
257             my $idstr = join(', ', @bump);
258             $self->log(info => "Bumping ($idstr) off the frontstore.");
259             $self->delete_idle(@bump);
260             $self->delete_front_expiration(@bump);
261             $self->front->get(\@bump, sub {
262             my $now = time();
263             $self->front->remove(\@bump);
264             $self->back->store($_) foreach
265             grep { $need_persist{$_->id} }
266             grep { !$_->has_expiration or $now < $_->expire_at }
267             grep { $_->persistent || $_->has_expiration }
268             @{ $_[0] };
269             });
270             }
271              
272             if ($message->persistent)
273             {
274             $self->expire_from_front($id, time() + $self->timeout);
275             }
276             elsif ($message->has_expiration)
277             {
278             $self->expire_nonpersistent($id, $message->expire_at);
279             }
280              
281             $self->front->store($message, $callback);
282             $poe_kernel->post($self->alias, '_check') if ($self->count_expirations == 1);
283             }
284              
285             sub _is_expired
286             {
287             my $now = time();
288             map {$_->[0]}
289             grep {$_->[1] <= $now}
290             @_;
291             }
292              
293             sub _expire
294             {
295             my ($self, $kernel) = @_[OBJECT, KERNEL];
296              
297             return if $self->shutting_down;
298              
299             if (my @front_exp = _is_expired($self->front_expiration_pairs))
300             {
301             my $idstr = join(', ', @front_exp);
302             $self->log(info => "Pushing expired messages ($idstr) to backstore.");
303             $_->{persisted} = 1 foreach $self->get_front(@front_exp);
304             $self->delete_front_expiration(@front_exp);
305              
306             $self->front->get(\@front_exp, sub {
307             # Messages in two places is dangerous, so we are careful!
308             $self->back->store($_->clone) foreach (@{$_[0]});
309             });
310             }
311              
312             if (my @np_exp = _is_expired($self->nonpersistent_expiration_pairs))
313             {
314             my $idstr = join(', ', @np_exp);
315             $self->log(info => "Nonpersistent messages ($idstr) have expired.");
316             my @remove = grep { $self->in_back($_) } @np_exp;
317             $self->back->remove(\@remove) if (@remove);
318             $self->delete_nonpersistent_expiration(@np_exp);
319             }
320              
321             $kernel->yield('_check') if ($self->count_expirations);
322             }
323              
324             sub storage_shutdown
325             {
326             my ($self, $complete) = @_;
327              
328             $self->shutting_down(1);
329              
330             # shutdown our check messages session
331             $poe_kernel->alias_remove($self->alias);
332              
333             $self->front->get_all(sub {
334             my $message_aref = $_[0];
335              
336             my @messages = grep {$_->persistent && !$self->in_back($_)}
337             @$message_aref;
338            
339             $self->log(info => 'Moving all messages into backstore.');
340             $self->back->store($_) foreach @messages;
341             $self->front->empty(sub {
342             $self->front->storage_shutdown(sub {
343             $self->back->storage_shutdown($complete);
344             });
345             });
346             });
347             }
348              
349             1;
350              
351             __END__
352              
353             =pod
354              
355             =head1 NAME
356              
357             POE::Component::MessageQueue::Storage::Complex -- A configurable storage
358             engine that keeps a front-store (something fast) and a back-store (something
359             persistent), only storing messages in the back-store after a configurable
360             timeout period.
361              
362             =head1 SYNOPSIS
363              
364             use POE;
365             use POE::Component::MessageQueue;
366             use POE::Component::MessageQueue::Storage::Complex;
367             use strict;
368              
369             POE::Component::MessageQueue->new({
370             storage => POE::Component::MessageQueue::Storage::Complex->new({
371             timeout => 4,
372             granularity => 2,
373              
374             # Only allow the front store to grow to 64Mb
375             front_max => 64 * 1024 * 1024,
376              
377             front => POE::Component::MessageQueue::Storage::Memory->new(),
378             # Or, an alternative memory store is available!
379             #front => POE::Component::MessageQueue::Storage::BigMemory->new(),
380              
381             back => POE::Component::MessageQueue::Storage::Throttled->new({
382             storage => My::Persistent::But::Slow::Datastore->new()
383              
384             # Examples include:
385             #storage => POE::Component::MessageQueue::Storage::DBI->new({ ... });
386             #storage => POE::Component::MessageQueue::Storage::FileSystem->new({ ... });
387             })
388             })
389             });
390              
391             POE::Kernel->run();
392             exit;
393              
394             =head1 DESCRIPTION
395              
396             The idea of having a front store (something quick) and a back store (something
397             persistent) is common and recommended, so this class exists as a helper to
398             implementing that pattern.
399              
400             The front store acts as a cache who's max size is specified by front_max.
401             All messages that come in are added to the front store. Messages are only
402             removed after having been successfully delivered or when pushed out of the
403             cache by newer messages.
404              
405             Persistent messages that are not removed after the number of seconds specified
406             by timeout are added to the back store (but not removed from the front store).
407             This optimization allows for the possibility that messages will be handled
408             before having been persisted, reducing the load on the back store.
409              
410             Non-persistent messages will be discarded when eventually pushed off the front
411             store, unless the I<expire-after> header is specified, in which case they may
412             be stored on the back store inorder to keep around them long enough.
413             Non-persistent messages on the back store which are passed their expiration
414             date will be periodically cleaned up.
415              
416             =head1 CONSTRUCTOR PARAMETERS
417              
418             =over 2
419              
420             =item timeout => SCALAR
421              
422             The number of seconds after a message enters the front-store before it
423             expires. After this time, if the message hasn't been removed, it will be
424             moved into the backstore.
425              
426             =item granularity => SCALAR
427              
428             The number of seconds to wait between checks for timeout expiration.
429              
430             =item front_max => SCALAR
431              
432             The maximum number of bytes to allow the front store to grow to. If the front
433             store grows to big, old messages will be "pushed off" to make room for new
434             messages.
435              
436             =item front => SCALAR
437              
438             An optional reference to a storage engine to use as the front store instead of
439             L<POE::Component::MessageQueue::Storage::BigMemory>.
440              
441             =item back => SCALAR
442              
443             Takes a reference to a storage engine to use as the back store.
444              
445             Using L<POE::Component::MessageQueue::Storage::Throttled> to wrap your main
446             storage engine is highly recommended for the reasons explained in its specific
447             documentation.
448              
449             =back
450              
451             =head1 SUPPORTED STOMP HEADERS
452              
453             =over 4
454              
455             =item B<persistent>
456              
457             I<Fully supported>.
458              
459             =item B<expire-after>
460              
461             I<Fully Supported>.
462              
463             =item B<deliver-after>
464              
465             I<Fully Supported>.
466              
467             =back
468              
469             =head1 SEE ALSO
470              
471             L<POE::Component::MessageQueue::Storage::Complex::Default> - The most common case. Based on this storage engine.
472              
473             L<POE::Component::MessageQueue>,
474             L<POE::Component::MessageQueue::Storage>,
475             L<POE::Component::MessageQueue::Storage::Double>
476              
477             I<Other storage engines:>
478              
479             L<POE::Component::MessageQueue::Storage::Default>,
480             L<POE::Component::MessageQueue::Storage::Memory>,
481             L<POE::Component::MessageQueue::Storage::BigMemory>,
482             L<POE::Component::MessageQueue::Storage::FileSystem>,
483             L<POE::Component::MessageQueue::Storage::DBI>,
484             L<POE::Component::MessageQueue::Storage::Generic>,
485             L<POE::Component::MessageQueue::Storage::Generic::DBI>,
486             L<POE::Component::MessageQueue::Storage::Throttled>
487             L<POE::Component::MessageQueue::Storage::Default>
488              
489             =cut