File Coverage

blib/lib/AnyEvent/STOMP.pm
Criterion Covered Total %
statement 18 89 20.2
branch 0 34 0.0
condition 0 12 0.0
subroutine 6 18 33.3
pod 2 5 40.0
total 26 158 16.4


line stmt bran cond sub pod time code
1             package AnyEvent::STOMP;
2              
3 1     1   42868 use 5.008;
  1         5  
  1         42  
4 1     1   3809 use common::sense;
  1         15  
  1         6  
5              
6 1     1   79 use base 'Object::Event';
  1         8  
  1         4440  
7 1     1   124962 use Carp qw(croak);
  1         3  
  1         70  
8 1     1   7 use AnyEvent;
  1         2  
  1         27  
9 1     1   8406 use AnyEvent::Handle;
  1         12221  
  1         1591  
10              
11             our $VERSION = 0.7;
12              
13             =head1 NAME
14              
15             AnyEvent::STOMP - A lightweight event-driven STOMP client
16              
17             =head1 SYNOPSIS
18              
19             use AnyEvent;
20             use AnyEvent::STOMP;
21              
22             my $client = AnyEvent::STOMP->connect($host, $port, $ssl, $destination, $ack,
23             { connect_headers },
24             { subscribe_headers });
25              
26             $client->send($body, $destination, $headers);
27              
28             # Register interest in new messages
29             $client->reg_cb(MESSAGE => sub {
30             my (undef, $body, $headers) = @_;
31             # Do something with the frame
32             });
33              
34             # Register interest in any frame received. Use with caution.
35             $client->reg_cb(frame => sub {
36             my (undef, $type, $body, $headers) = @_;
37             # Do something with the frame
38             });
39              
40             # Start the event loop
41             AnyEvent->condvar->recv;
42              
43             =cut
44              
45             =head1 DESCRIPTION
46              
47             AnyEvent::STOMP is a lightweight event-driven STOMP client. It's intended
48             to be run in the context of an event loop driven by AnyEvent.
49              
50             =head2 Making a connection
51              
52             my $client = AnyEvent::STOMP->connect($host, $port, $ssl, $destination, $ack,
53             { connect_headers },
54             { subscribe_headers });
55              
56             Only the first parameter (the hostname) is required. The remaining optional
57             arguments are:
58              
59             =over
60              
61             =item port
62              
63             The port number to connect to. If not specified, defaults to 61612 (if SSL/TLS
64             is used) or 61613 (if not).
65              
66             =item ssl
67              
68             If set to a true value, use SSL/TLS to connect.
69              
70             =item destination
71              
72             If defined, subscribe to the specified destination (queue) upon connection.
73              
74             =item ack
75              
76             Sets the behavior with respect to STOMP frame acknowledgments.
77              
78             If this value is 0 or undef, no acknowledgment is required: the server will
79             consider all sent frames to be delivered, regardless of whether the client has
80             actually received them. (This is the default behavior according to the STOMP
81             protocol.)
82              
83             If set to C<auto>, the client will automatically acknowledge a frame upon
84             receipt.
85              
86             If set to C<manual>, the caller must acknowledge frames manually via the
87             ack() method.
88              
89             =item connect_headers
90              
91             An anonymous hash of headers (key/value pairs) to send in the STOMP CONNECT
92             frame.
93              
94             =item subscribe_headers
95              
96             An anonymous hash of headers (key/value pairs) to send in the STOMP SUBSCRIBE
97             frame.
98              
99             =back
100              
101             =cut
102              
103             sub connect {
104 0     0 1   my $class = shift;
105 0           my ($host, $port, $ssl, $destination, $ack,
106             $connect_headers, $subscribe_headers) = @_;
107              
108 0 0         croak 'No host provided' unless $host;
109 0 0 0       croak "ack value must be 0, undef, 'auto' or 'manual'"
      0        
110             if $ack && $ack ne 'auto' && $ack ne 'manual';
111              
112 0           my $self = $class->SUPER::new;
113              
114 0           $self->{ack} = $ack;
115              
116 0 0 0       $port ||= ($ssl ? 61612 : 61613);
117              
118 0           my $connect_cb;
119             $self->{handle} = AnyEvent::Handle->new(
120             connect => [ $host, $port ],
121             tls => $ssl ? 'connect' : undef,
122             keepalive => 1,
123 0     0     on_prepare => sub { $self->event('prepare', @_); },
124             on_connect => sub {
125 0     0     $self->event('connect', @_);
126 0           $self->send_frame('CONNECT', undef, $connect_headers);
127 0 0         if ($destination) {
128 0           $subscribe_headers->{destination} = $destination;
129 0 0         $subscribe_headers->{ack} = 'client' if $ack;
130             $connect_cb = $self->reg_cb(CONNECTED => sub {
131 0           $self->{session_id} = $_[2]->{session};
132 0           $self->send_frame('SUBSCRIBE',
133             undef, $subscribe_headers);
134 0           undef $connect_cb;
135 0           });
136             }
137             },
138             on_connect_error => sub {
139 0     0     $self->event('connect_error', $_[1]);
140             },
141             on_error => sub {
142 0 0   0     $self->unreg_cb($connect_cb) if (defined $connect_cb);
143 0           $self->{handle}->destroy;
144 0           $self->event('io_error', $_[2]);
145             },
146 0     0     on_read => sub { $self->_receive_frame },
147 0 0         );
148 0           return bless($self, $class);
149             }
150              
151             =head2 Sending a message
152              
153             To send a message, just call send() with the body, the destination (queue)
154             name, and (optionally) any additional headers:
155              
156             $client->send($body, $destination, $headers); # headers may be undef
157              
158             =cut
159              
160             sub send {
161 0     0 0   my $self = shift;
162 0           my ($body, $destination, $headers) = @_;
163              
164 0 0         croak 'Missing destination' unless defined $destination;
165              
166 0           $headers->{destination} = $destination;
167 0           $self->send_frame('SEND', $body, $headers);
168             }
169              
170             =head2 Sending STOMP frames
171              
172             You can also send arbitrary STOMP frames:
173              
174             $client->send_frame($command, $body, $headers); # headers may be undef
175              
176             See the STOMP protocol documentation for more details on valid commands and
177             headers.
178              
179             =head3 Content Length
180              
181             The C<content-length> header is special: besides indicating the length of
182             the body, ActiveMQ also uses its presence or absence to determine the JMS
183             type of the message, as per L<http://activemq.apache.org/stomp.html>.
184              
185             If you do not supply a C<content-length> header, following the protocol
186             recommendations, a C<content-length> header will be added if the frame has a
187             body.
188              
189             If you do supply a numerical C<content-length> header, it will be used as
190             is. Warning: this may give unexpected results if the supplied value does not
191             match the body length. Use only with caution!
192              
193             Finally, if you supply an empty string as the C<content-length> header, it
194             will not be sent, even if the frame has a body. This can be used to mark a
195             message as being a TextMessage for ActiveMQ. Here is an example of this:
196              
197             $client->send_frame($command, $body, { 'content-length' => '' } );
198              
199             =cut
200              
201             sub send_frame {
202 0     0 0   my $self = shift;
203 0           my ($command, $body, $headers) = @_;
204              
205 0 0         croak 'Missing command' unless $command;
206              
207 0           my $tmp = $headers->{'content-length'};
208 0 0         if (!defined $tmp) {
    0          
209 0           $headers->{'content-length'} = length $body;
210             } elsif ($tmp eq '') {
211 0           delete $headers->{'content-length'};
212             }
213              
214 0           my $frame = sprintf("%s\n%s\n%s\000",
215             $command,
216 0           join('', map { "$_:$headers->{$_}\n" } keys %$headers),
217             $body);
218              
219 0           $self->{handle}->push_write($frame);
220             }
221              
222             =head2 Events
223              
224             Once you've connected, you can register interest in events, most commonly
225             the receipt of new messages (assuming you've connected as a subscriber).
226              
227             The typical use is:
228              
229             $client->reg_cb($type => $cb->($client, $body, $headers));
230              
231             In most cases, $type is C<MESSAGE>, but you can also register interest
232             in any other type of frame (C<RECEIPT>, C<ERROR>, etc.). (If you register
233             interest in C<CONNECTED> frames, please do so with a priority of C<after>;
234             see Object::Event for more details.)
235              
236             The client object (which can usually be ignored), body and headers (as an
237             anonymous hash of key-value pairs) will be passed to your callback.
238              
239             Other events you can register interest in are:
240              
241             =over
242              
243             =item prepare => $cb->($client, $handle)
244              
245             Will be fired after a client socket has been allocated. See C<on_prepare> in
246             AnyEvent::Handle for more details.
247              
248             =item connect => $cb->($client, $handle, $host, $port, $retry->())
249              
250             Will be fired when the client has successfully connected to the STOMP server.
251             See C<on_connect> in AnyEvent::Handle for more details.
252              
253             =item frame => $cb->($client, $type, $body, $headers)
254              
255             Will be fired whenever any frame is received.
256              
257             =item connect_error => $cb->($client, $errmsg)
258              
259             Will be fired if the attempt to connect to the STOMP server fails. See
260             C<on_connect_error> in AnyEvent::Handle for more details.
261              
262             =item io_error => $cb->($client, $errmsg)
263              
264             Will be fired if an I/O error occurs. See C<on_error> in AnyEvent::Handle
265             for more details.
266              
267             =back
268              
269             =cut
270              
271             sub _receive_frame {
272 0     0     my $self = shift;
273              
274 0           my $command;
275 0           my $headers = {};
276 0           my $body;
277              
278             $self->{handle}->unshift_read(regex => qr/\n*([^\n].*?)\n\n/s,
279             sub {
280 0     0     my $raw_headers = $_[1];
281 0 0         if ($raw_headers =~ s/^(.+)\n//) {
282 0           $command = $1;
283             }
284 0           foreach my $line (split(/\n/, $raw_headers)) {
285 0           my ($key, $value) = split(/\s*:\s*/, $line, 2);
286 0           $headers->{$key} = $value;
287             }
288 0           my @args;
289 0 0         if (my $content_length = $headers->{'content-length'}) {
290 0           @args = ('chunk' => $content_length + 1);
291             } else {
292 0           @args = ('regex' => qr/.*?\000\n*/s);
293             }
294             $self->{handle}->unshift_read(@args, sub {
295 0           $body = $_[1];
296 0           $body =~ s/\000\n*$//;
297              
298 0 0 0       if ($self->{ack} eq 'auto' && defined $headers->{'message-id'}) {
299 0           $self->send_frame('ACK', undef,
300             {'message-id' => $headers->{'message-id'}});
301             }
302              
303 0           $self->event($command, $body, $headers);
304 0           $self->event('frame', $command, $body, $headers);
305 0           });
306 0           });
307             }
308              
309             =head2 Acknowledging frames
310              
311             You can acknowledge a frame received via the ack() method:
312              
313             $client->ack($id, $transaction);
314              
315             The transaction is optional.
316              
317             =cut
318              
319             sub ack {
320 0     0 1   my $self = shift;
321 0           my ($id, $transaction) = @_;
322              
323 0 0         croak 'Missing ID' unless $id;
324              
325 0           my $headers = { 'message-id' => $id };
326 0 0         $headers->{transaction} = $transaction if defined $transaction;
327              
328 0           $self->send_frame('ACK', undef, $headers);
329             }
330              
331             =head2 Closing a session
332              
333             When done with a session, you need to explicitly call the destroy() method.
334             It will also send a DISCONNECT message on your behalf before closing.
335             Simply letting the object fall out of scope is not sufficient.
336              
337             $client->destroy;
338              
339             =cut
340              
341             sub destroy {
342 0     0 0   my ($self) = shift;
343 0 0         $self->send_frame('DISCONNECT') if defined undef $self->{handle};
344 0           undef $self->{handle};
345             }
346              
347             =head1 SEE ALSO
348              
349             AnyEvent, AnyEvent::Handle, Object::Event, STOMP Protocol
350             L<http://stomp.codehaus.org/Protocol>
351              
352             =head1 AUTHORS AND CONTRIBUTORS
353              
354             Fulko.Hew (L<fulko.hew@gmail.com>) is the current maintainer.
355              
356             Michael S. Fischer (L<michael+cpan@dynamine.net>) wrote the original version.
357              
358             =head1 COPYRIGHT AND LICENSE
359              
360             (C) 2014 SITA INC Canada, Inc.
361             (C) 2010 Yahoo! Inc.
362              
363             This program is free software; you can redistribute it and/or modify it
364             under the terms of either: the GNU General Public License as published
365             by the Free Software Foundation; or the Artistic License.
366              
367             See http://dev.perl.org/licenses/ for more information.
368              
369             =cut
370              
371             1;
372              
373             __END__
374              
375             # vim:syn=perl:sw=4:ts=4:et:ai