File Coverage

blib/lib/Net/STOMP/Client/Connection.pm
Criterion Covered Total %
statement 64 141 45.3
branch 16 78 20.5
condition 0 13 0.0
subroutine 14 17 82.3
pod 1 1 100.0
total 95 250 38.0


line stmt bran cond sub pod time code
1             #+##############################################################################
2             # #
3             # File: Net/STOMP/Client/Connection.pm #
4             # #
5             # Description: Connection support for Net::STOMP::Client #
6             # #
7             #-##############################################################################
8              
9             #
10             # module definition
11             #
12              
13             package Net::STOMP::Client::Connection;
14 2     2   644 use strict;
  2         11  
  2         66  
15 2     2   12 use warnings;
  2         5  
  2         353  
16             our $VERSION = "2.5";
17             our $REVISION = sprintf("%d.%02d", q$Revision: 2.8 $ =~ /(\d+)\.(\d+)/);
18              
19             #
20             # used modules
21             #
22              
23 2     2   1269 use IO::Socket::IP qw();
  2         84229  
  2         74  
24 2     2   19 use List::Util qw(shuffle);
  2         5  
  2         273  
25 2     2   1316 use Net::STOMP::Client::Peer qw();
  2         7  
  2         71  
26 2     2   589 use No::Worries::Die qw(dief);
  2         9370  
  2         20  
27 2     2   1448 use No::Worries::File qw(file_read);
  2         32925  
  2         18  
28 2     2   1720 use No::Worries::Log qw(log_debug);
  2         24479  
  2         20  
29 2     2   359 use Params::Validate qw(validate :types);
  2         4  
  2         413  
30 2     2   16 use Time::HiRes qw();
  2         5  
  2         3719  
