File Coverage

blib/lib/Net/Stomp.pm
Criterion Covered Total %
statement 275 311 88.4
branch 90 120 75.0
condition 27 47 57.4
subroutine 32 34 94.1
pod 13 13 100.0
total 437 525 83.2


line stmt bran cond sub pod time code
1             package Net::Stomp;
2 12     12   725635 use strict;
  12         20  
  12         410  
3 12     12   54 use warnings;
  12         22  
  12         539  
4 12     12   421 use IO::Select;
  12         1406  
  12         324  
5 12     12   5125 use Net::Stomp::Frame;
  12         37  
  12         81  
6 12     12   512 use Carp qw(longmess);
  12         20  
  12         834  
7 12     12   75 use base 'Class::Accessor::Fast';
  12         21  
  12         951  
8 12     12   655 use Log::Any;
  12         11886  
  12         96  
9             our $VERSION = '0.62';
10              
11             __PACKAGE__->mk_accessors( qw(
12             current_host failover hostname hosts port select serial session_id socket ssl
13             ssl_options socket_options subscriptions _connect_headers bufsize
14             reconnect_on_fork logger connect_delay
15             reconnect_attempts initial_reconnect_attempts timeout receipt_timeout
16             ) );
17              
18             sub _logconfess {
19 2     2   14 my ($self,@etc) = @_;
20 2         865 my $m = longmess(@etc);
21 2         82 $self->logger->fatal($m);
22 2         255 die $m;
23             }
24             sub _logdie {
25 34     34   788 my ($self,@etc) = @_;
26 34         525 $self->logger->fatal(@etc);
27 34         2394 die "@etc";
28             }
29              
30             sub new {
31 28     28 1 3041175 my $class = shift;
32 28         269 my $self = $class->SUPER::new(@_);
33              
34 28 50       1181 $self->bufsize(8192) unless $self->bufsize;
35 28 50       1470 $self->connect_delay(5) unless defined $self->connect_delay;
36 28 100       807 $self->reconnect_on_fork(1) unless defined $self->reconnect_on_fork;
37 28 100       1266 $self->reconnect_attempts(0) unless defined $self->reconnect_attempts;
38 28 100       1371 $self->initial_reconnect_attempts(1) unless defined $self->initial_reconnect_attempts;
39 28 50       1240 $self->socket_options({}) unless defined $self->socket_options;
40              
41 28 50       1357 $self->logger(Log::Any->get_logger) unless $self->logger;
42              
43 28         4671 $self->{_framebuf} = "";
44              
45             # We are not subscribed to anything at the start
46 28         647 $self->subscriptions( {} );
47              
48 28         391 $self->select( IO::Select->new );
49 28         886 my @hosts = ();
50              
51             # failover://tcp://primary:61616
52             # failover:(tcp://primary:61616,tcp://secondary:61616)?randomize=false
53              
54 28 100       579 if ($self->failover) {
    100          
55 6         182 my ($uris, $opts) = $self->failover =~ m{^failover:(?://)? \(? (.*?) \)? (?: \? (.*?) ) ?$}ix;
56              
57 6 100       146 $self->_logconfess("Unable to parse failover uri: " . $self->failover)
58             unless $uris;
59              
60 5         8 my %host_options;
61 5 50       124 $host_options{'ssl'} = $self->ssl if $self->ssl;
62 5 50       137 $host_options{'ssl_options'} = $self->ssl_options if $self->ssl_options;
63 5         42 foreach my $host (split(/,/,$uris)) {
64 7 100       43 $host =~ m{^\w+://([a-zA-Z0-9\-./]+):([0-9]+)$} || $self->_logconfess("Unable to parse failover component: '$host'");
65 6         20 my ($hostname, $port) = ($1, $2);
66              
67 6         36 push(@hosts, {%host_options, hostname => $hostname, port => $port});
68             }
69             } elsif ($self->hosts) {
70             ## @hosts is used inside the while loop later to decide whether we have
71             ## cycled through all configured hosts.
72 19         526 @hosts = @{$self->hosts};
  19         337  
73             }
74 26 100       603 $self->hosts(\@hosts) if @hosts;
75              
76 26         219 $self->_get_connection_retrying(1);
77              
78 22         149 return $self;
79             }
80              
81             sub _get_connection_retrying {
82 36     36   86 my ($self,$initial) = @_;
83              
84 36         61 my $tries=0;
85 36         73 while(not eval { $self->_get_connection; 1 }) {
  60         2473  
  31         128  
86 29         63 my $err = $@;$err =~ s{\n\z}{}sm;
  29         166  
87 29         44 ++$tries;
88 29 100       79 if($self->_should_stop_trying($initial,$tries)) {
89             # We've cycled enough. Die now.
90 5         18 $self->_logdie("Failed to connect: $err; giving up");
91             }
92 24         418 $self->logger->warn("Failed to connect: $err; retrying");
93 24         1545 sleep($self->connect_delay);
94             }
95             }
96              
97             sub _should_stop_trying {
98 29     29   62 my ($self,$initial,$tries) = @_;
99              
100 29 100       908 my $max_tries = $initial
101             ? $self->initial_reconnect_attempts
102             : $self->reconnect_attempts;
103              
104 29 100       203 return unless $max_tries > 0; # 0 means forever
105              
106 17 100       289 if (defined $self->hosts) {
107 14         65 $max_tries *= @{$self->hosts}; # try at least once per host
  14         215  
108             }
109 17         95 return $tries >= $max_tries;
110             }
111              
112             my $socket_class;
113             sub _get_connection {
114 52     52   92 my $self = shift;
115 52 100       1098 if (my $hosts = $self->hosts) {
116 48 100 100     1147 if (defined $self->current_host && ($self->current_host < $#{$hosts} ) ) {
  33         838  
117 13         303 $self->current_host($self->current_host+1);
118             } else {
119 35         667 $self->current_host(0);
120             }
121 48         990 my $h = $hosts->[$self->current_host];
122 48         903 $self->hostname($h->{hostname});
123 48         1148 $self->port($h->{port});
124 48         1155 $self->ssl($h->{ssl});
125 48   100     1284 $self->ssl_options($h->{ssl_options} || {});
126             }
127 52         433 my $socket = $self->_get_socket;
128 52 100       1107 $self->_logdie("Error connecting to " . $self->hostname . ':' . $self->port . ": $!")
129             unless $socket;
130              
131 23         478 $self->select->remove($self->socket);
132              
133 23         954 $self->select->add($socket);
134 23         523 $self->socket($socket);
135 23         285 $self->{_pid} = $$;
136             }
137              
138             sub _get_socket {
139 0     0   0 my ($self) = @_;
140 0         0 my $socket;
141              
142 0         0 my $timeout = $self->timeout;
143 0 0       0 $timeout = 5 unless defined $timeout;
144              
145             my %sockopts = (
146             Timeout => $timeout,
147 0         0 %{ $self->socket_options },
  0         0  
148             PeerAddr => $self->hostname,
149             PeerPort => $self->port,
150             Proto => 'tcp',
151             );
152 0         0 my $keep_alive = delete $sockopts{keep_alive};
153              
154 0 0       0 if ( $self->ssl ) {
155 0         0 eval { require IO::Socket::SSL };
  0         0  
156 0 0       0 $self->_logdie(
157             "You should install the IO::Socket::SSL module for SSL support in Net::Stomp"
158             ) if $@;
159 0 0       0 %sockopts = ( %sockopts, %{ $self->ssl_options || {} } );
  0         0  
160 0         0 $self->logger->trace('opening IO::Socket::SSL',\%sockopts);
161 0         0 $socket = IO::Socket::SSL->new(%sockopts);
162             } else {
163             $socket_class ||= eval { require IO::Socket::IP; IO::Socket::IP->VERSION('0.20'); "IO::Socket::IP" }
164 0   0     0 || do { require IO::Socket::INET; "IO::Socket::INET" };
      0        
165 0         0 $self->logger->trace("opening $socket_class",\%sockopts);
166 0         0 $socket = $socket_class->new(%sockopts);
167 0 0       0 binmode($socket) if $socket;
168             }
169 0 0 0     0 if ($socket && $keep_alive) {
170 0         0 require Socket;
171 0 0       0 if (Socket->can('SO_KEEPALIVE')) {
172 0         0 $socket->setsockopt(Socket::SOL_SOCKET(),Socket::SO_KEEPALIVE(),1);
173             }
174             else {
175 0         0 $self->logger->warn(q{TCP keep-alive was requested, but the Socket module does not export the SO_KEEPALIVE constant, so we couldn't enable it});
176             }
177             }
178              
179 0         0 return $socket;
180             }
181              
182             sub connect {
183 11     11 1 1452 my ( $self, $conf ) = @_;
184              
185 11         174 $self->logger->trace('connecting');
186 11         563 my $frame = Net::Stomp::Frame->new(
187             { command => 'CONNECT', headers => $conf } );
188 11         70 $self->send_frame($frame);
189 11         36 $frame = $self->receive_frame;
190              
191 11 50 33     180 if ($frame && $frame->command eq 'CONNECTED') {
192 11         207 $self->logger->trace('connected');
193             # Setting initial values for session id, as given from
194             # the stomp server
195 11         670 $self->session_id( $frame->headers->{session} );
196 11         458 $self->_connect_headers( $conf );
197             }
198             else {
199 0   0     0 $self->logger->warn('failed to connect',{ %{$frame // {}} });
  0         0  
200             }
201              
202 11         90 return $frame;
203             }
204              
205             sub _close_socket {
206 14     14   57 my ($self) = @_;
207 14 50       274 return unless $self->socket;
208 14         261 $self->logger->trace('closing socket');
209 14         766 $self->socket->close;
210 14         340 $self->select->remove($self->socket);
211             }
212              
213             sub disconnect {
214 1     1 1 19322 my $self = shift;
215 1         33 $self->logger->trace('disconnecting');
216 1         35 my $frame = Net::Stomp::Frame->new( { command => 'DISCONNECT' } );
217 1         7 $self->send_frame($frame);
218 1         4 $self->_close_socket;
219 1         21 return 1;
220             }
221              
222             sub _reconnect {
223 10     10   19 my $self = shift;
224 10         89 $self->_close_socket;
225              
226 10         367 $self->logger->warn("reconnecting");
227 10         525 $self->_get_connection_retrying(0);
228             # Both ->connect and ->subscribe can call _reconnect. It *should*
229             # work out fine in the end; in the worst case we send a few subscribe
230             # frames more than once.
231 9         153 $self->connect( $self->_connect_headers );
232 9         18 for my $sub(keys %{$self->subscriptions}) {
  9         178  
233 8         178 $self->subscribe($self->subscriptions->{$sub});
234             }
235             }
236              
237             sub can_read {
238 0     0 1 0 my ( $self, $conf ) = @_;
239              
240             # If there is any data left in the framebuffer that we haven't read, return
241             # 'true'. But we don't want to spin endlessly, so only return true the
242             # first time. (Anything touching the _framebuf should update this flag when
243             # it does something.)
244 0 0 0     0 if ( $self->{_framebuf_changed} && length $self->{_framebuf} ) {
245 0         0 $self->{_framebuf_changed} = 0;
246 0         0 return 1;
247             }
248              
249 0   0     0 $conf ||= {};
250 0 0       0 my $timeout = exists $conf->{timeout} ? $conf->{timeout} : $self->timeout;
251 0   0     0 return $self->select->can_read($timeout) || 0;
252             }
253              
254             sub send {
255 19     19 1 68263 my ( $self, $conf ) = @_;
256 19         81 $conf = { %$conf };
257 19         624 $self->logger->trace('sending',$conf);
258 19         3033 my $body = $conf->{body};
259 19         43 delete $conf->{body};
260 19         165 my $frame = Net::Stomp::Frame->new(
261             { command => 'SEND', headers => $conf, body => $body } );
262 19         104 $self->send_frame($frame);
263 18         86 return 1;
264             }
265              
266             sub send_with_receipt {
267 9     9 1 34668 my ( $self, $conf ) = @_;
268 9         68 $conf = { %$conf };
269              
270             # send the message
271 9         39 my $receipt_id = $self->_get_next_transaction;
272 9         273 $self->logger->debug('sending with receipt',{ receipt => $receipt_id });
273 9         537 $conf->{receipt} = $receipt_id;
274 9 100       159 my $receipt_timeout = exists $conf->{timeout} ? delete $conf->{timeout} : $self->receipt_timeout;
275 9         72 $self->send($conf);
276              
277 9         166 $self->logger->trace('waiting for receipt',$conf);
278             # check the receipt
279 9 100       312 my $receipt_frame = $self->receive_frame({
280             ( defined $receipt_timeout ?
281             ( timeout => $receipt_timeout )
282             : () ),
283             });
284              
285 9 100       71 if (@_ > 2) {
286 4         12 $_[2] = $receipt_frame;
287             }
288              
289 9 100 100     197 if ( $receipt_frame
      100        
290             && $receipt_frame->command eq 'RECEIPT'
291             && $receipt_frame->headers->{'receipt-id'} eq $receipt_id )
292             {
293 2         189 $self->logger->debug('got good receipt',{ %{$receipt_frame} });
  2         27  
294 2         37 return 1;
295             } else {
296 7 100       206 $self->logger->debug('got bad receipt',{ %{$receipt_frame || {} } });
  7         75  
297 7         107 return 0;
298             }
299             }
300              
301             sub send_transactional {
302 4     4 1 61227 my ( $self, $conf ) = @_;
303              
304 4         17 $conf = { %$conf };
305             # begin the transaction
306 4         31 my $transaction_id = $self->_get_next_transaction;
307 4         181 $self->logger->debug('starting transaction',{ transaction => $transaction_id });
308 4         250 my $begin_frame
309             = Net::Stomp::Frame->new(
310             { command => 'BEGIN', headers => { transaction => $transaction_id } }
311             );
312 4         31 $self->send_frame($begin_frame);
313              
314 4         14 $conf->{transaction} = $transaction_id;
315 4         6 my $receipt_frame;
316 4         20 my $ret = $self->send_with_receipt($conf,$receipt_frame);
317              
318 4 50       15 if (@_ > 2) {
319 0         0 $_[2] = $receipt_frame;
320             }
321              
322 4 100       12 if ( $ret ) {
323             # success, commit the transaction
324 1         36 $self->logger->debug('committing transaction',{ transaction => $transaction_id });
325 1         21 my $frame_commit = Net::Stomp::Frame->new(
326             { command => 'COMMIT',
327             headers => { transaction => $transaction_id }
328             }
329             );
330 1         5 $self->send_frame($frame_commit);
331             } else {
332 3         78 $self->logger->debug('rolling back transaction',{ transaction => $transaction_id });
333             # some failure, abort transaction
334 3         63 my $frame_abort = Net::Stomp::Frame->new(
335             { command => 'ABORT',
336             headers => { transaction => $transaction_id }
337             }
338             );
339 3         13 $self->send_frame($frame_abort);
340             }
341 4         28 return $ret;
342             }
343              
344             sub _sub_key {
345 13     13   28 my ($conf) = @_;
346              
347 13 100       39 if ($conf->{id}) { return "id-".$conf->{id} }
  2         11  
348             return "dest-".$conf->{destination}
349 11         36 }
350              
351             sub subscribe {
352 11     11 1 12167 my ( $self, $conf ) = @_;
353 11         159 $self->logger->trace('subscribing',$conf);
354 11         2130 my $frame = Net::Stomp::Frame->new(
355             { command => 'SUBSCRIBE', headers => $conf } );
356 11         41 $self->send_frame($frame);
357 11         206 my $subs = $self->subscriptions;
358 11         70 $subs->{_sub_key($conf)} = $conf;
359 11         36 return 1;
360             }
361              
362             sub unsubscribe {
363 2     2 1 9760 my ( $self, $conf ) = @_;
364 2         80 $self->logger->trace('unsubscribing',$conf);
365 2         81 my $frame = Net::Stomp::Frame->new(
366             { command => 'UNSUBSCRIBE', headers => $conf } );
367 2         13 $self->send_frame($frame);
368 2         56 my $subs = $self->subscriptions;
369 2         13 delete $subs->{_sub_key($conf)};
370 2         7 return 1;
371             }
372              
373             sub ack {
374 2     2 1 26099 my ( $self, $conf ) = @_;
375 2         6 $conf = { %$conf };
376 2         109 my $id = $conf->{frame}->headers->{'message-id'};
377 2         16 delete $conf->{frame};
378 2         73 $self->logger->trace('acking',{ 'message-id' => $id, %$conf });
379 2         67 my $frame = Net::Stomp::Frame->new(
380             { command => 'ACK', headers => { 'message-id' => $id, %$conf } } );
381 2         12 $self->send_frame($frame);
382 2         13 return 1;
383             }
384              
385             sub nack {
386 2     2 1 6072 my ( $self, $conf ) = @_;
387 2         8 $conf = { %$conf };
388 2         78 my $id = $conf->{frame}->headers->{'message-id'};
389 2         61 $self->logger->trace('nacking',{ 'message-id' => $id, %$conf });
390 2         72 delete $conf->{frame};
391 2         43 my $frame = Net::Stomp::Frame->new(
392             { command => 'NACK', headers => { 'message-id' => $id, %$conf } } );
393 2         15 $self->send_frame($frame);
394 2         12 return 1;
395             }
396              
397             sub send_frame {
398 48     48 1 129 my ( $self, $frame ) = @_;
399             # see if we're connected before we try to syswrite()
400 48 100       119 if (not $self->_connected) {
401 5         42 $self->_reconnect;
402 4 50       18 if (not $self->_connected) {
403 0         0 $self->_logdie(q{wasn't connected; couldn't _reconnect()});
404             }
405             }
406             # keep writing until we finish, or get an error
407 47         148 my $to_write = my $frame_string = $frame->as_string;
408 47         75 my $written;
409 47         113 while (length($to_write)) {
410 58         684 local $SIG{PIPE}='IGNORE'; # just in case writing to a closed
411             # socket kills us
412 58         1087 $written = $self->socket->syswrite($to_write);
413 58 100       570 last unless defined $written;
414 57         519 substr($to_write,0,$written,'');
415             }
416 47 100       109 if (not defined $written) {
417 1         16 $self->logger->warn("error writing frame <<$frame_string>>: $!");
418             }
419 47 100 100     261 unless (defined $written && $self->_connected) {
420 2         10 $self->_reconnect;
421 2         5 $self->send_frame($frame);
422             }
423 47         111 return;
424             }
425              
426             sub _read_data {
427 138     138   267 my ($self, $timeout) = @_;
428              
429 138 100       2831 return unless $self->select->can_read($timeout);
430             my $len = $self->socket->sysread($self->{_framebuf},
431             $self->bufsize,
432 106   100     2990 length($self->{_framebuf} || ''));
433              
434 106 100 100     3733 if (defined $len && $len>0) {
435 103         207 $self->{_framebuf_changed} = 1;
436             }
437             else {
438 3 100       8 if (!defined $len) {
439 2         27 $self->logger->warn("error reading frame: $!");
440             }
441             # EOF or error detected - connection is gone. We have to reset
442             # the framebuf in case we had a partial frame in there that
443             # will never arrive.
444 3         102 $self->_close_socket;
445 3         53 $self->{_framebuf} = "";
446 3         6 delete $self->{_command};
447 3         6 delete $self->{_headers};
448             }
449 106         327 return $len;
450             }
451              
452             sub _read_headers {
453 143     143   303 my ($self) = @_;
454              
455 143 100       342 return 1 if $self->{_headers};
456 133 100       536 if ($self->{_framebuf} =~ s/^\n*([^\n].*?)\n\n//s) {
457 31         52 $self->{_framebuf_changed} = 1;
458 31         88 my $raw_headers = $1;
459 31 50       150 if ($raw_headers =~ s/^(.+)\n//) {
460 31         93 $self->{_command} = $1;
461             }
462 31         124 foreach my $line (split(/\n/, $raw_headers)) {
463 36         221 my ($key, $value) = split(/\s*:\s*/, $line, 2);
464             $self->{_headers}{$key} = $value
465 36 50       186 unless defined $self->{_headers}{$key};
466             }
467 31         97 return 1;
468             }
469 102         289 return 0;
470             }
471              
472             sub _read_body {
473 67     67   150 my ($self) = @_;
474              
475 67         111 my $h = $self->{_headers};
476 67 100       227 if ($h->{'content-length'}) {
    100          
477 37 100       95 if (length($self->{_framebuf}) > $h->{'content-length'}) {
478 5         8 $self->{_framebuf_changed} = 1;
479             my $body = substr($self->{_framebuf},
480             0,
481 5         14 $h->{'content-length'},
482             '' );
483              
484             # Trim the trailer off the frame.
485 5         22 $self->{_framebuf} =~ s/^.*?\000\n*//s;
486             return Net::Stomp::Frame->new({
487             command => delete $self->{_command},
488             headers => delete $self->{_headers},
489 5         40 body => $body
490             });
491             }
492             } elsif ($self->{_framebuf} =~ s/^(.*?)\000\n*//s) {
493             # No content-length header.
494              
495 26         50 my $body = $1;
496 26         95 $self->{_framebuf_changed} = 1;
497             return Net::Stomp::Frame->new({
498             command => delete $self->{_command},
499             headers => delete $self->{_headers},
500 26         170 body => $body });
501             }
502              
503 36         70 return 0;
504             }
505              
506             # this method is to stop the pointless warnings being thrown when trying to
507             # call peername() on a closed socket, i.e.
508             # getpeername() on closed socket GEN125 at
509             # /opt/xt/xt-perl/lib/5.12.3/x86_64-linux/IO/Socket.pm line 258.
510             #
511             # solution taken from:
512             # http://objectmix.com/perl/80545-warning-getpeername.html
513             sub _connected {
514 164     164   231 my $self = shift;
515              
516 164 100 100     1087 return if $self->{_pid} != $$ and $self->reconnect_on_fork;
517              
518 163         235 my $connected;
519             {
520 163         193 local $^W = 0;
  163         591  
521 163         3529 $connected = $self->socket->connected;
522             }
523 163         1613 return $connected;
524             }
525              
526             sub receive_frame {
527 66     66 1 56848 my ($self, $conf) = @_;
528              
529 66         2135 $self->logger->trace('waiting to receive frame',$conf);
530 66 100       3670 my $timeout = exists $conf->{timeout} ? $conf->{timeout} : $self->timeout;
531              
532 66 100       436 unless ($self->_connected) {
533 3         15 $self->_reconnect;
534             }
535              
536 66         118 my $done = 0;
537 66         208 while ( not $done = $self->_read_headers ) {
538 102 100       256 return undef unless $self->_read_data($timeout);
539             }
540 41         140 while ( not $done = $self->_read_body ) {
541 36 100       67 return undef unless $self->_read_data($timeout);
542             }
543              
544 31         108 return $done;
545             }
546              
547             sub _get_next_transaction {
548 13     13   22 my $self = shift;
549 13   100     532 my $serial = $self->serial || 0;
550 13         192 $serial++;
551 13         352 $self->serial($serial);
552              
553 13   100     379 return ($self->session_id||'nosession') . '-' . $serial;
554             }
555              
556             1;
557              
558             __END__