File Coverage

blib/lib/Net/Stomp.pm
Criterion Covered Total %
statement 253 285 88.7
branch 86 114 75.4
condition 27 42 64.2
subroutine 32 34 94.1
pod 13 13 100.0
total 411 488 84.2


line stmt bran cond sub pod time code
1             package Net::Stomp;
2 11     11   18558 use strict;
  11         16  
  11         270  
3 11     11   39 use warnings;
  11         10  
  11         225  
4 11     11   35 use IO::Select;
  11         14  
  11         175  
5 11     11   3901 use Net::Stomp::Frame;
  11         19  
  11         51  
6 11     11   286 use Carp qw(longmess);
  11         12  
  11         585  
7 11     11   37 use base 'Class::Accessor::Fast';
  11         12  
  11         576  
8 11     11   3840 use Net::Stomp::StupidLogger;
  11         17  
  11         25095  
9             our $VERSION = '0.57';
10              
11             __PACKAGE__->mk_accessors( qw(
12             current_host failover hostname hosts port select serial session_id socket ssl
13             ssl_options socket_options subscriptions _connect_headers bufsize
14             reconnect_on_fork logger connect_delay
15             reconnect_attempts initial_reconnect_attempts timeout receipt_timeout
16             ) );
17              
18             sub _logconfess {
19 2     2   6 my ($self,@etc) = @_;
20 2         165 my $m = longmess(@etc);
21 2         754 $self->logger->fatal($m);
22 2         979 die $m;
23             }
24             sub _logdie {
25 34     34   282 my ($self,@etc) = @_;
26 34         56 $self->logger->fatal(@etc);
27 34         4058 die "@etc";
28             }
29              
30             sub new {
31 28     28 1 35 my $class = shift;
32 28         168 my $self = $class->SUPER::new(@_);
33              
34 28 50       297 $self->bufsize(8192) unless $self->bufsize;
35 28 50       352 $self->connect_delay(5) unless defined $self->connect_delay;
36 28 100       145 $self->reconnect_on_fork(1) unless defined $self->reconnect_on_fork;
37 28 100       199 $self->reconnect_attempts(0) unless defined $self->reconnect_attempts;
38 28 100       218 $self->initial_reconnect_attempts(1) unless defined $self->initial_reconnect_attempts;
39 28 50       206 $self->socket_options({}) unless defined $self->socket_options;
40              
41 28 50       205 $self->logger(Net::Stomp::StupidLogger->new())
42             unless $self->logger;
43              
44 28         133 $self->{_framebuf} = "";
45              
46             # We are not subscribed to anything at the start
47 28         88 $self->subscriptions( {} );
48              
49 28         157 $self->select( IO::Select->new );
50 28         175 my @hosts = ();
51              
52             # failover://tcp://primary:61616
53             # failover:(tcp://primary:61616,tcp://secondary:61616)?randomize=false
54              
55 28 100       60 if ($self->failover) {
    100          
56 6         26 my ($uris, $opts) = $self->failover =~ m{^failover:(?://)? \(? (.*?) \)? (?: \? (.*?) ) ?$}ix;
57              
58 6 100       65 $self->_logconfess("Unable to parse failover uri: " . $self->failover)
59             unless $uris;
60              
61 5         13 foreach my $host (split(/,/,$uris)) {
62 7 100       26 $host =~ m{^\w+://([a-zA-Z0-9\-./]+):([0-9]+)$} || $self->_logconfess("Unable to parse failover component: '$host'");
63 6         11 my ($hostname, $port) = ($1, $2);
64              
65 6         19 push(@hosts, {hostname => $hostname, port => $port});
66             }
67             } elsif ($self->hosts) {
68             ## @hosts is used inside the while loop later to decide whether we have
69             ## cycled through all setup hosts.
70 19         158 @hosts = @{$self->hosts};
  19         35  
71             }
72 26 100       146 $self->hosts(\@hosts) if @hosts;
73              
74 26         92 $self->_get_connection_retrying(1);
75              
76 22         46 return $self;
77             }
78              
79             sub _get_connection_retrying {
80 36     36   43 my ($self,$initial) = @_;
81              
82 36         37 my $tries=0;
83 36         51 while(not eval { $self->_get_connection; 1 }) {
  60         204  
  31         90  
84 29         60 my $err = $@;$err =~ s{\n\z}{}sm;
  29         107  
85 29         27 ++$tries;
86 29 100       51 if($self->_should_stop_trying($initial,$tries)) {
87             # We've cycled enough. Die now.
88 5         15 $self->_logdie("Failed to connect: $err; giving up");
89             }
90 24         43 $self->logger->warn("Failed to connect: $err; retrying");
91 24         2409 sleep($self->connect_delay);
92             }
93             }
94              
95             sub _should_stop_trying {
96 29     29   34 my ($self,$initial,$tries) = @_;
97              
98 29 100       83 my $max_tries = $initial
99             ? $self->initial_reconnect_attempts
100             : $self->reconnect_attempts;
101              
102 29 100       135 return unless $max_tries > 0; # 0 means forever
103              
104 17 100       35 if (defined $self->hosts) {
105 14         45 $max_tries *= @{$self->hosts}; # try at least once per host
  14         23  
106             }
107 17         76 return $tries >= $max_tries;
108             }
109              
110             my $socket_class;
111             sub _get_connection {
112 52     52   50 my $self = shift;
113 52 100       85 if (my $hosts = $self->hosts) {
114 48 100 100     222 if (defined $self->current_host && ($self->current_host < $#{$hosts} ) ) {
  33         249  
115 13         21 $self->current_host($self->current_host+1);
116             } else {
117 35         129 $self->current_host(0);
118             }
119 48         239 my $h = $hosts->[$self->current_host];
120 48         164 $self->hostname($h->{hostname});
121 48         181 $self->port($h->{port});
122 48         224 $self->ssl($h->{ssl});
123 48   100     346 $self->ssl_options($h->{ssl_options} || {});
124             }
125 52         243 my $socket = $self->_get_socket;
126 52 100       298 $self->_logdie("Error connecting to " . $self->hostname . ':' . $self->port . ": $!")
127             unless $socket;
128              
129 23         52 $self->select->remove($self->socket);
130              
131 23         172 $self->select->add($socket);
132 23         118 $self->socket($socket);
133 23         90 $self->{_pid} = $$;
134             }
135              
136             sub _get_socket {
137 0     0   0 my ($self) = @_;
138 0         0 my $socket;
139              
140 0         0 my $timeout = $self->timeout;
141 0 0       0 $timeout = 5 unless defined $timeout;
142              
143             my %sockopts = (
144             Timeout => $timeout,
145 0         0 %{ $self->socket_options },
  0         0  
146             PeerAddr => $self->hostname,
147             PeerPort => $self->port,
148             Proto => 'tcp',
149             );
150 0         0 my $keep_alive = delete $sockopts{keep_alive};
151              
152 0 0       0 if ( $self->ssl ) {
153 0         0 eval { require IO::Socket::SSL };
  0         0  
154 0 0       0 $self->_logdie(
155             "You should install the IO::Socket::SSL module for SSL support in Net::Stomp"
156             ) if $@;
157 0 0       0 %sockopts = ( %sockopts, %{ $self->ssl_options || {} } );
  0         0  
158 0         0 $socket = IO::Socket::SSL->new(%sockopts);
159             } else {
160             $socket_class ||= eval { require IO::Socket::IP; IO::Socket::IP->VERSION('0.20'); "IO::Socket::IP" }
161 0   0     0 || do { require IO::Socket::INET; "IO::Socket::INET" };
      0        
162 0         0 $socket = $socket_class->new(%sockopts);
163 0 0       0 binmode($socket) if $socket;
164             }
165 0 0       0 if ($keep_alive) {
166 0         0 require Socket;
167 0 0       0 if (Socket->can('SO_KEEPALIVE')) {
168 0         0 $socket->setsockopt(Socket::SOL_SOCKET(),Socket::SO_KEEPALIVE(),1);
169             }
170             else {
171 0         0 $self->logger->warn(q{TCP keep-alive was requested, but the Socket module does not export the SO_KEEPALIVE constant, so we couldn't enable it});
172             }
173             }
174              
175 0         0 return $socket;
176             }
177              
178             sub connect {
179 11     11 1 657 my ( $self, $conf ) = @_;
180              
181 11         52 my $frame = Net::Stomp::Frame->new(
182             { command => 'CONNECT', headers => $conf } );
183 11         36 $self->send_frame($frame);
184 11         21 $frame = $self->receive_frame;
185              
186 11 50 33     38 if ($frame && $frame->command eq 'CONNECTED') {
187             # Setting initial values for session id, as given from
188             # the stomp server
189 11         55 $self->session_id( $frame->headers->{session} );
190 11         70 $self->_connect_headers( $conf );
191             }
192              
193 11         63 return $frame;
194             }
195              
196             sub _close_socket {
197 14     14   17 my ($self) = @_;
198 14 50       22 return unless $self->socket;
199 14         61 $self->socket->close;
200 14         70 $self->select->remove($self->socket);
201             }
202              
203             sub disconnect {
204 1     1 1 14553 my $self = shift;
205 1         14 my $frame = Net::Stomp::Frame->new( { command => 'DISCONNECT' } );
206 1         5 $self->send_frame($frame);
207 1         6 $self->_close_socket;
208 1         14 return 1;
209             }
210              
211             sub _reconnect {
212 10     10   9 my $self = shift;
213 10         17 $self->_close_socket;
214              
215 10         54 $self->logger->warn("reconnecting");
216 10         715 $self->_get_connection_retrying(0);
217             # Both ->connect and ->subscribe can call _reconnect. It *should*
218             # work out fine in the end, worst scenario we send a few subscribe
219             # frame more than once
220 9         20 $self->connect( $self->_connect_headers );
221 9         10 for my $sub(keys %{$self->subscriptions}) {
  9         16  
222 8         28 $self->subscribe($self->subscriptions->{$sub});
223             }
224             }
225              
226             sub can_read {
227 0     0 1 0 my ( $self, $conf ) = @_;
228              
229             # If there is any data left in the framebuffer that we haven't read, return
230             # 'true'. But we don't want to spin endlessly, so only return true the
231             # first time. (Anything touching the _framebuf should update this flag when
232             # it does something.
233 0 0 0     0 if ( $self->{_framebuf_changed} && length $self->{_framebuf} ) {
234 0         0 $self->{_framebuf_changed} = 0;
235 0         0 return 1;
236             }
237              
238 0   0     0 $conf ||= {};
239 0 0       0 my $timeout = exists $conf->{timeout} ? $conf->{timeout} : $self->timeout;
240 0   0     0 return $self->select->can_read($timeout) || 0;
241             }
242              
243             sub send {
244 19     19 1 29038 my ( $self, $conf ) = @_;
245 19         50 $conf = { %$conf };
246 19         31 my $body = $conf->{body};
247 19         24 delete $conf->{body};
248 19         99 my $frame = Net::Stomp::Frame->new(
249             { command => 'SEND', headers => $conf, body => $body } );
250 19         46 $self->send_frame($frame);
251 18         51 return 1;
252             }
253              
254             sub send_with_receipt {
255 9     9 1 20864 my ( $self, $conf ) = @_;
256 9         28 $conf = { %$conf };
257              
258             # send the message
259 9         31 my $receipt_id = $self->_get_next_transaction;
260 9         58 $conf->{receipt} = $receipt_id;
261 9 100       31 my $receipt_timeout = exists $conf->{timeout} ? delete $conf->{timeout} : $self->receipt_timeout;
262 9         27 $self->send($conf);
263              
264             # check the receipt
265 9 100       34 my $receipt_frame = $self->receive_frame({
266             ( defined $receipt_timeout ?
267             ( timeout => $receipt_timeout )
268             : () ),
269             });
270              
271 9 100       35 if (@_ > 2) {
272 4         5 $_[2] = $receipt_frame;
273             }
274              
275 9 100 100     31 if ( $receipt_frame
      100        
276             && $receipt_frame->command eq 'RECEIPT'
277             && $receipt_frame->headers->{'receipt-id'} eq $receipt_id )
278             {
279 2         26 return 1;
280             } else {
281 7         45 return 0;
282             }
283             }
284              
285             sub send_transactional {
286 4     4 1 22344 my ( $self, $conf ) = @_;
287              
288 4         11 $conf = { %$conf };
289             # begin the transaction
290 4         11 my $transaction_id = $self->_get_next_transaction;
291 4         48 my $begin_frame
292             = Net::Stomp::Frame->new(
293             { command => 'BEGIN', headers => { transaction => $transaction_id } }
294             );
295 4         10 $self->send_frame($begin_frame);
296              
297 4         4 $conf->{transaction} = $transaction_id;
298 4         3 my $receipt_frame;
299 4         7 my $ret = $self->send_with_receipt($conf,$receipt_frame);
300              
301 4 50       12 if (@_ > 2) {
302 0         0 $_[2] = $receipt_frame;
303             }
304              
305 4 100       5 if ( $ret ) {
306             # success, commit the transaction
307 1         4 my $frame_commit = Net::Stomp::Frame->new(
308             { command => 'COMMIT',
309             headers => { transaction => $transaction_id }
310             }
311             );
312 1         2 $self->send_frame($frame_commit);
313             } else {
314             # some failure, abort transaction
315 3         11 my $frame_abort = Net::Stomp::Frame->new(
316             { command => 'ABORT',
317             headers => { transaction => $transaction_id }
318             }
319             );
320 3         5 $self->send_frame($frame_abort);
321             }
322 4         12 return $ret;
323             }
324              
325             sub _sub_key {
326 13     13   9 my ($conf) = @_;
327              
328 13 100       25 if ($conf->{id}) { return "id-".$conf->{id} }
  2         7  
329             return "dest-".$conf->{destination}
330 11         21 }
331              
332             sub subscribe {
333 11     11 1 6263 my ( $self, $conf ) = @_;
334 11         31 my $frame = Net::Stomp::Frame->new(
335             { command => 'SUBSCRIBE', headers => $conf } );
336 11         22 $self->send_frame($frame);
337 11         20 my $subs = $self->subscriptions;
338 11         38 $subs->{_sub_key($conf)} = $conf;
339 11         20 return 1;
340             }
341              
342             sub unsubscribe {
343 2     2 1 6579 my ( $self, $conf ) = @_;
344 2         8 my $frame = Net::Stomp::Frame->new(
345             { command => 'UNSUBSCRIBE', headers => $conf } );
346 2         5 $self->send_frame($frame);
347 2         7 my $subs = $self->subscriptions;
348 2         7 delete $subs->{_sub_key($conf)};
349 2         3 return 1;
350             }
351              
352             sub ack {
353 2     2 1 15814 my ( $self, $conf ) = @_;
354 2         5 $conf = { %$conf };
355 2         5 my $id = $conf->{frame}->headers->{'message-id'};
356 2         7 delete $conf->{frame};
357 2         11 my $frame = Net::Stomp::Frame->new(
358             { command => 'ACK', headers => { 'message-id' => $id, %$conf } } );
359 2         6 $self->send_frame($frame);
360 2         5 return 1;
361             }
362              
363             sub nack {
364 2     2 1 3661 my ( $self, $conf ) = @_;
365 2         5 $conf = { %$conf };
366 2         6 my $id = $conf->{frame}->headers->{'message-id'};
367 2         7 delete $conf->{frame};
368 2         8 my $frame = Net::Stomp::Frame->new(
369             { command => 'NACK', headers => { 'message-id' => $id, %$conf } } );
370 2         5 $self->send_frame($frame);
371 2         6 return 1;
372             }
373              
374             sub send_frame {
375 48     48 1 49 my ( $self, $frame ) = @_;
376             # see if we're connected before we try to syswrite()
377 48 100       81 if (not $self->_connected) {
378 5         17 $self->_reconnect;
379 4 50       9 if (not $self->_connected) {
380 0         0 $self->_logdie(q{wasn't connected; couldn't _reconnect()});
381             }
382             }
383             # keep writing until we finish, or get an error
384 47         83 my $to_write = my $frame_string = $frame->as_string;
385 47         35 my $written;
386 47         75 while (length($to_write)) {
387 58         412 local $SIG{PIPE}='IGNORE'; # just in case writing to a closed
388             # socket kills us
389 58         98 $written = $self->socket->syswrite($to_write);
390 58 100       416 last unless defined $written;
391 57         286 substr($to_write,0,$written,'');
392             }
393 47 100       71 if (not defined $written) {
394 1         2 $self->logger->warn("error writing frame <<$frame_string>>: $!");
395             }
396 47 100 100     172 unless (defined $written && $self->_connected) {
397 2         4 $self->_reconnect;
398 2         4 $self->send_frame($frame);
399             }
400 47         88 return;
401             }
402              
403             sub _read_data {
404 138     138   112 my ($self, $timeout) = @_;
405              
406 138 100       196 return unless $self->select->can_read($timeout);
407             my $len = $self->socket->sysread($self->{_framebuf},
408             $self->bufsize,
409 106   100     743 length($self->{_framebuf} || ''));
410              
411 106 100 100     1348 if (defined $len && $len>0) {
412 103         97 $self->{_framebuf_changed} = 1;
413             }
414             else {
415 3 100       7 if (!defined $len) {
416 2         3 $self->logger->warn("error reading frame: $!");
417             }
418             # EOF or error detected - connection is gone. We have to reset
419             # the framebuf in case we had a partial frame in there that
420             # will never arrive.
421 3         111 $self->_close_socket;
422 3         15 $self->{_framebuf} = "";
423 3         2 delete $self->{_command};
424 3         3 delete $self->{_headers};
425             }
426 106         225 return $len;
427             }
428              
429             sub _read_headers {
430 143     143   108 my ($self) = @_;
431              
432 143 100       216 return 1 if $self->{_headers};
433 133 100       353 if ($self->{_framebuf} =~ s/^\n*([^\n].*?)\n\n//s) {
434 31         33 $self->{_framebuf_changed} = 1;
435 31         52 my $raw_headers = $1;
436 31 50       105 if ($raw_headers =~ s/^(.+)\n//) {
437 31         58 $self->{_command} = $1;
438             }
439 31         92 foreach my $line (split(/\n/, $raw_headers)) {
440 36         141 my ($key, $value) = split(/\s*:\s*/, $line, 2);
441             $self->{_headers}{$key} = $value
442 36 50       136 unless defined $self->{_headers}{$key};
443             }
444 31         77 return 1;
445             }
446 102         158 return 0;
447             }
448              
449             sub _read_body {
450 67     67   50 my ($self) = @_;
451              
452 67         55 my $h = $self->{_headers};
453 67 100       181 if ($h->{'content-length'}) {
    100          
454 37 100       70 if (length($self->{_framebuf}) > $h->{'content-length'}) {
455 5         5 $self->{_framebuf_changed} = 1;
456             my $body = substr($self->{_framebuf},
457             0,
458 5         9 $h->{'content-length'},
459             '' );
460              
461             # Trim the trailer off the frame.
462 5         15 $self->{_framebuf} =~ s/^.*?\000\n*//s;
463             return Net::Stomp::Frame->new({
464             command => delete $self->{_command},
465             headers => delete $self->{_headers},
466 5         24 body => $body
467             });
468             }
469             } elsif ($self->{_framebuf} =~ s/^(.*?)\000\n*//s) {
470             # No content-length header.
471              
472 26         44 my $body = $1;
473 26         27 $self->{_framebuf_changed} = 1;
474             return Net::Stomp::Frame->new({
475             command => delete $self->{_command},
476             headers => delete $self->{_headers},
477 26         113 body => $body });
478             }
479              
480 36         52 return 0;
481             }
482              
483             # this method is to stop the pointless warnings being thrown when trying to
484             # call peername() on a closed socket, i.e.
485             # getpeername() on closed socket GEN125 at
486             # /opt/xt/xt-perl/lib/5.12.3/x86_64-linux/IO/Socket.pm line 258.
487             #
488             # solution taken from:
489             # http://objectmix.com/perl/80545-warning-getpeername.html
490             sub _connected {
491 164     164   144 my $self = shift;
492              
493 164 100 100     429 return if $self->{_pid} != $$ and $self->reconnect_on_fork;
494              
495 163         116 my $connected;
496             {
497 163         123 local $^W = 0;
  163         312  
498 163         253 $connected = $self->socket->connected;
499             }
500 163         862 return $connected;
501             }
502              
503             sub receive_frame {
504 66     66 1 23566 my ($self, $conf) = @_;
505              
506 66 100       206 my $timeout = exists $conf->{timeout} ? $conf->{timeout} : $self->timeout;
507              
508 66 100       207 unless ($self->_connected) {
509 3         5 $self->_reconnect;
510             }
511              
512 66         57 my $done = 0;
513 66         110 while ( not $done = $self->_read_headers ) {
514 102 100       122 return undef unless $self->_read_data($timeout);
515             }
516 41         82 while ( not $done = $self->_read_body ) {
517 36 100       44 return undef unless $self->_read_data($timeout);
518             }
519              
520 31         73 return $done;
521             }
522              
523             sub _get_next_transaction {
524 13     13   12 my $self = shift;
525 13   100     31 my $serial = $self->serial || 0;
526 13         71 $serial++;
527 13         23 $self->serial($serial);
528              
529 13   100     59 return ($self->session_id||'nosession') . '-' . $serial;
530             }
531              
532             1;
533              
534             __END__