31              
32             #+++############################################################################
33             # #
34             # private helpers #
35             # #
36             #---############################################################################
37              
38             #
39             # convert a URI (with no options) to a peer object
40             #
41              
42             sub _uri2peer ($) {
43 16     16   33 my($uri) = @_;
44              
45 16 100       92 if ($uri =~ m{ ^ (tcp|ssl|stomp|stomp\+ssl)
46             \:\/\/ \[? ([_a-z0-9\.\-\:]+?) \]? \: (\d+) \/? $ }ix) {
47 13         58 return(Net::STOMP::Client::Peer->new(
48             proto => $1,
49             host => $2,
50             port => $3,
51             ));
52             } else {
53 3         17 dief("unexpected server uri: %s", $uri);
54             }
55             }
56              
57             #
58             # set the default connection options
59             #
60              
61             sub _default_options ($) {
62 4     4   10 my($option) = @_;
63              
64 4         11 $option->{randomize} = 1;
65 4         11 $option->{sleep} = 0.01;
66 4         9 $option->{max_sleep} = 30;
67 4         9 $option->{multiplier} = 2;
68             }
69              
70             #
71             # parse an option string (we do not complain about unknown options)
72             #
73              
74             sub _parse_options ($$) {
75 1     1   4 my($option, $string) = @_;
76              
77 1 50       8 if ($string =~ /\b(backOffMultiplier=(\d+(\.\d+)?))\b/) {
78 0         0 $option->{multiplier} = $2;
79             }
80 1 50       5 if ($string =~ /\b(useExponentialBackOff=false)\b/) {
81 0         0 $option->{multiplier} = 0;
82             }
83 1 50       40 if ($string =~ /\b(randomize=false)\b/) {
84 1         4 $option->{randomize} = 0;
85             }
86 1 50       8 if ($string =~ /\b(initialReconnectDelay=(\d+))\b/) {
87 1         7 $option->{sleep} = $2 / 1000;
88             }
89 1 50       17 if ($string =~ /\b(maxReconnectDelay=(\d+))\b/) {
90 0         0 $option->{max_sleep} = $2 / 1000;
91             }
92 1 50       8 if ($string =~ /\b(maxReconnectAttempts=(\d+))\b/) {
93 1         5 $option->{max_attempt} = $2 + 1;
94             }
95             }
96              
97             #
98             # parse a connection URI
99             #
100             # supported URIs:
101             # - tcp://foo:12
102             # - file:/foo/bar
103             # - ActiveMQ failover URIs
104             #
105              
106             sub _parse_uri ($) {
107 12     12   3851 my($uri) = @_;
108 12         26 my(@peers, %option, @list);
109              
110 12         18 while (1) {
111 12 50       79 if ($uri =~ /^file:(.+)$/) {
    100          
    100          
112             # list of URIs stored in a file, one per line
113 0         0 @list = ();
114 0         0 foreach my $line (split(/\n/, file_read($1))) {
115 0         0 $line =~ s/\#.*//;
116 0         0 $line =~ s/\s+//g;
117 0 0       0 push(@list, $line) if length($line);
118             }
119 0 0       0 if (@list == 1) {
120             # if only one, allow failover syntax for it
121 0         0 $uri = shift(@list);
122             } else {
123             # otherwise, they must be simple URIs
124 0         0 @peers = map(_uri2peer($_), @list);
125 0         0 last;
126             }
127             } elsif ($uri =~ m{ ^ failover \: (?:\/\/)? \( ([_a-z0-9\.\-\:\/\,]+) \)
128             ( \? [_a-z0-9\.\=\-\&]+ ) ? $ }ix) {
129             # failover with options
130 3         13 _default_options(\%option);
131 3 100       16 _parse_options(\%option, $2) if $2;
132 3         18 @peers = map(_uri2peer($_), split(/,/, $1));
133 3         8 last;
134             } elsif ($uri =~ m{ ^ failover \: ([_a-z0-9\.\-\:\/\,]+) $ }ix) {
135             # failover without options
136 1         8 _default_options(\%option);
137 1         7 @peers = map(_uri2peer($_), split(/,/, $1));
138 1         4 last;
139             } else {
140             # otherwise this must be a simple URI
141 8         23 @peers = (_uri2peer($uri));
142 5         12 last;
143             }
144             }
145 9 50       26 dief("empty server uri: %s", $uri) unless @peers;
146 9         27 return(\@peers, \%option);
147             }
148              
149             #
150             # attempt to connect to one peer (low level)
151             #
152              
153             sub _attempt ($%) {
154 0     0     my($peer, %sockopt) = @_;
155 0           my($socket);
156              
157             # options sanity
158 0           $sockopt{Proto} = "tcp"; # yes, even SSL is TCP...
159 0           $sockopt{PeerAddr} = $peer->host();
160 0           $sockopt{PeerPort} = $peer->port();
161             # try to connect
162 0 0         if ($peer->proto() =~ /\b(ssl)\b/) {
163             # with SSL
164 0 0         unless ($IO::Socket::SSL::VERSION) {
165 0           eval { require IO::Socket::SSL };
  0            
166 0 0         return(sprintf("cannot load IO::Socket::SSL: %s", $@)) if $@;
167             }
168 0           $socket = IO::Socket::SSL->new(%sockopt);
169 0 0 0       return(sprintf("cannot SSL connect to %s:%d: %s", $peer->host(),
170             $peer->port(), IO::Socket::SSL::errstr()))
171             unless $socket and $socket->connected();
172             } else {
173             # with plain TCP
174 0           $socket = IO::Socket::IP->new(%sockopt);
175 0 0 0       return(sprintf("cannot connect to %s:%d: %s", $peer->host(),
176             $peer->port(), $!))
177             unless $socket and $socket->connected();
178 0 0         return(sprintf("cannot binmode(socket): %s", $!))
179             unless binmode($socket);
180             }
181             # so far so good...
182 0           @{ $peer }[3] = $socket->peerhost();
  0            
183 0           return($socket);
184             }
185              
186             #
187             # try to connect to a list of peers (high level)
188             #
189              
190             sub _try ($$$$) {
191 0     0     my($peers, $peeropt, $sockopt, $debug) = @_;
192 0           my(@list, $count, $result);
193              
194 0 0         dief("no peers given!") unless @{ $peers };
  0            
195 0           $count = 0;
196 0           while (1) {
197 0 0         @list = $peeropt->{randomize} ? shuffle(@{ $peers }) : @{ $peers };
  0            
  0            
198 0           foreach my $peer (@list) {
199 0           $result = _attempt($peer, %{ $sockopt });
  0            
200 0 0         if (ref($result)) {
201 0 0         log_debug("connect to %s ok: %s", $peer->uri(), $peer->addr())
202             if $debug =~ /\b(connection|all)\b/;
203 0           return($result, $peer);
204             } else {
205 0 0         log_debug("connect to %s failed: %s", $peer->uri(), $result)
206             if $debug =~ /\b(connection|all)\b/;
207             }
208 0           $count++;
209 0 0         if (defined($peeropt->{max_attempt})) {
210 0 0         last if $count >= $peeropt->{max_attempt};
211             }
212 0 0         if ($peeropt->{sleep}) {
213 0           Time::HiRes::sleep($peeropt->{sleep});
214 0 0         if ($peeropt->{multiplier}) {
215 0           $peeropt->{sleep} *= $peeropt->{multiplier};
216 0 0 0       if ($peeropt->{max_sleep} and
217             $peeropt->{sleep} > $peeropt->{max_sleep}) {
218 0           $peeropt->{sleep} = $peeropt->{max_sleep};
219 0           delete($peeropt->{multiplier});
220             }
221             }
222             }
223             }
224 0 0         if (defined($peeropt->{max_attempt})) {
225 0 0         last if $count >= $peeropt->{max_attempt};
226             }
227 0 0         last unless keys(%{ $peeropt });
  0            
228             }
229             # in case of failure, we only report the last error message...
230 0           dief($result);
231             }
232              
233             #+++############################################################################
234             # #
235             # public function #
236             # #
237             #---############################################################################
238              
239             my %new_options = (
240             "host" => {
241             optional => 1,
242             type => SCALAR,
243             regex => qr/^[a-z0-9\.\-\:]+$/,
244             },
245             "port" => {
246             optional => 1,
247             type => SCALAR,
248             regex => qr/^\d+$/,
249             },
250             "uri" => {
251             optional => 1,
252             type => SCALAR,
253             },
254             "sockopt" => {
255             optional => 1,
256             type => HASHREF,
257             },
258             "debug" => {
259             optional => 1,
260             type => UNDEF|SCALAR,
261             },
262             );
263              
264             sub new (@) {
265 0     0 1   my(%option, $proto, $peers, $peeropt);
266              
267 0           %option = validate(@_, \%new_options);
268 0   0       $option{sockopt} ||= {};
269 0   0       $option{debug} ||= "";
270 0 0         if (defined($option{uri})) {
271             # by URI
272             dief("unexpected server host: %s", $option{host})
273 0 0         if defined($option{host});
274             dief("unexpected server port: %s", $option{port})
275 0 0         if defined($option{port});
276 0           ($peers, $peeropt) = _parse_uri($option{uri});
277             } else {
278             # by host + port
279 0 0         dief("missing server host") unless defined($option{host});
280 0 0         dief("missing server port") unless defined($option{port});
281 0           $proto = "tcp";
282 0 0         $proto = "ssl" if grep(/^SSL_/, keys(%{ $option{sockopt} }));
  0            
283             $peers = [
284             Net::STOMP::Client::Peer->new(
285             proto => $proto,
286             host => $option{host},
287             port => $option{port},
288 0           ) ];
289 0           $peeropt = {};
290             }
291 0           return(_try($peers, $peeropt, $option{sockopt}, $option{debug}));
292             }
293              
294             1;
295              
296             __END__