File Coverage

blib/lib/Net/Stomp.pm
Criterion Covered Total %
statement 272 308 88.3
branch 88 116 75.8
condition 27 44 61.3
subroutine 32 34 94.1
pod 13 13 100.0
total 432 515 83.8


line stmt bran cond sub pod time code
1             package Net::Stomp;
2 11     11   404103 use strict;
  11         103  
  11         324  
3 11     11   56 use warnings;
  11         20  
  11         257  
4 11     11   52 use IO::Select;
  11         20  
  11         222  
5 11     11   4922 use Net::Stomp::Frame;
  11         34  
  11         81  
6 11     11   463 use Carp qw(longmess);
  11         21  
  11         635  
7 11     11   65 use base 'Class::Accessor::Fast';
  11         23  
  11         798  
8 11     11   64 use Log::Any;
  11         28  
  11         86  
9             our $VERSION = '0.61';
10              
11             __PACKAGE__->mk_accessors( qw(
12             current_host failover hostname hosts port select serial session_id socket ssl
13             ssl_options socket_options subscriptions _connect_headers bufsize
14             reconnect_on_fork logger connect_delay
15             reconnect_attempts initial_reconnect_attempts timeout receipt_timeout
16             ) );
17              
18             sub _logconfess {
19 2     2   15 my ($self,@etc) = @_;
20 2         655 my $m = longmess(@etc);
21 2         52 $self->logger->fatal($m);
22 2         213 die $m;
23             }
24             sub _logdie {
25 34     34   830 my ($self,@etc) = @_;
26 34         575 $self->logger->fatal(@etc);
27 34         2354 die "@etc";
28             }
29              
30             sub new {
31 28     28 1 160151 my $class = shift;
32 28         208 my $self = $class->SUPER::new(@_);
33              
34 28 50       985 $self->bufsize(8192) unless $self->bufsize;
35 28 50       1334 $self->connect_delay(5) unless defined $self->connect_delay;
36 28 100       631 $self->reconnect_on_fork(1) unless defined $self->reconnect_on_fork;
37 28 100       1260 $self->reconnect_attempts(0) unless defined $self->reconnect_attempts;
38 28 100       1187 $self->initial_reconnect_attempts(1) unless defined $self->initial_reconnect_attempts;
39 28 50       1176 $self->socket_options({}) unless defined $self->socket_options;
40              
41 28 50       1218 $self->logger(Log::Any->get_logger) unless $self->logger;
42              
43 28         4266 $self->{_framebuf} = "";
44              
45             # We are not subscribed to anything at the start
46 28         591 $self->subscriptions( {} );
47              
48 28         301 $self->select( IO::Select->new );
49 28         822 my @hosts = ();
50              
51             # failover://tcp://primary:61616
52             # failover:(tcp://primary:61616,tcp://secondary:61616)?randomize=false
53              
54 28 100       529 if ($self->failover) {
    100          
55 6         122 my ($uris, $opts) = $self->failover =~ m{^failover:(?://)? \(? (.*?) \)? (?: \? (.*?) ) ?$}ix;
56              
57 6 100       102 $self->_logconfess("Unable to parse failover uri: " . $self->failover)
58             unless $uris;
59              
60 5         24 foreach my $host (split(/,/,$uris)) {
61 7 100       37 $host =~ m{^\w+://([a-zA-Z0-9\-./]+):([0-9]+)$} || $self->_logconfess("Unable to parse failover component: '$host'");
62 6         22 my ($hostname, $port) = ($1, $2);
63              
64 6         26 push(@hosts, {hostname => $hostname, port => $port});
65             }
66             } elsif ($self->hosts) {
67             ## @hosts is used inside the while loop later to decide whether we have
68             ## cycled through all setup hosts.
69 19         566 @hosts = @{$self->hosts};
  19         328  
70             }
71 26 100       630 $self->hosts(\@hosts) if @hosts;
72              
73 26         290 $self->_get_connection_retrying(1);
74              
75 22         125 return $self;
76             }
77              
78             sub _get_connection_retrying {
79 36     36   89 my ($self,$initial) = @_;
80              
81 36         65 my $tries=0;
82 36         81 while(not eval { $self->_get_connection; 1 }) {
  60         2811  
  31         127  
83 29         86 my $err = $@;$err =~ s{\n\z}{}sm;
  29         162  
84 29         57 ++$tries;
85 29 100       85 if($self->_should_stop_trying($initial,$tries)) {
86             # We've cycled enough. Die now.
87 5         24 $self->_logdie("Failed to connect: $err; giving up");
88             }
89 24         404 $self->logger->warn("Failed to connect: $err; retrying");
90 24         1542 sleep($self->connect_delay);
91             }
92             }
93              
94             sub _should_stop_trying {
95 29     29   67 my ($self,$initial,$tries) = @_;
96              
97 29 100       646 my $max_tries = $initial
98             ? $self->initial_reconnect_attempts
99             : $self->reconnect_attempts;
100              
101 29 100       201 return unless $max_tries > 0; # 0 means forever
102              
103 17 100       307 if (defined $self->hosts) {
104 14         69 $max_tries *= @{$self->hosts}; # try at least once per host
  14         221  
105             }
106 17         103 return $tries >= $max_tries;
107             }
108              
109             my $socket_class;
110             sub _get_connection {
111 52     52   101 my $self = shift;
112 52 100       1192 if (my $hosts = $self->hosts) {
113 48 100 100     1117 if (defined $self->current_host && ($self->current_host < $#{$hosts} ) ) {
  33         932  
114 13         221 $self->current_host($self->current_host+1);
115             } else {
116 35         699 $self->current_host(0);
117             }
118 48         1081 my $h = $hosts->[$self->current_host];
119 48         1041 $self->hostname($h->{hostname});
120 48         1094 $self->port($h->{port});
121 48         1149 $self->ssl($h->{ssl});
122 48   100     1390 $self->ssl_options($h->{ssl_options} || {});
123             }
124 52         505 my $socket = $self->_get_socket;
125 52 100       1055 $self->_logdie("Error connecting to " . $self->hostname . ':' . $self->port . ": $!")
126             unless $socket;
127              
128 23         416 $self->select->remove($self->socket);
129              
130 23         1048 $self->select->add($socket);
131 23         528 $self->socket($socket);
132 23         195 $self->{_pid} = $$;
133             }
134              
135             sub _get_socket {
136 0     0   0 my ($self) = @_;
137 0         0 my $socket;
138              
139 0         0 my $timeout = $self->timeout;
140 0 0       0 $timeout = 5 unless defined $timeout;
141              
142             my %sockopts = (
143             Timeout => $timeout,
144 0         0 %{ $self->socket_options },
  0         0  
145             PeerAddr => $self->hostname,
146             PeerPort => $self->port,
147             Proto => 'tcp',
148             );
149 0         0 my $keep_alive = delete $sockopts{keep_alive};
150              
151 0 0       0 if ( $self->ssl ) {
152 0         0 eval { require IO::Socket::SSL };
  0         0  
153 0 0       0 $self->_logdie(
154             "You should install the IO::Socket::SSL module for SSL support in Net::Stomp"
155             ) if $@;
156 0 0       0 %sockopts = ( %sockopts, %{ $self->ssl_options || {} } );
  0         0  
157 0         0 $self->logger->trace('opening IO::Socket::SSL',\%sockopts);
158 0         0 $socket = IO::Socket::SSL->new(%sockopts);
159             } else {
160             $socket_class ||= eval { require IO::Socket::IP; IO::Socket::IP->VERSION('0.20'); "IO::Socket::IP" }
161 0   0     0 || do { require IO::Socket::INET; "IO::Socket::INET" };
      0        
162 0         0 $self->logger->trace("opening $socket_class",\%sockopts);
163 0         0 $socket = $socket_class->new(%sockopts);
164 0 0       0 binmode($socket) if $socket;
165             }
166 0 0       0 if ($keep_alive) {
167 0         0 require Socket;
168 0 0       0 if (Socket->can('SO_KEEPALIVE')) {
169 0         0 $socket->setsockopt(Socket::SOL_SOCKET(),Socket::SO_KEEPALIVE(),1);
170             }
171             else {
172 0         0 $self->logger->warn(q{TCP keep-alive was requested, but the Socket module does not export the SO_KEEPALIVE constant, so we couldn't enable it});
173             }
174             }
175              
176 0         0 return $socket;
177             }
178              
179             sub connect {
180 11     11 1 1317 my ( $self, $conf ) = @_;
181              
182 11         235 $self->logger->trace('connecting');
183 11         672 my $frame = Net::Stomp::Frame->new(
184             { command => 'CONNECT', headers => $conf } );
185 11         63 $self->send_frame($frame);
186 11         48 $frame = $self->receive_frame;
187              
188 11 50 33     233 if ($frame && $frame->command eq 'CONNECTED') {
189 11         251 $self->logger->trace('connected');
190             # Setting initial values for session id, as given from
191             # the stomp server
192 11         704 $self->session_id( $frame->headers->{session} );
193 11         499 $self->_connect_headers( $conf );
194             }
195             else {
196 0   0     0 $self->logger->warn('failed to connect',{ %{$frame // {}} });
  0         0  
197             }
198              
199 11         119 return $frame;
200             }
201              
202             sub _close_socket {
203 14     14   34 my ($self) = @_;
204 14 50       255 return unless $self->socket;
205 14         312 $self->logger->trace('closing socket');
206 14         848 $self->socket->close;
207 14         346 $self->select->remove($self->socket);
208             }
209              
210             sub disconnect {
211 1     1 1 18936 my $self = shift;
212 1         33 $self->logger->trace('disconnecting');
213 1         31 my $frame = Net::Stomp::Frame->new( { command => 'DISCONNECT' } );
214 1         5 $self->send_frame($frame);
215 1         5 $self->_close_socket;
216 1         25 return 1;
217             }
218              
219             sub _reconnect {
220 10     10   28 my $self = shift;
221 10         42 $self->_close_socket;
222              
223 10         404 $self->logger->warn("reconnecting");
224 10         495 $self->_get_connection_retrying(0);
225             # Both ->connect and ->subscribe can call _reconnect. It *should*
226             # work out fine in the end, worst scenario we send a few subscribe
227             # frame more than once
228 9         173 $self->connect( $self->_connect_headers );
229 9         21 for my $sub(keys %{$self->subscriptions}) {
  9         163  
230 8         210 $self->subscribe($self->subscriptions->{$sub});
231             }
232             }
233              
234             sub can_read {
235 0     0 1 0 my ( $self, $conf ) = @_;
236              
237             # If there is any data left in the framebuffer that we haven't read, return
238             # 'true'. But we don't want to spin endlessly, so only return true the
239             # first time. (Anything touching the _framebuf should update this flag when
240             # it does something.
241 0 0 0     0 if ( $self->{_framebuf_changed} && length $self->{_framebuf} ) {
242 0         0 $self->{_framebuf_changed} = 0;
243 0         0 return 1;
244             }
245              
246 0   0     0 $conf ||= {};
247 0 0       0 my $timeout = exists $conf->{timeout} ? $conf->{timeout} : $self->timeout;
248 0   0     0 return $self->select->can_read($timeout) || 0;
249             }
250              
251             sub send {
252 19     19 1 63102 my ( $self, $conf ) = @_;
253 19         79 $conf = { %$conf };
254 19         551 $self->logger->trace('sending',$conf);
255 19         3291 my $body = $conf->{body};
256 19         53 delete $conf->{body};
257 19         169 my $frame = Net::Stomp::Frame->new(
258             { command => 'SEND', headers => $conf, body => $body } );
259 19         111 $self->send_frame($frame);
260 18         120 return 1;
261             }
262              
263             sub send_with_receipt {
264 9     9 1 35272 my ( $self, $conf ) = @_;
265 9         50 $conf = { %$conf };
266              
267             # send the message
268 9         39 my $receipt_id = $self->_get_next_transaction;
269 9         239 $self->logger->debug('sending with receipt',{ receipt => $receipt_id });
270 9         503 $conf->{receipt} = $receipt_id;
271 9 100       123 my $receipt_timeout = exists $conf->{timeout} ? delete $conf->{timeout} : $self->receipt_timeout;
272 9         58 $self->send($conf);
273              
274 9         188 $self->logger->trace('waiting for receipt',$conf);
275             # check the receipt
276 9 100       393 my $receipt_frame = $self->receive_frame({
277             ( defined $receipt_timeout ?
278             ( timeout => $receipt_timeout )
279             : () ),
280             });
281              
282 9 100       62 if (@_ > 2) {
283 4         16 $_[2] = $receipt_frame;
284             }
285              
286 9 100 100     147 if ( $receipt_frame
      100        
287             && $receipt_frame->command eq 'RECEIPT'
288             && $receipt_frame->headers->{'receipt-id'} eq $receipt_id )
289             {
290 2         108 $self->logger->debug('got good receipt',{ %{$receipt_frame} });
  2         21  
291 2         28 return 1;
292             } else {
293 7 100       190 $self->logger->debug('got bad receipt',{ %{$receipt_frame || {} } });
  7         77  
294 7         120 return 0;
295             }
296             }
297              
298             sub send_transactional {
299 4     4 1 46107 my ( $self, $conf ) = @_;
300              
301 4         26 $conf = { %$conf };
302             # begin the transaction
303 4         25 my $transaction_id = $self->_get_next_transaction;
304 4         123 $self->logger->debug('starting transaction',{ transaction => $transaction_id });
305 4         185 my $begin_frame
306             = Net::Stomp::Frame->new(
307             { command => 'BEGIN', headers => { transaction => $transaction_id } }
308             );
309 4         24 $self->send_frame($begin_frame);
310              
311 4         12 $conf->{transaction} = $transaction_id;
312 4         5 my $receipt_frame;
313 4         16 my $ret = $self->send_with_receipt($conf,$receipt_frame);
314              
315 4 50       14 if (@_ > 2) {
316 0         0 $_[2] = $receipt_frame;
317             }
318              
319 4 100       9 if ( $ret ) {
320             # success, commit the transaction
321 1         20 $self->logger->debug('committing transaction',{ transaction => $transaction_id });
322 1         17 my $frame_commit = Net::Stomp::Frame->new(
323             { command => 'COMMIT',
324             headers => { transaction => $transaction_id }
325             }
326             );
327 1         5 $self->send_frame($frame_commit);
328             } else {
329 3         57 $self->logger->debug('rolling back transaction',{ transaction => $transaction_id });
330             # some failure, abort transaction
331 3         46 my $frame_abort = Net::Stomp::Frame->new(
332             { command => 'ABORT',
333             headers => { transaction => $transaction_id }
334             }
335             );
336 3         10 $self->send_frame($frame_abort);
337             }
338 4         26 return $ret;
339             }
340              
341             sub _sub_key {
342 13     13   35 my ($conf) = @_;
343              
344 13 100       43 if ($conf->{id}) { return "id-".$conf->{id} }
  2         9  
345             return "dest-".$conf->{destination}
346 11         40 }
347              
348             sub subscribe {
349 11     11 1 11953 my ( $self, $conf ) = @_;
350 11         264 $self->logger->trace('subscribing',$conf);
351 11         2801 my $frame = Net::Stomp::Frame->new(
352             { command => 'SUBSCRIBE', headers => $conf } );
353 11         51 $self->send_frame($frame);
354 11         201 my $subs = $self->subscriptions;
355 11         81 $subs->{_sub_key($conf)} = $conf;
356 11         43 return 1;
357             }
358              
359             sub unsubscribe {
360 2     2 1 11278 my ( $self, $conf ) = @_;
361 2         55 $self->logger->trace('unsubscribing',$conf);
362 2         36 my $frame = Net::Stomp::Frame->new(
363             { command => 'UNSUBSCRIBE', headers => $conf } );
364 2         10 $self->send_frame($frame);
365 2         40 my $subs = $self->subscriptions;
366 2         11 delete $subs->{_sub_key($conf)};
367 2         6 return 1;
368             }
369              
370             sub ack {
371 2     2 1 21800 my ( $self, $conf ) = @_;
372 2         8 $conf = { %$conf };
373 2         54 my $id = $conf->{frame}->headers->{'message-id'};
374 2         11 delete $conf->{frame};
375 2         37 $self->logger->trace('acking',{ 'message-id' => $id, %$conf });
376 2         43 my $frame = Net::Stomp::Frame->new(
377             { command => 'ACK', headers => { 'message-id' => $id, %$conf } } );
378 2         8 $self->send_frame($frame);
379 2         10 return 1;
380             }
381              
382             sub nack {
383 2     2 1 6499 my ( $self, $conf ) = @_;
384 2         8 $conf = { %$conf };
385 2         54 my $id = $conf->{frame}->headers->{'message-id'};
386 2         42 $self->logger->trace('nacking',{ 'message-id' => $id, %$conf });
387 2         35 delete $conf->{frame};
388 2         12 my $frame = Net::Stomp::Frame->new(
389             { command => 'NACK', headers => { 'message-id' => $id, %$conf } } );
390 2         10 $self->send_frame($frame);
391 2         9 return 1;
392             }
393              
394             sub send_frame {
395 48     48 1 124 my ( $self, $frame ) = @_;
396             # see if we're connected before we try to syswrite()
397 48 100       154 if (not $self->_connected) {
398 5         34 $self->_reconnect;
399 4 50       28 if (not $self->_connected) {
400 0         0 $self->_logdie(q{wasn't connected; couldn't _reconnect()});
401             }
402             }
403             # keep writing until we finish, or get an error
404 47         141 my $to_write = my $frame_string = $frame->as_string;
405 47         73 my $written;
406 47         117 while (length($to_write)) {
407 58         1124 local $SIG{PIPE}='IGNORE'; # just in case writing to a closed
408             # socket kills us
409 58         1483 $written = $self->socket->syswrite($to_write);
410 58 100       699 last unless defined $written;
411 57         930 substr($to_write,0,$written,'');
412             }
413 47 100       162 if (not defined $written) {
414 1         25 $self->logger->warn("error writing frame <<$frame_string>>: $!");
415             }
416 47 100 100     386 unless (defined $written && $self->_connected) {
417 2         12 $self->_reconnect;
418 2         8 $self->send_frame($frame);
419             }
420 47         115 return;
421             }
422              
423             sub _read_data {
424 138     138   255 my ($self, $timeout) = @_;
425              
426 138 100       2588 return unless $self->select->can_read($timeout);
427             my $len = $self->socket->sysread($self->{_framebuf},
428             $self->bufsize,
429 106   100     2902 length($self->{_framebuf} || ''));
430              
431 106 100 100     3503 if (defined $len && $len>0) {
432 103         202 $self->{_framebuf_changed} = 1;
433             }
434             else {
435 3 100       10 if (!defined $len) {
436 2         38 $self->logger->warn("error reading frame: $!");
437             }
438             # EOF or error detected - connection is gone. We have to reset
439             # the framebuf in case we had a partial frame in there that
440             # will never arrive.
441 3         197 $self->_close_socket;
442 3         78 $self->{_framebuf} = "";
443 3         6 delete $self->{_command};
444 3         9 delete $self->{_headers};
445             }
446 106         333 return $len;
447             }
448              
449             sub _read_headers {
450 143     143   238 my ($self) = @_;
451              
452 143 100       319 return 1 if $self->{_headers};
453 133 100       491 if ($self->{_framebuf} =~ s/^\n*([^\n].*?)\n\n//s) {
454 31         84 $self->{_framebuf_changed} = 1;
455 31         77 my $raw_headers = $1;
456 31 50       150 if ($raw_headers =~ s/^(.+)\n//) {
457 31         102 $self->{_command} = $1;
458             }
459 31         130 foreach my $line (split(/\n/, $raw_headers)) {
460 36         203 my ($key, $value) = split(/\s*:\s*/, $line, 2);
461             $self->{_headers}{$key} = $value
462 36 50       185 unless defined $self->{_headers}{$key};
463             }
464 31         118 return 1;
465             }
466 102         289 return 0;
467             }
468              
469             sub _read_body {
470 67     67   125 my ($self) = @_;
471              
472 67         116 my $h = $self->{_headers};
473 67 100       280 if ($h->{'content-length'}) {
    100          
474 37 100       89 if (length($self->{_framebuf}) > $h->{'content-length'}) {
475 5         8 $self->{_framebuf_changed} = 1;
476             my $body = substr($self->{_framebuf},
477             0,
478 5         15 $h->{'content-length'},
479             '' );
480              
481             # Trim the trailer off the frame.
482 5         24 $self->{_framebuf} =~ s/^.*?\000\n*//s;
483             return Net::Stomp::Frame->new({
484             command => delete $self->{_command},
485             headers => delete $self->{_headers},
486 5         40 body => $body
487             });
488             }
489             } elsif ($self->{_framebuf} =~ s/^(.*?)\000\n*//s) {
490             # No content-length header.
491              
492 26         113 my $body = $1;
493 26         51 $self->{_framebuf_changed} = 1;
494             return Net::Stomp::Frame->new({
495             command => delete $self->{_command},
496             headers => delete $self->{_headers},
497 26         175 body => $body });
498             }
499              
500 36         75 return 0;
501             }
502              
503             # this method is to stop the pointless warnings being thrown when trying to
504             # call peername() on a closed socket, i.e.
505             # getpeername() on closed socket GEN125 at
506             # /opt/xt/xt-perl/lib/5.12.3/x86_64-linux/IO/Socket.pm line 258.
507             #
508             # solution taken from:
509             # http://objectmix.com/perl/80545-warning-getpeername.html
510             sub _connected {
511 164     164   275 my $self = shift;
512              
513 164 100 100     651 return if $self->{_pid} != $$ and $self->reconnect_on_fork;
514              
515 163         261 my $connected;
516             {
517 163         214 local $^W = 0;
  163         578  
518 163         3149 $connected = $self->socket->connected;
519             }
520 163         1583 return $connected;
521             }
522              
523             sub receive_frame {
524 66     66 1 47912 my ($self, $conf) = @_;
525              
526 66         1531 $self->logger->trace('waiting to receive frame',$conf);
527 66 100       2952 my $timeout = exists $conf->{timeout} ? $conf->{timeout} : $self->timeout;
528              
529 66 100       430 unless ($self->_connected) {
530 3         14 $self->_reconnect;
531             }
532              
533 66         119 my $done = 0;
534 66         185 while ( not $done = $self->_read_headers ) {
535 102 100       292 return undef unless $self->_read_data($timeout);
536             }
537 41         114 while ( not $done = $self->_read_body ) {
538 36 100       68 return undef unless $self->_read_data($timeout);
539             }
540              
541 31         120 return $done;
542             }
543              
544             sub _get_next_transaction {
545 13     13   25 my $self = shift;
546 13   100     376 my $serial = $self->serial || 0;
547 13         117 $serial++;
548 13         242 $self->serial($serial);
549              
550 13   100     295 return ($self->session_id||'nosession') . '-' . $serial;
551             }
552              
553             1;
554              
555             __END__