| line | stmt | bran | cond | sub | pod | time | code | 
| 1 |  |  |  |  |  |  | package Net::Stomp; | 
| 2 | 11 |  |  | 11 |  | 404103 | use strict; | 
|  | 11 |  |  |  |  | 103 |  | 
|  | 11 |  |  |  |  | 324 |  | 
| 3 | 11 |  |  | 11 |  | 56 | use warnings; | 
|  | 11 |  |  |  |  | 20 |  | 
|  | 11 |  |  |  |  | 257 |  | 
| 4 | 11 |  |  | 11 |  | 52 | use IO::Select; | 
|  | 11 |  |  |  |  | 20 |  | 
|  | 11 |  |  |  |  | 222 |  | 
| 5 | 11 |  |  | 11 |  | 4922 | use Net::Stomp::Frame; | 
|  | 11 |  |  |  |  | 34 |  | 
|  | 11 |  |  |  |  | 81 |  | 
| 6 | 11 |  |  | 11 |  | 463 | use Carp qw(longmess); | 
|  | 11 |  |  |  |  | 21 |  | 
|  | 11 |  |  |  |  | 635 |  | 
| 7 | 11 |  |  | 11 |  | 65 | use base 'Class::Accessor::Fast'; | 
|  | 11 |  |  |  |  | 23 |  | 
|  | 11 |  |  |  |  | 798 |  | 
| 8 | 11 |  |  | 11 |  | 64 | use Log::Any; | 
|  | 11 |  |  |  |  | 28 |  | 
|  | 11 |  |  |  |  | 86 |  | 
| 9 |  |  |  |  |  |  | our $VERSION = '0.61'; | 
| 10 |  |  |  |  |  |  |  | 
| 11 |  |  |  |  |  |  | __PACKAGE__->mk_accessors( qw( | 
| 12 |  |  |  |  |  |  | current_host failover hostname hosts port select serial session_id socket ssl | 
| 13 |  |  |  |  |  |  | ssl_options socket_options subscriptions _connect_headers bufsize | 
| 14 |  |  |  |  |  |  | reconnect_on_fork logger connect_delay | 
| 15 |  |  |  |  |  |  | reconnect_attempts initial_reconnect_attempts timeout receipt_timeout | 
| 16 |  |  |  |  |  |  | ) ); | 
| 17 |  |  |  |  |  |  |  | 
| 18 |  |  |  |  |  |  | sub _logconfess { | 
| 19 | 2 |  |  | 2 |  | 15 | my ($self,@etc) = @_; | 
| 20 | 2 |  |  |  |  | 655 | my $m = longmess(@etc); | 
| 21 | 2 |  |  |  |  | 52 | $self->logger->fatal($m); | 
| 22 | 2 |  |  |  |  | 213 | die $m; | 
| 23 |  |  |  |  |  |  | } | 
| 24 |  |  |  |  |  |  | sub _logdie { | 
| 25 | 34 |  |  | 34 |  | 830 | my ($self,@etc) = @_; | 
| 26 | 34 |  |  |  |  | 575 | $self->logger->fatal(@etc); | 
| 27 | 34 |  |  |  |  | 2354 | die "@etc"; | 
| 28 |  |  |  |  |  |  | } | 
| 29 |  |  |  |  |  |  |  | 
| 30 |  |  |  |  |  |  | sub new { | 
| 31 | 28 |  |  | 28 | 1 | 160151 | my $class = shift; | 
| 32 | 28 |  |  |  |  | 208 | my $self  = $class->SUPER::new(@_); | 
| 33 |  |  |  |  |  |  |  | 
| 34 | 28 | 50 |  |  |  | 985 | $self->bufsize(8192) unless $self->bufsize; | 
| 35 | 28 | 50 |  |  |  | 1334 | $self->connect_delay(5) unless defined $self->connect_delay; | 
| 36 | 28 | 100 |  |  |  | 631 | $self->reconnect_on_fork(1) unless defined $self->reconnect_on_fork; | 
| 37 | 28 | 100 |  |  |  | 1260 | $self->reconnect_attempts(0) unless defined $self->reconnect_attempts; | 
| 38 | 28 | 100 |  |  |  | 1187 | $self->initial_reconnect_attempts(1) unless defined $self->initial_reconnect_attempts; | 
| 39 | 28 | 50 |  |  |  | 1176 | $self->socket_options({}) unless defined $self->socket_options; | 
| 40 |  |  |  |  |  |  |  | 
| 41 | 28 | 50 |  |  |  | 1218 | $self->logger(Log::Any->get_logger) unless $self->logger; | 
| 42 |  |  |  |  |  |  |  | 
| 43 | 28 |  |  |  |  | 4266 | $self->{_framebuf} = ""; | 
| 44 |  |  |  |  |  |  |  | 
| 45 |  |  |  |  |  |  | # We are not subscribed to anything at the start | 
| 46 | 28 |  |  |  |  | 591 | $self->subscriptions( {} ); | 
| 47 |  |  |  |  |  |  |  | 
| 48 | 28 |  |  |  |  | 301 | $self->select( IO::Select->new ); | 
| 49 | 28 |  |  |  |  | 822 | my @hosts = (); | 
| 50 |  |  |  |  |  |  |  | 
| 51 |  |  |  |  |  |  | # failover://tcp://primary:61616 | 
| 52 |  |  |  |  |  |  | # failover:(tcp://primary:61616,tcp://secondary:61616)?randomize=false | 
| 53 |  |  |  |  |  |  |  | 
| 54 | 28 | 100 |  |  |  | 529 | if ($self->failover) { | 
|  |  | 100 |  |  |  |  |  | 
| 55 | 6 |  |  |  |  | 122 | my ($uris, $opts) = $self->failover =~ m{^failover:(?://)? \(? (.*?) \)? (?: \? (.*?) ) ?$}ix; | 
| 56 |  |  |  |  |  |  |  | 
| 57 | 6 | 100 |  |  |  | 102 | $self->_logconfess("Unable to parse failover uri: " . $self->failover) | 
| 58 |  |  |  |  |  |  | unless $uris; | 
| 59 |  |  |  |  |  |  |  | 
| 60 | 5 |  |  |  |  | 24 | foreach my $host (split(/,/,$uris)) { | 
| 61 | 7 | 100 |  |  |  | 37 | $host =~ m{^\w+://([a-zA-Z0-9\-./]+):([0-9]+)$} || $self->_logconfess("Unable to parse failover component: '$host'"); | 
| 62 | 6 |  |  |  |  | 22 | my ($hostname, $port) = ($1, $2); | 
| 63 |  |  |  |  |  |  |  | 
| 64 | 6 |  |  |  |  | 26 | push(@hosts, {hostname => $hostname, port => $port}); | 
| 65 |  |  |  |  |  |  | } | 
| 66 |  |  |  |  |  |  | } elsif ($self->hosts) { | 
| 67 |  |  |  |  |  |  | ## @hosts is used inside the while loop later to decide whether we have | 
| 68 |  |  |  |  |  |  | ## cycled through all setup hosts. | 
| 69 | 19 |  |  |  |  | 566 | @hosts = @{$self->hosts}; | 
|  | 19 |  |  |  |  | 328 |  | 
| 70 |  |  |  |  |  |  | } | 
| 71 | 26 | 100 |  |  |  | 630 | $self->hosts(\@hosts) if @hosts; | 
| 72 |  |  |  |  |  |  |  | 
| 73 | 26 |  |  |  |  | 290 | $self->_get_connection_retrying(1); | 
| 74 |  |  |  |  |  |  |  | 
| 75 | 22 |  |  |  |  | 125 | return $self; | 
| 76 |  |  |  |  |  |  | } | 
| 77 |  |  |  |  |  |  |  | 
| 78 |  |  |  |  |  |  | sub _get_connection_retrying { | 
| 79 | 36 |  |  | 36 |  | 89 | my ($self,$initial) = @_; | 
| 80 |  |  |  |  |  |  |  | 
| 81 | 36 |  |  |  |  | 65 | my $tries=0; | 
| 82 | 36 |  |  |  |  | 81 | while(not eval { $self->_get_connection; 1 }) { | 
|  | 60 |  |  |  |  | 2811 |  | 
|  | 31 |  |  |  |  | 127 |  | 
| 83 | 29 |  |  |  |  | 86 | my $err = $@;$err =~ s{\n\z}{}sm; | 
|  | 29 |  |  |  |  | 162 |  | 
| 84 | 29 |  |  |  |  | 57 | ++$tries; | 
| 85 | 29 | 100 |  |  |  | 85 | if($self->_should_stop_trying($initial,$tries)) { | 
| 86 |  |  |  |  |  |  | # We've cycled enough. Die now. | 
| 87 | 5 |  |  |  |  | 24 | $self->_logdie("Failed to connect: $err; giving up"); | 
| 88 |  |  |  |  |  |  | } | 
| 89 | 24 |  |  |  |  | 404 | $self->logger->warn("Failed to connect: $err; retrying"); | 
| 90 | 24 |  |  |  |  | 1542 | sleep($self->connect_delay); | 
| 91 |  |  |  |  |  |  | } | 
| 92 |  |  |  |  |  |  | } | 
| 93 |  |  |  |  |  |  |  | 
| 94 |  |  |  |  |  |  | sub _should_stop_trying { | 
| 95 | 29 |  |  | 29 |  | 67 | my ($self,$initial,$tries) = @_; | 
| 96 |  |  |  |  |  |  |  | 
| 97 | 29 | 100 |  |  |  | 646 | my $max_tries = $initial | 
| 98 |  |  |  |  |  |  | ? $self->initial_reconnect_attempts | 
| 99 |  |  |  |  |  |  | : $self->reconnect_attempts; | 
| 100 |  |  |  |  |  |  |  | 
| 101 | 29 | 100 |  |  |  | 201 | return unless $max_tries > 0; # 0 means forever | 
| 102 |  |  |  |  |  |  |  | 
| 103 | 17 | 100 |  |  |  | 307 | if (defined $self->hosts) { | 
| 104 | 14 |  |  |  |  | 69 | $max_tries *= @{$self->hosts}; # try at least once per host | 
|  | 14 |  |  |  |  | 221 |  | 
| 105 |  |  |  |  |  |  | } | 
| 106 | 17 |  |  |  |  | 103 | return $tries >= $max_tries; | 
| 107 |  |  |  |  |  |  | } | 
| 108 |  |  |  |  |  |  |  | 
| 109 |  |  |  |  |  |  | my $socket_class; | 
| 110 |  |  |  |  |  |  | sub _get_connection { | 
| 111 | 52 |  |  | 52 |  | 101 | my $self = shift; | 
| 112 | 52 | 100 |  |  |  | 1192 | if (my $hosts = $self->hosts) { | 
| 113 | 48 | 100 | 100 |  |  | 1117 | if (defined $self->current_host && ($self->current_host < $#{$hosts} ) ) { | 
|  | 33 |  |  |  |  | 932 |  | 
| 114 | 13 |  |  |  |  | 221 | $self->current_host($self->current_host+1); | 
| 115 |  |  |  |  |  |  | } else { | 
| 116 | 35 |  |  |  |  | 699 | $self->current_host(0); | 
| 117 |  |  |  |  |  |  | } | 
| 118 | 48 |  |  |  |  | 1081 | my $h = $hosts->[$self->current_host]; | 
| 119 | 48 |  |  |  |  | 1041 | $self->hostname($h->{hostname}); | 
| 120 | 48 |  |  |  |  | 1094 | $self->port($h->{port}); | 
| 121 | 48 |  |  |  |  | 1149 | $self->ssl($h->{ssl}); | 
| 122 | 48 |  | 100 |  |  | 1390 | $self->ssl_options($h->{ssl_options} || {}); | 
| 123 |  |  |  |  |  |  | } | 
| 124 | 52 |  |  |  |  | 505 | my $socket = $self->_get_socket; | 
| 125 | 52 | 100 |  |  |  | 1055 | $self->_logdie("Error connecting to " . $self->hostname . ':' . $self->port . ": $!") | 
| 126 |  |  |  |  |  |  | unless $socket; | 
| 127 |  |  |  |  |  |  |  | 
| 128 | 23 |  |  |  |  | 416 | $self->select->remove($self->socket); | 
| 129 |  |  |  |  |  |  |  | 
| 130 | 23 |  |  |  |  | 1048 | $self->select->add($socket); | 
| 131 | 23 |  |  |  |  | 528 | $self->socket($socket); | 
| 132 | 23 |  |  |  |  | 195 | $self->{_pid} = $$; | 
| 133 |  |  |  |  |  |  | } | 
| 134 |  |  |  |  |  |  |  | 
| 135 |  |  |  |  |  |  | sub _get_socket { | 
| 136 | 0 |  |  | 0 |  | 0 | my ($self) = @_; | 
| 137 | 0 |  |  |  |  | 0 | my $socket; | 
| 138 |  |  |  |  |  |  |  | 
| 139 | 0 |  |  |  |  | 0 | my $timeout = $self->timeout; | 
| 140 | 0 | 0 |  |  |  | 0 | $timeout = 5 unless defined $timeout; | 
| 141 |  |  |  |  |  |  |  | 
| 142 |  |  |  |  |  |  | my %sockopts = ( | 
| 143 |  |  |  |  |  |  | Timeout  => $timeout, | 
| 144 | 0 |  |  |  |  | 0 | %{ $self->socket_options }, | 
|  | 0 |  |  |  |  | 0 |  | 
| 145 |  |  |  |  |  |  | PeerAddr => $self->hostname, | 
| 146 |  |  |  |  |  |  | PeerPort => $self->port, | 
| 147 |  |  |  |  |  |  | Proto    => 'tcp', | 
| 148 |  |  |  |  |  |  | ); | 
| 149 | 0 |  |  |  |  | 0 | my $keep_alive = delete $sockopts{keep_alive}; | 
| 150 |  |  |  |  |  |  |  | 
| 151 | 0 | 0 |  |  |  | 0 | if ( $self->ssl ) { | 
| 152 | 0 |  |  |  |  | 0 | eval { require IO::Socket::SSL }; | 
|  | 0 |  |  |  |  | 0 |  | 
| 153 | 0 | 0 |  |  |  | 0 | $self->_logdie( | 
| 154 |  |  |  |  |  |  | "You should install the IO::Socket::SSL module for SSL support in Net::Stomp" | 
| 155 |  |  |  |  |  |  | ) if $@; | 
| 156 | 0 | 0 |  |  |  | 0 | %sockopts = ( %sockopts, %{ $self->ssl_options || {} } ); | 
|  | 0 |  |  |  |  | 0 |  | 
| 157 | 0 |  |  |  |  | 0 | $self->logger->trace('opening IO::Socket::SSL',\%sockopts); | 
| 158 | 0 |  |  |  |  | 0 | $socket = IO::Socket::SSL->new(%sockopts); | 
| 159 |  |  |  |  |  |  | } else { | 
| 160 |  |  |  |  |  |  | $socket_class ||= eval { require IO::Socket::IP; IO::Socket::IP->VERSION('0.20'); "IO::Socket::IP" } | 
| 161 | 0 |  | 0 |  |  | 0 | || do { require IO::Socket::INET; "IO::Socket::INET" }; | 
|  |  |  | 0 |  |  |  |  | 
| 162 | 0 |  |  |  |  | 0 | $self->logger->trace("opening $socket_class",\%sockopts); | 
| 163 | 0 |  |  |  |  | 0 | $socket = $socket_class->new(%sockopts); | 
| 164 | 0 | 0 |  |  |  | 0 | binmode($socket) if $socket; | 
| 165 |  |  |  |  |  |  | } | 
| 166 | 0 | 0 |  |  |  | 0 | if ($keep_alive) { | 
| 167 | 0 |  |  |  |  | 0 | require Socket; | 
| 168 | 0 | 0 |  |  |  | 0 | if (Socket->can('SO_KEEPALIVE')) { | 
| 169 | 0 |  |  |  |  | 0 | $socket->setsockopt(Socket::SOL_SOCKET(),Socket::SO_KEEPALIVE(),1); | 
| 170 |  |  |  |  |  |  | } | 
| 171 |  |  |  |  |  |  | else { | 
| 172 | 0 |  |  |  |  | 0 | $self->logger->warn(q{TCP keep-alive was requested, but the Socket module does not export the SO_KEEPALIVE constant, so we couldn't enable it}); | 
| 173 |  |  |  |  |  |  | } | 
| 174 |  |  |  |  |  |  | } | 
| 175 |  |  |  |  |  |  |  | 
| 176 | 0 |  |  |  |  | 0 | return $socket; | 
| 177 |  |  |  |  |  |  | } | 
| 178 |  |  |  |  |  |  |  | 
| 179 |  |  |  |  |  |  | sub connect { | 
| 180 | 11 |  |  | 11 | 1 | 1317 | my ( $self, $conf ) = @_; | 
| 181 |  |  |  |  |  |  |  | 
| 182 | 11 |  |  |  |  | 235 | $self->logger->trace('connecting'); | 
| 183 | 11 |  |  |  |  | 672 | my $frame = Net::Stomp::Frame->new( | 
| 184 |  |  |  |  |  |  | { command => 'CONNECT', headers => $conf } ); | 
| 185 | 11 |  |  |  |  | 63 | $self->send_frame($frame); | 
| 186 | 11 |  |  |  |  | 48 | $frame = $self->receive_frame; | 
| 187 |  |  |  |  |  |  |  | 
| 188 | 11 | 50 | 33 |  |  | 233 | if ($frame && $frame->command eq 'CONNECTED') { | 
| 189 | 11 |  |  |  |  | 251 | $self->logger->trace('connected'); | 
| 190 |  |  |  |  |  |  | # Setting initial values for session id, as given from | 
| 191 |  |  |  |  |  |  | # the stomp server | 
| 192 | 11 |  |  |  |  | 704 | $self->session_id( $frame->headers->{session} ); | 
| 193 | 11 |  |  |  |  | 499 | $self->_connect_headers( $conf ); | 
| 194 |  |  |  |  |  |  | } | 
| 195 |  |  |  |  |  |  | else { | 
| 196 | 0 |  | 0 |  |  | 0 | $self->logger->warn('failed to connect',{ %{$frame // {}} }); | 
|  | 0 |  |  |  |  | 0 |  | 
| 197 |  |  |  |  |  |  | } | 
| 198 |  |  |  |  |  |  |  | 
| 199 | 11 |  |  |  |  | 119 | return $frame; | 
| 200 |  |  |  |  |  |  | } | 
| 201 |  |  |  |  |  |  |  | 
| 202 |  |  |  |  |  |  | sub _close_socket { | 
| 203 | 14 |  |  | 14 |  | 34 | my ($self) = @_; | 
| 204 | 14 | 50 |  |  |  | 255 | return unless $self->socket; | 
| 205 | 14 |  |  |  |  | 312 | $self->logger->trace('closing socket'); | 
| 206 | 14 |  |  |  |  | 848 | $self->socket->close; | 
| 207 | 14 |  |  |  |  | 346 | $self->select->remove($self->socket); | 
| 208 |  |  |  |  |  |  | } | 
| 209 |  |  |  |  |  |  |  | 
| 210 |  |  |  |  |  |  | sub disconnect { | 
| 211 | 1 |  |  | 1 | 1 | 18936 | my $self = shift; | 
| 212 | 1 |  |  |  |  | 33 | $self->logger->trace('disconnecting'); | 
| 213 | 1 |  |  |  |  | 31 | my $frame = Net::Stomp::Frame->new( { command => 'DISCONNECT' } ); | 
| 214 | 1 |  |  |  |  | 5 | $self->send_frame($frame); | 
| 215 | 1 |  |  |  |  | 5 | $self->_close_socket; | 
| 216 | 1 |  |  |  |  | 25 | return 1; | 
| 217 |  |  |  |  |  |  | } | 
| 218 |  |  |  |  |  |  |  | 
| 219 |  |  |  |  |  |  | sub _reconnect { | 
| 220 | 10 |  |  | 10 |  | 28 | my $self = shift; | 
| 221 | 10 |  |  |  |  | 42 | $self->_close_socket; | 
| 222 |  |  |  |  |  |  |  | 
| 223 | 10 |  |  |  |  | 404 | $self->logger->warn("reconnecting"); | 
| 224 | 10 |  |  |  |  | 495 | $self->_get_connection_retrying(0); | 
| 225 |  |  |  |  |  |  | # Both ->connect and ->subscribe can call _reconnect. It *should* | 
| 226 |  |  |  |  |  |  | # work out fine in the end, worst scenario we send a few subscribe | 
| 227 |  |  |  |  |  |  | # frame more than once | 
| 228 | 9 |  |  |  |  | 173 | $self->connect( $self->_connect_headers ); | 
| 229 | 9 |  |  |  |  | 21 | for my $sub(keys %{$self->subscriptions}) { | 
|  | 9 |  |  |  |  | 163 |  | 
| 230 | 8 |  |  |  |  | 210 | $self->subscribe($self->subscriptions->{$sub}); | 
| 231 |  |  |  |  |  |  | } | 
| 232 |  |  |  |  |  |  | } | 
| 233 |  |  |  |  |  |  |  | 
| 234 |  |  |  |  |  |  | sub can_read { | 
| 235 | 0 |  |  | 0 | 1 | 0 | my ( $self, $conf ) = @_; | 
| 236 |  |  |  |  |  |  |  | 
| 237 |  |  |  |  |  |  | # If there is any data left in the framebuffer that we haven't read, return | 
| 238 |  |  |  |  |  |  | # 'true'. But we don't want to spin endlessly, so only return true the | 
| 239 |  |  |  |  |  |  | # first time. (Anything touching the _framebuf should update this flag when | 
| 240 |  |  |  |  |  |  | # it does something. | 
| 241 | 0 | 0 | 0 |  |  | 0 | if ( $self->{_framebuf_changed} && length $self->{_framebuf} ) { | 
| 242 | 0 |  |  |  |  | 0 | $self->{_framebuf_changed} = 0; | 
| 243 | 0 |  |  |  |  | 0 | return 1; | 
| 244 |  |  |  |  |  |  | } | 
| 245 |  |  |  |  |  |  |  | 
| 246 | 0 |  | 0 |  |  | 0 | $conf ||= {}; | 
| 247 | 0 | 0 |  |  |  | 0 | my $timeout = exists $conf->{timeout} ? $conf->{timeout} : $self->timeout; | 
| 248 | 0 |  | 0 |  |  | 0 | return $self->select->can_read($timeout) || 0; | 
| 249 |  |  |  |  |  |  | } | 
| 250 |  |  |  |  |  |  |  | 
| 251 |  |  |  |  |  |  | sub send { | 
| 252 | 19 |  |  | 19 | 1 | 63102 | my ( $self, $conf ) = @_; | 
| 253 | 19 |  |  |  |  | 79 | $conf = { %$conf }; | 
| 254 | 19 |  |  |  |  | 551 | $self->logger->trace('sending',$conf); | 
| 255 | 19 |  |  |  |  | 3291 | my $body = $conf->{body}; | 
| 256 | 19 |  |  |  |  | 53 | delete $conf->{body}; | 
| 257 | 19 |  |  |  |  | 169 | my $frame = Net::Stomp::Frame->new( | 
| 258 |  |  |  |  |  |  | { command => 'SEND', headers => $conf, body => $body } ); | 
| 259 | 19 |  |  |  |  | 111 | $self->send_frame($frame); | 
| 260 | 18 |  |  |  |  | 120 | return 1; | 
| 261 |  |  |  |  |  |  | } | 
| 262 |  |  |  |  |  |  |  | 
| 263 |  |  |  |  |  |  | sub send_with_receipt { | 
| 264 | 9 |  |  | 9 | 1 | 35272 | my ( $self, $conf ) = @_; | 
| 265 | 9 |  |  |  |  | 50 | $conf = { %$conf }; | 
| 266 |  |  |  |  |  |  |  | 
| 267 |  |  |  |  |  |  | # send the message | 
| 268 | 9 |  |  |  |  | 39 | my $receipt_id = $self->_get_next_transaction; | 
| 269 | 9 |  |  |  |  | 239 | $self->logger->debug('sending with receipt',{ receipt => $receipt_id }); | 
| 270 | 9 |  |  |  |  | 503 | $conf->{receipt} = $receipt_id; | 
| 271 | 9 | 100 |  |  |  | 123 | my $receipt_timeout = exists $conf->{timeout} ? delete $conf->{timeout} : $self->receipt_timeout; | 
| 272 | 9 |  |  |  |  | 58 | $self->send($conf); | 
| 273 |  |  |  |  |  |  |  | 
| 274 | 9 |  |  |  |  | 188 | $self->logger->trace('waiting for receipt',$conf); | 
| 275 |  |  |  |  |  |  | # check the receipt | 
| 276 | 9 | 100 |  |  |  | 393 | my $receipt_frame = $self->receive_frame({ | 
| 277 |  |  |  |  |  |  | ( defined $receipt_timeout ? | 
| 278 |  |  |  |  |  |  | ( timeout => $receipt_timeout ) | 
| 279 |  |  |  |  |  |  | : () ), | 
| 280 |  |  |  |  |  |  | }); | 
| 281 |  |  |  |  |  |  |  | 
| 282 | 9 | 100 |  |  |  | 62 | if (@_ > 2) { | 
| 283 | 4 |  |  |  |  | 16 | $_[2] = $receipt_frame; | 
| 284 |  |  |  |  |  |  | } | 
| 285 |  |  |  |  |  |  |  | 
| 286 | 9 | 100 | 100 |  |  | 147 | if (   $receipt_frame | 
|  |  |  | 100 |  |  |  |  | 
| 287 |  |  |  |  |  |  | && $receipt_frame->command eq 'RECEIPT' | 
| 288 |  |  |  |  |  |  | && $receipt_frame->headers->{'receipt-id'} eq $receipt_id ) | 
| 289 |  |  |  |  |  |  | { | 
| 290 | 2 |  |  |  |  | 108 | $self->logger->debug('got good receipt',{ %{$receipt_frame} }); | 
|  | 2 |  |  |  |  | 21 |  | 
| 291 | 2 |  |  |  |  | 28 | return 1; | 
| 292 |  |  |  |  |  |  | } else { | 
| 293 | 7 | 100 |  |  |  | 190 | $self->logger->debug('got bad receipt',{ %{$receipt_frame || {} } }); | 
|  | 7 |  |  |  |  | 77 |  | 
| 294 | 7 |  |  |  |  | 120 | return 0; | 
| 295 |  |  |  |  |  |  | } | 
| 296 |  |  |  |  |  |  | } | 
| 297 |  |  |  |  |  |  |  | 
| 298 |  |  |  |  |  |  | sub send_transactional { | 
| 299 | 4 |  |  | 4 | 1 | 46107 | my ( $self, $conf ) = @_; | 
| 300 |  |  |  |  |  |  |  | 
| 301 | 4 |  |  |  |  | 26 | $conf = { %$conf }; | 
| 302 |  |  |  |  |  |  | # begin the transaction | 
| 303 | 4 |  |  |  |  | 25 | my $transaction_id = $self->_get_next_transaction; | 
| 304 | 4 |  |  |  |  | 123 | $self->logger->debug('starting transaction',{ transaction => $transaction_id }); | 
| 305 | 4 |  |  |  |  | 185 | my $begin_frame | 
| 306 |  |  |  |  |  |  | = Net::Stomp::Frame->new( | 
| 307 |  |  |  |  |  |  | { command => 'BEGIN', headers => { transaction => $transaction_id } } | 
| 308 |  |  |  |  |  |  | ); | 
| 309 | 4 |  |  |  |  | 24 | $self->send_frame($begin_frame); | 
| 310 |  |  |  |  |  |  |  | 
| 311 | 4 |  |  |  |  | 12 | $conf->{transaction} = $transaction_id; | 
| 312 | 4 |  |  |  |  | 5 | my $receipt_frame; | 
| 313 | 4 |  |  |  |  | 16 | my $ret = $self->send_with_receipt($conf,$receipt_frame); | 
| 314 |  |  |  |  |  |  |  | 
| 315 | 4 | 50 |  |  |  | 14 | if (@_ > 2) { | 
| 316 | 0 |  |  |  |  | 0 | $_[2] = $receipt_frame; | 
| 317 |  |  |  |  |  |  | } | 
| 318 |  |  |  |  |  |  |  | 
| 319 | 4 | 100 |  |  |  | 9 | if ( $ret ) { | 
| 320 |  |  |  |  |  |  | # success, commit the transaction | 
| 321 | 1 |  |  |  |  | 20 | $self->logger->debug('committing transaction',{ transaction => $transaction_id }); | 
| 322 | 1 |  |  |  |  | 17 | my $frame_commit = Net::Stomp::Frame->new( | 
| 323 |  |  |  |  |  |  | {   command => 'COMMIT', | 
| 324 |  |  |  |  |  |  | headers => { transaction => $transaction_id } | 
| 325 |  |  |  |  |  |  | } | 
| 326 |  |  |  |  |  |  | ); | 
| 327 | 1 |  |  |  |  | 5 | $self->send_frame($frame_commit); | 
| 328 |  |  |  |  |  |  | } else { | 
| 329 | 3 |  |  |  |  | 57 | $self->logger->debug('rolling back transaction',{ transaction => $transaction_id }); | 
| 330 |  |  |  |  |  |  | # some failure, abort transaction | 
| 331 | 3 |  |  |  |  | 46 | my $frame_abort = Net::Stomp::Frame->new( | 
| 332 |  |  |  |  |  |  | {   command => 'ABORT', | 
| 333 |  |  |  |  |  |  | headers => { transaction => $transaction_id } | 
| 334 |  |  |  |  |  |  | } | 
| 335 |  |  |  |  |  |  | ); | 
| 336 | 3 |  |  |  |  | 10 | $self->send_frame($frame_abort); | 
| 337 |  |  |  |  |  |  | } | 
| 338 | 4 |  |  |  |  | 26 | return $ret; | 
| 339 |  |  |  |  |  |  | } | 
| 340 |  |  |  |  |  |  |  | 
| 341 |  |  |  |  |  |  | sub _sub_key { | 
| 342 | 13 |  |  | 13 |  | 35 | my ($conf) = @_; | 
| 343 |  |  |  |  |  |  |  | 
| 344 | 13 | 100 |  |  |  | 43 | if ($conf->{id}) { return "id-".$conf->{id} } | 
|  | 2 |  |  |  |  | 9 |  | 
| 345 |  |  |  |  |  |  | return "dest-".$conf->{destination} | 
| 346 | 11 |  |  |  |  | 40 | } | 
| 347 |  |  |  |  |  |  |  | 
| 348 |  |  |  |  |  |  | sub subscribe { | 
| 349 | 11 |  |  | 11 | 1 | 11953 | my ( $self, $conf ) = @_; | 
| 350 | 11 |  |  |  |  | 264 | $self->logger->trace('subscribing',$conf); | 
| 351 | 11 |  |  |  |  | 2801 | my $frame = Net::Stomp::Frame->new( | 
| 352 |  |  |  |  |  |  | { command => 'SUBSCRIBE', headers => $conf } ); | 
| 353 | 11 |  |  |  |  | 51 | $self->send_frame($frame); | 
| 354 | 11 |  |  |  |  | 201 | my $subs = $self->subscriptions; | 
| 355 | 11 |  |  |  |  | 81 | $subs->{_sub_key($conf)} = $conf; | 
| 356 | 11 |  |  |  |  | 43 | return 1; | 
| 357 |  |  |  |  |  |  | } | 
| 358 |  |  |  |  |  |  |  | 
| 359 |  |  |  |  |  |  | sub unsubscribe { | 
| 360 | 2 |  |  | 2 | 1 | 11278 | my ( $self, $conf ) = @_; | 
| 361 | 2 |  |  |  |  | 55 | $self->logger->trace('unsubscribing',$conf); | 
| 362 | 2 |  |  |  |  | 36 | my $frame = Net::Stomp::Frame->new( | 
| 363 |  |  |  |  |  |  | { command => 'UNSUBSCRIBE', headers => $conf } ); | 
| 364 | 2 |  |  |  |  | 10 | $self->send_frame($frame); | 
| 365 | 2 |  |  |  |  | 40 | my $subs = $self->subscriptions; | 
| 366 | 2 |  |  |  |  | 11 | delete $subs->{_sub_key($conf)}; | 
| 367 | 2 |  |  |  |  | 6 | return 1; | 
| 368 |  |  |  |  |  |  | } | 
| 369 |  |  |  |  |  |  |  | 
| 370 |  |  |  |  |  |  | sub ack { | 
| 371 | 2 |  |  | 2 | 1 | 21800 | my ( $self, $conf ) = @_; | 
| 372 | 2 |  |  |  |  | 8 | $conf = { %$conf }; | 
| 373 | 2 |  |  |  |  | 54 | my $id    = $conf->{frame}->headers->{'message-id'}; | 
| 374 | 2 |  |  |  |  | 11 | delete $conf->{frame}; | 
| 375 | 2 |  |  |  |  | 37 | $self->logger->trace('acking',{ 'message-id' => $id, %$conf }); | 
| 376 | 2 |  |  |  |  | 43 | my $frame = Net::Stomp::Frame->new( | 
| 377 |  |  |  |  |  |  | { command => 'ACK', headers => { 'message-id' => $id, %$conf } } ); | 
| 378 | 2 |  |  |  |  | 8 | $self->send_frame($frame); | 
| 379 | 2 |  |  |  |  | 10 | return 1; | 
| 380 |  |  |  |  |  |  | } | 
| 381 |  |  |  |  |  |  |  | 
| 382 |  |  |  |  |  |  | sub nack { | 
| 383 | 2 |  |  | 2 | 1 | 6499 | my ( $self, $conf ) = @_; | 
| 384 | 2 |  |  |  |  | 8 | $conf = { %$conf }; | 
| 385 | 2 |  |  |  |  | 54 | my $id    = $conf->{frame}->headers->{'message-id'}; | 
| 386 | 2 |  |  |  |  | 42 | $self->logger->trace('nacking',{ 'message-id' => $id, %$conf }); | 
| 387 | 2 |  |  |  |  | 35 | delete $conf->{frame}; | 
| 388 | 2 |  |  |  |  | 12 | my $frame = Net::Stomp::Frame->new( | 
| 389 |  |  |  |  |  |  | { command => 'NACK', headers => { 'message-id' => $id, %$conf } } ); | 
| 390 | 2 |  |  |  |  | 10 | $self->send_frame($frame); | 
| 391 | 2 |  |  |  |  | 9 | return 1; | 
| 392 |  |  |  |  |  |  | } | 
| 393 |  |  |  |  |  |  |  | 
| 394 |  |  |  |  |  |  | sub send_frame { | 
| 395 | 48 |  |  | 48 | 1 | 124 | my ( $self, $frame ) = @_; | 
| 396 |  |  |  |  |  |  | # see if we're connected before we try to syswrite() | 
| 397 | 48 | 100 |  |  |  | 154 | if (not $self->_connected) { | 
| 398 | 5 |  |  |  |  | 34 | $self->_reconnect; | 
| 399 | 4 | 50 |  |  |  | 28 | if (not $self->_connected) { | 
| 400 | 0 |  |  |  |  | 0 | $self->_logdie(q{wasn't connected; couldn't _reconnect()}); | 
| 401 |  |  |  |  |  |  | } | 
| 402 |  |  |  |  |  |  | } | 
| 403 |  |  |  |  |  |  | # keep writing until we finish, or get an error | 
| 404 | 47 |  |  |  |  | 141 | my $to_write = my $frame_string = $frame->as_string; | 
| 405 | 47 |  |  |  |  | 73 | my $written; | 
| 406 | 47 |  |  |  |  | 117 | while (length($to_write)) { | 
| 407 | 58 |  |  |  |  | 1124 | local $SIG{PIPE}='IGNORE'; # just in case writing to a closed | 
| 408 |  |  |  |  |  |  | # socket kills us | 
| 409 | 58 |  |  |  |  | 1483 | $written = $self->socket->syswrite($to_write); | 
| 410 | 58 | 100 |  |  |  | 699 | last unless defined $written; | 
| 411 | 57 |  |  |  |  | 930 | substr($to_write,0,$written,''); | 
| 412 |  |  |  |  |  |  | } | 
| 413 | 47 | 100 |  |  |  | 162 | if (not defined $written) { | 
| 414 | 1 |  |  |  |  | 25 | $self->logger->warn("error writing frame <<$frame_string>>: $!"); | 
| 415 |  |  |  |  |  |  | } | 
| 416 | 47 | 100 | 100 |  |  | 386 | unless (defined $written && $self->_connected) { | 
| 417 | 2 |  |  |  |  | 12 | $self->_reconnect; | 
| 418 | 2 |  |  |  |  | 8 | $self->send_frame($frame); | 
| 419 |  |  |  |  |  |  | } | 
| 420 | 47 |  |  |  |  | 115 | return; | 
| 421 |  |  |  |  |  |  | } | 
| 422 |  |  |  |  |  |  |  | 
| 423 |  |  |  |  |  |  | sub _read_data { | 
| 424 | 138 |  |  | 138 |  | 255 | my ($self, $timeout) = @_; | 
| 425 |  |  |  |  |  |  |  | 
| 426 | 138 | 100 |  |  |  | 2588 | return unless $self->select->can_read($timeout); | 
| 427 |  |  |  |  |  |  | my $len = $self->socket->sysread($self->{_framebuf}, | 
| 428 |  |  |  |  |  |  | $self->bufsize, | 
| 429 | 106 |  | 100 |  |  | 2902 | length($self->{_framebuf} || '')); | 
| 430 |  |  |  |  |  |  |  | 
| 431 | 106 | 100 | 100 |  |  | 3503 | if (defined $len && $len>0) { | 
| 432 | 103 |  |  |  |  | 202 | $self->{_framebuf_changed} = 1; | 
| 433 |  |  |  |  |  |  | } | 
| 434 |  |  |  |  |  |  | else { | 
| 435 | 3 | 100 |  |  |  | 10 | if (!defined $len) { | 
| 436 | 2 |  |  |  |  | 38 | $self->logger->warn("error reading frame: $!"); | 
| 437 |  |  |  |  |  |  | } | 
| 438 |  |  |  |  |  |  | # EOF or error detected - connection is gone. We have to reset | 
| 439 |  |  |  |  |  |  | # the framebuf in case we had a partial frame in there that | 
| 440 |  |  |  |  |  |  | # will never arrive. | 
| 441 | 3 |  |  |  |  | 197 | $self->_close_socket; | 
| 442 | 3 |  |  |  |  | 78 | $self->{_framebuf} = ""; | 
| 443 | 3 |  |  |  |  | 6 | delete $self->{_command}; | 
| 444 | 3 |  |  |  |  | 9 | delete $self->{_headers}; | 
| 445 |  |  |  |  |  |  | } | 
| 446 | 106 |  |  |  |  | 333 | return $len; | 
| 447 |  |  |  |  |  |  | } | 
| 448 |  |  |  |  |  |  |  | 
| 449 |  |  |  |  |  |  | sub _read_headers { | 
| 450 | 143 |  |  | 143 |  | 238 | my ($self) = @_; | 
| 451 |  |  |  |  |  |  |  | 
| 452 | 143 | 100 |  |  |  | 319 | return 1 if $self->{_headers}; | 
| 453 | 133 | 100 |  |  |  | 491 | if ($self->{_framebuf} =~ s/^\n*([^\n].*?)\n\n//s) { | 
| 454 | 31 |  |  |  |  | 84 | $self->{_framebuf_changed} = 1; | 
| 455 | 31 |  |  |  |  | 77 | my $raw_headers = $1; | 
| 456 | 31 | 50 |  |  |  | 150 | if ($raw_headers =~ s/^(.+)\n//) { | 
| 457 | 31 |  |  |  |  | 102 | $self->{_command} = $1; | 
| 458 |  |  |  |  |  |  | } | 
| 459 | 31 |  |  |  |  | 130 | foreach my $line (split(/\n/, $raw_headers)) { | 
| 460 | 36 |  |  |  |  | 203 | my ($key, $value) = split(/\s*:\s*/, $line, 2); | 
| 461 |  |  |  |  |  |  | $self->{_headers}{$key} = $value | 
| 462 | 36 | 50 |  |  |  | 185 | unless defined $self->{_headers}{$key}; | 
| 463 |  |  |  |  |  |  | } | 
| 464 | 31 |  |  |  |  | 118 | return 1; | 
| 465 |  |  |  |  |  |  | } | 
| 466 | 102 |  |  |  |  | 289 | return 0; | 
| 467 |  |  |  |  |  |  | } | 
| 468 |  |  |  |  |  |  |  | 
| 469 |  |  |  |  |  |  | sub _read_body { | 
| 470 | 67 |  |  | 67 |  | 125 | my ($self) = @_; | 
| 471 |  |  |  |  |  |  |  | 
| 472 | 67 |  |  |  |  | 116 | my $h = $self->{_headers}; | 
| 473 | 67 | 100 |  |  |  | 280 | if ($h->{'content-length'}) { | 
|  |  | 100 |  |  |  |  |  | 
| 474 | 37 | 100 |  |  |  | 89 | if (length($self->{_framebuf}) > $h->{'content-length'}) { | 
| 475 | 5 |  |  |  |  | 8 | $self->{_framebuf_changed} = 1; | 
| 476 |  |  |  |  |  |  | my $body = substr($self->{_framebuf}, | 
| 477 |  |  |  |  |  |  | 0, | 
| 478 | 5 |  |  |  |  | 15 | $h->{'content-length'}, | 
| 479 |  |  |  |  |  |  | '' ); | 
| 480 |  |  |  |  |  |  |  | 
| 481 |  |  |  |  |  |  | # Trim the trailer off the frame. | 
| 482 | 5 |  |  |  |  | 24 | $self->{_framebuf} =~ s/^.*?\000\n*//s; | 
| 483 |  |  |  |  |  |  | return Net::Stomp::Frame->new({ | 
| 484 |  |  |  |  |  |  | command => delete $self->{_command}, | 
| 485 |  |  |  |  |  |  | headers => delete $self->{_headers}, | 
| 486 | 5 |  |  |  |  | 40 | body => $body | 
| 487 |  |  |  |  |  |  | }); | 
| 488 |  |  |  |  |  |  | } | 
| 489 |  |  |  |  |  |  | } elsif ($self->{_framebuf} =~ s/^(.*?)\000\n*//s) { | 
| 490 |  |  |  |  |  |  | # No content-length header. | 
| 491 |  |  |  |  |  |  |  | 
| 492 | 26 |  |  |  |  | 113 | my $body = $1; | 
| 493 | 26 |  |  |  |  | 51 | $self->{_framebuf_changed} = 1; | 
| 494 |  |  |  |  |  |  | return Net::Stomp::Frame->new({ | 
| 495 |  |  |  |  |  |  | command => delete $self->{_command}, | 
| 496 |  |  |  |  |  |  | headers => delete $self->{_headers}, | 
| 497 | 26 |  |  |  |  | 175 | body => $body }); | 
| 498 |  |  |  |  |  |  | } | 
| 499 |  |  |  |  |  |  |  | 
| 500 | 36 |  |  |  |  | 75 | return 0; | 
| 501 |  |  |  |  |  |  | } | 
| 502 |  |  |  |  |  |  |  | 
| 503 |  |  |  |  |  |  | # this method is to stop the pointless warnings being thrown when trying to | 
| 504 |  |  |  |  |  |  | # call peername() on a closed socket, i.e. | 
| 505 |  |  |  |  |  |  | #   getpeername() on closed socket GEN125 at | 
| 506 |  |  |  |  |  |  | #   /opt/xt/xt-perl/lib/5.12.3/x86_64-linux/IO/Socket.pm line 258. | 
| 507 |  |  |  |  |  |  | # | 
| 508 |  |  |  |  |  |  | # solution taken from: | 
| 509 |  |  |  |  |  |  | # http://objectmix.com/perl/80545-warning-getpeername.html | 
| 510 |  |  |  |  |  |  | sub _connected { | 
| 511 | 164 |  |  | 164 |  | 275 | my $self = shift; | 
| 512 |  |  |  |  |  |  |  | 
| 513 | 164 | 100 | 100 |  |  | 651 | return if $self->{_pid} != $$ and $self->reconnect_on_fork; | 
| 514 |  |  |  |  |  |  |  | 
| 515 | 163 |  |  |  |  | 261 | my $connected; | 
| 516 |  |  |  |  |  |  | { | 
| 517 | 163 |  |  |  |  | 214 | local $^W = 0; | 
|  | 163 |  |  |  |  | 578 |  | 
| 518 | 163 |  |  |  |  | 3149 | $connected = $self->socket->connected; | 
| 519 |  |  |  |  |  |  | } | 
| 520 | 163 |  |  |  |  | 1583 | return $connected; | 
| 521 |  |  |  |  |  |  | } | 
| 522 |  |  |  |  |  |  |  | 
| 523 |  |  |  |  |  |  | sub receive_frame { | 
| 524 | 66 |  |  | 66 | 1 | 47912 | my ($self, $conf) = @_; | 
| 525 |  |  |  |  |  |  |  | 
| 526 | 66 |  |  |  |  | 1531 | $self->logger->trace('waiting to receive frame',$conf); | 
| 527 | 66 | 100 |  |  |  | 2952 | my $timeout = exists $conf->{timeout} ? $conf->{timeout} :  $self->timeout; | 
| 528 |  |  |  |  |  |  |  | 
| 529 | 66 | 100 |  |  |  | 430 | unless ($self->_connected) { | 
| 530 | 3 |  |  |  |  | 14 | $self->_reconnect; | 
| 531 |  |  |  |  |  |  | } | 
| 532 |  |  |  |  |  |  |  | 
| 533 | 66 |  |  |  |  | 119 | my $done = 0; | 
| 534 | 66 |  |  |  |  | 185 | while ( not $done = $self->_read_headers ) { | 
| 535 | 102 | 100 |  |  |  | 292 | return undef unless $self->_read_data($timeout); | 
| 536 |  |  |  |  |  |  | } | 
| 537 | 41 |  |  |  |  | 114 | while ( not $done = $self->_read_body ) { | 
| 538 | 36 | 100 |  |  |  | 68 | return undef unless $self->_read_data($timeout); | 
| 539 |  |  |  |  |  |  | } | 
| 540 |  |  |  |  |  |  |  | 
| 541 | 31 |  |  |  |  | 120 | return $done; | 
| 542 |  |  |  |  |  |  | } | 
| 543 |  |  |  |  |  |  |  | 
| 544 |  |  |  |  |  |  | sub _get_next_transaction { | 
| 545 | 13 |  |  | 13 |  | 25 | my $self = shift; | 
| 546 | 13 |  | 100 |  |  | 376 | my $serial = $self->serial || 0; | 
| 547 | 13 |  |  |  |  | 117 | $serial++; | 
| 548 | 13 |  |  |  |  | 242 | $self->serial($serial); | 
| 549 |  |  |  |  |  |  |  | 
| 550 | 13 |  | 100 |  |  | 295 | return ($self->session_id||'nosession') . '-' . $serial; | 
| 551 |  |  |  |  |  |  | } | 
| 552 |  |  |  |  |  |  |  | 
| 553 |  |  |  |  |  |  | 1; | 
| 554 |  |  |  |  |  |  |  | 
| 555 |  |  |  |  |  |  | __END__ |