File Coverage

blib/lib/POE/Component/MessageQueue/Storage/Generic.pm
Criterion Covered Total %
statement 1 3 33.3
branch n/a
condition n/a
subroutine 1 1 100.0
pod n/a
total 2 4 50.0


line stmt bran cond sub pod time code
1             #
2             # Copyright 2007-2010 David Snopek <dsnopek@gmail.com>
3             #
4             # This program is free software: you can redistribute it and/or modify
5             # it under the terms of the GNU General Public License as published by
6             # the Free Software Foundation, either version 2 of the License, or
7             # (at your option) any later version.
8             #
9             # This program is distributed in the hope that it will be useful,
10             # but WITHOUT ANY WARRANTY; without even the implied warranty of
11             # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12             # GNU General Public License for more details.
13             #
14             # You should have received a copy of the GNU General Public License
15             # along with this program. If not, see <http://www.gnu.org/licenses/>.
16             #
17              
18             package POE::Component::MessageQueue::Storage::Generic;
19 1     1   1897 use Moose;
  0            
  0            
20             use POE;
21             use POE::Component::Generic 0.1001;
22             use POE::Component::MessageQueue::Logger;
23              
24             # Proxy part of the storage API to the generic object using the MOP.
               # Each name in @proxy_methods is installed on this class as a method that
               # forwards its arguments through $self->generic->yield, together with a
               # hash naming this object's session alias and the '_general_handler' event
               # (presumably where the wrapper delivers the response; see
               # POE::Component::Generic).
25             my @proxy_methods = qw(
26             get get_all
27             get_oldest claim_and_retrieve
28             claim empty
29             remove store
30             disown_all disown_destination
31             );
32             foreach my $method (@proxy_methods)
33             {
34             __PACKAGE__->meta->add_method($method, sub {
35             my ($self, @args) = @_;
36             $self->generic->yield(
37             $method,
38             {session => $self->alias, event => '_general_handler'},
39             @args,
40             );
               # Bare return: the proxy hands nothing back through its return value.
41             return;
42             });
43             }
44              
45             # Have to do this after we add those methods, or the role will fail.
46             with qw(POE::Component::MessageQueue::Storage);
47              
48             has alias => (
               # Alias registered for this object's POE session in _start and used as
               # the target session for events coming back from the generic wrapper.
               # 'required' is always satisfied here because a default is supplied.
49             is => 'ro',
50             isa => 'Str',
51             default => 'MQ-Storage-Generic',
52             required => 1,
53             );
54              
55             has generic => (
               # The POE::Component::Generic instance; BUILD spawns it and stores it here.
56             is => 'rw',
57             isa => 'POE::Component::Generic',
58             );
59              
60             has package => (
               # Name of the package to wrap; passed to POE::Component::Generic->spawn
               # in BUILD. Must be supplied by the caller.
61             is => 'ro',
62             isa => 'Str',
63             required => 1,
64             );
65              
66             has options => (
               # Array reference handed to spawn as object_options in BUILD; defaults
               # to an empty array.
67             is => 'rw',
68             isa => 'ArrayRef',
69             default => sub { [] },
70             );
71              
72             # Because PoCo::Generic needs the constructor options passed to it in this
73             # funny way, we have to set up generic in BUILD.
               #
               # BUILD (Moose's post-construction hook) does four things, in order:
               #   1. creates a POE session whose states are methods of this object;
               #   2. spawns the POE::Component::Generic wrapper around $self->package
               #      and stores it in the 'generic' attribute;
               #   3. calls set_log_function on the wrapper, naming '_log_proxy' in this
               #      object's session as the event to report to;
               #   4. calls ignore_signals on the wrapper with the message queue's
               #      SHUTDOWN_SIGNALS.
74             sub BUILD
75             {
76             my $self = $_[0];
77              
               # Each listed state name is handled by the same-named method on $self.
78             POE::Session->create(
79             object_states => [
80             $self => [qw(_general_handler _log_proxy _error _start _shutdown)],
81             ],
82             );
83              
               # Per-package configuration: the proxied methods plus storage_shutdown
               # are declared as callbacks, set_log_function as a postback. Errors from
               # the wrapper are routed to the '_error' event of this object's session.
84             $self->generic(POE::Component::Generic->spawn(
85             package => $self->package,
86             object_options => $self->options,
87             packages => {
88             $self->package, {
89             callbacks => [@proxy_methods, qw(storage_shutdown)],
90             postbacks => [qw(set_log_function)],
91             },
92             },
93             error => {
94             session => $self->alias,
95             event => '_error'
96             },
97             #debug => 1,
98             #verbose => 1,
99             ));
100              
                # First argument is an empty hash (no response event is requested, it
                # seems - confirm against POE::Component::Generic); the second names the
                # event that _log_proxy handles.
101             $self->generic->set_log_function({}, {
102             session => $self->alias,
103             event => '_log_proxy'
104             });
105              
                # 'use' is processed at compile time, so placing it inside the sub does
                # not delay loading; it only keeps the dependency next to its one use.
106             use POE::Component::MessageQueue;
107             $self->generic->ignore_signals({},
108             POE::Component::MessageQueue->SHUTDOWN_SIGNALS);
109             };
110              
111             sub _start
112             {
                # POE '_start' handler for the session created in BUILD: registers the
                # configured alias so the session can be addressed by name.
113             my ($self, $kernel) = @_[OBJECT, KERNEL];
114             $kernel->alias_set($self->alias);
115             }
116              
117             sub _shutdown
118             {
                # POE '_shutdown' handler, posted by the callback that storage_shutdown
                # hands to the generic wrapper. ARG0 is the completion callback to run
                # once teardown is done.
119             my ($self, $kernel, $callback) = @_[OBJECT, KERNEL, ARG0];
                # Stop the wrapper, then drop the session alias.
120             $self->generic->shutdown();
121             $kernel->alias_remove($self->alias);
122             $self->log('alert', 'Generic storage engine is shutdown!');
                # goto with a code reference leaves this sub and enters $callback with
                # the current @_ (the POE event arguments) rather than an empty list.
                # NOTE(review): confirm the callback tolerates those arguments.
123             goto $callback;
124             }
125              
126             sub storage_shutdown
127             {
                # Begins shutdown of the wrapped storage engine.
                # $complete is passed along to the '_shutdown' event, which invokes it
                # after the wrapper and this object's session have been torn down.
                # Returns nothing.
128             my ($self, $complete) = @_;
129             $self->log('alert', 'Shutting down generic storage engine...');
130              
131             # Send the shutdown message to generic. Its callback fires once generic has
132             # cleaned up its resources; only then do we stop generic itself (and our
133             # own session) by posting '_shutdown'.
134             $self->generic->yield('storage_shutdown', {}, sub {
135             $poe_kernel->post($self->alias, '_shutdown', $complete);
136             });
137              
138             return;
139             }
140              
141             sub _general_handler
142             {
                # POE handler named by every proxied method as its response event.
                # Only the 'error' key of ARG0 is inspected: when it is true the error
                # is logged. $kernel and $result are unpacked but not used here.
143             my ($self, $kernel, $ref, $result) = @_[ OBJECT, KERNEL, ARG0, ARG1 ];
144              
145             if ( $ref->{error} )
146             {
147             $self->log('error', "Generic error: $ref->{error}");
148             }
149             return;
150             }
151              
152             sub _error
153             {
                # POE handler registered in BUILD as the wrapper's error event.
                # ARG0 is a hash reference describing the problem.
154             my ( $self, $err ) = @_[ OBJECT, ARG0 ];
155              
                # A true 'stderr' entry is logged at debug level only.
156             if ( $err->{stderr} )
157             {
158             $self->log('debug', $err->{stderr});
159             }
160             else
161             {
                # Missing fields fall back to the empty string so the message can be
                # interpolated safely. Because || is used, a defined-but-false value
                # (for example errnum 0) is also replaced by the empty string.
162             my $op = $err->{operation} || q{};
163             my $num = $err->{errnum} || q{};
164             my $str = $err->{errstr} || q{};
165             $self->log('error', "Generic error: $op $num $str");
166             }
167             return;
168             }
169              
170             sub _log_proxy
171             {
                # POE handler named in BUILD's set_log_function call: relays a log
                # type (ARG0) and message (ARG1) to this object's own log method.
172             my ($self, $type, $msg) = @_[ OBJECT, ARG0, ARG1 ];
173              
174             $self->log($type, $msg);
175             return;
176             }
177              
178             1;
179              
180             __END__
181              
182             =pod
183              
184             =head1 NAME
185              
186             POE::Component::MessageQueue::Storage::Generic -- Wraps storage engines that aren't asynchronous via L<POE::Component::Generic> so they can be used.
187              
188             =head1 SYNOPSIS
189              
190             use POE;
191             use POE::Component::MessageQueue;
192             use POE::Component::MessageQueue::Storage::Generic;
193             use POE::Component::MessageQueue::Storage::Generic::DBI;
194             use strict;
195              
196             # For mysql:
197             my $DB_DSN = 'DBI:mysql:database=perl_mq';
198             my $DB_USERNAME = 'perl_mq';
199             my $DB_PASSWORD = 'perl_mq';
200             my $DB_OPTIONS = undef;
201              
202             POE::Component::MessageQueue->new({
203             storage => POE::Component::MessageQueue::Storage::Generic->new({
204             package => 'POE::Component::MessageQueue::Storage::Generic::DBI',
205             options => [
206             dsn => $DB_DSN,
207             username => $DB_USERNAME,
208             password => $DB_PASSWORD,
209             options => $DB_OPTIONS
210             ],
211             })
212             });
213              
214             POE::Kernel->run();
215             exit;
216              
217             =head1 DESCRIPTION
218              
219             Wraps storage engines that aren't asynchronous via L<POE::Component::Generic> so they can be used.
220              
221             Using this module is by far the easiest way to write custom storage engines because you don't have to worry about making your operations asynchronous. This approach isn't without its down-sides, but on the whole, the simplicity is worth it.
222              
223             Currently, only one of the provided packages is designed to work with this module: L<POE::Component::MessageQueue::Storage::Generic::DBI>.
224              
225             =head1 ATTRIBUTES
226              
227             =over 2
228              
229             =item package
230              
231             The name of the package to wrap. Required.
232              
233             =item options
234              
235             An arrayref of the options to be passed to the supplied package's constructor.
236              
237             =back
238              
239             =head1 SEE ALSO
240              
241             L<POE::Component::MessageQueue>,
242             L<POE::Component::MessageQueue::Storage>,
243             L<POE::Component::Generic>
244              
245             I<Other storage engines:>
246              
247             L<POE::Component::MessageQueue::Storage::Memory>,
248             L<POE::Component::MessageQueue::Storage::BigMemory>,
249             L<POE::Component::MessageQueue::Storage::FileSystem>,
250             L<POE::Component::MessageQueue::Storage::DBI>,
251             L<POE::Component::MessageQueue::Storage::Generic::DBI>,
252             L<POE::Component::MessageQueue::Storage::Throttled>,
253             L<POE::Component::MessageQueue::Storage::Complex>,
254             L<POE::Component::MessageQueue::Storage::Default>
255              
256             =cut
257