line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
1
|
|
|
|
|
|
|
package Kafka::IO; |
2
|
|
|
|
|
|
|
|
3
|
|
|
|
|
|
|
=head1 NAME |
4
|
|
|
|
|
|
|
|
5
|
|
|
|
|
|
|
Kafka::IO - Interface to network communication with the Apache Kafka server. |
6
|
|
|
|
|
|
|
|
7
|
|
|
|
|
|
|
=head1 VERSION |
8
|
|
|
|
|
|
|
|
9
|
|
|
|
|
|
|
This documentation refers to C version 1.06 . |
10
|
|
|
|
|
|
|
|
11
|
|
|
|
|
|
|
=cut |
12
|
|
|
|
|
|
|
|
13
|
|
|
|
|
|
|
|
14
|
|
|
|
|
|
|
|
15
|
15
|
|
|
15
|
|
176859
|
use 5.010; |
|
15
|
|
|
|
|
64
|
|
16
|
15
|
|
|
15
|
|
107
|
use strict; |
|
15
|
|
|
|
|
34
|
|
|
15
|
|
|
|
|
408
|
|
17
|
15
|
|
|
15
|
|
90
|
use warnings; |
|
15
|
|
|
|
|
31
|
|
|
15
|
|
|
|
|
983
|
|
18
|
|
|
|
|
|
|
|
19
|
|
|
|
|
|
|
|
20
|
|
|
|
|
|
|
|
21
|
|
|
|
|
|
|
our $DEBUG = 0; |
22
|
|
|
|
|
|
|
|
23
|
|
|
|
|
|
|
our $VERSION = '1.06'; |
24
|
|
|
|
|
|
|
|
25
|
|
|
|
|
|
|
|
26
|
|
|
|
|
|
|
|
27
|
15
|
|
|
15
|
|
94
|
use Carp; |
|
15
|
|
|
|
|
36
|
|
|
15
|
|
|
|
|
880
|
|
28
|
15
|
|
|
15
|
|
98
|
use Config; |
|
15
|
|
|
|
|
34
|
|
|
15
|
|
|
|
|
652
|
|
29
|
15
|
|
|
15
|
|
107
|
use Const::Fast; |
|
15
|
|
|
|
|
34
|
|
|
15
|
|
|
|
|
133
|
|
30
|
15
|
|
|
|
|
975
|
use Data::Validate::Domain qw( |
31
|
|
|
|
|
|
|
is_hostname |
32
|
15
|
|
|
15
|
|
7305
|
); |
|
15
|
|
|
|
|
155549
|
|
33
|
15
|
|
|
|
|
1546
|
use Data::Validate::IP qw( |
34
|
|
|
|
|
|
|
is_ipv4 |
35
|
|
|
|
|
|
|
is_ipv6 |
36
|
15
|
|
|
15
|
|
7705
|
); |
|
15
|
|
|
|
|
381124
|
|
37
|
15
|
|
|
|
|
1187
|
use Errno qw( |
38
|
|
|
|
|
|
|
EAGAIN |
39
|
|
|
|
|
|
|
ECONNRESET |
40
|
|
|
|
|
|
|
EINTR |
41
|
|
|
|
|
|
|
EWOULDBLOCK |
42
|
|
|
|
|
|
|
ETIMEDOUT |
43
|
15
|
|
|
15
|
|
3308
|
); |
|
15
|
|
|
|
|
9661
|
|
44
|
15
|
|
|
15
|
|
104
|
use Fcntl; |
|
15
|
|
|
|
|
35
|
|
|
15
|
|
|
|
|
3620
|
|
45
|
15
|
|
|
15
|
|
8777
|
use IO::Select; |
|
15
|
|
|
|
|
25480
|
|
|
15
|
|
|
|
|
766
|
|
46
|
15
|
|
|
|
|
839
|
use Params::Util qw( |
47
|
|
|
|
|
|
|
_STRING |
48
|
15
|
|
|
15
|
|
1958
|
); |
|
15
|
|
|
|
|
9054
|
|
49
|
15
|
|
|
|
|
126
|
use POSIX qw( |
50
|
|
|
|
|
|
|
ceil |
51
|
15
|
|
|
15
|
|
3159
|
); |
|
15
|
|
|
|
|
44082
|
|
52
|
15
|
|
|
|
|
1032
|
use Scalar::Util qw( |
53
|
|
|
|
|
|
|
dualvar |
54
|
15
|
|
|
15
|
|
10319
|
); |
|
15
|
|
|
|
|
35
|
|
55
|
15
|
|
|
|
|
1973
|
use Socket qw( |
56
|
|
|
|
|
|
|
AF_INET |
57
|
|
|
|
|
|
|
AF_INET6 |
58
|
|
|
|
|
|
|
IPPROTO_TCP |
59
|
|
|
|
|
|
|
MSG_DONTWAIT |
60
|
|
|
|
|
|
|
MSG_PEEK |
61
|
|
|
|
|
|
|
NI_NUMERICHOST |
62
|
|
|
|
|
|
|
NIx_NOSERV |
63
|
|
|
|
|
|
|
PF_INET |
64
|
|
|
|
|
|
|
PF_INET6 |
65
|
|
|
|
|
|
|
SOCK_STREAM |
66
|
|
|
|
|
|
|
SOL_SOCKET |
67
|
|
|
|
|
|
|
SO_ERROR |
68
|
|
|
|
|
|
|
SO_RCVTIMEO |
69
|
|
|
|
|
|
|
SO_SNDTIMEO |
70
|
|
|
|
|
|
|
getaddrinfo |
71
|
|
|
|
|
|
|
getnameinfo |
72
|
|
|
|
|
|
|
inet_aton |
73
|
|
|
|
|
|
|
inet_pton |
74
|
|
|
|
|
|
|
inet_ntop |
75
|
|
|
|
|
|
|
pack_sockaddr_in |
76
|
|
|
|
|
|
|
pack_sockaddr_in6 |
77
|
15
|
|
|
15
|
|
107
|
); |
|
15
|
|
|
|
|
68
|
|
78
|
15
|
|
|
|
|
910
|
use Sys::SigAction qw( |
79
|
|
|
|
|
|
|
set_sig_handler |
80
|
15
|
|
|
15
|
|
7464
|
); |
|
15
|
|
|
|
|
48387
|
|
81
|
15
|
|
|
15
|
|
119
|
use Time::HiRes (); |
|
15
|
|
|
|
|
37
|
|
|
15
|
|
|
|
|
312
|
|
82
|
15
|
|
|
15
|
|
1804
|
use Try::Tiny; |
|
15
|
|
|
|
|
6461
|
|
|
15
|
|
|
|
|
1034
|
|
83
|
|
|
|
|
|
|
|
84
|
15
|
|
|
|
|
2436
|
use Kafka qw( |
85
|
|
|
|
|
|
|
$ERROR_CANNOT_BIND |
86
|
|
|
|
|
|
|
$ERROR_CANNOT_RECV |
87
|
|
|
|
|
|
|
$ERROR_CANNOT_SEND |
88
|
|
|
|
|
|
|
$ERROR_MISMATCH_ARGUMENT |
89
|
|
|
|
|
|
|
$ERROR_INCOMPATIBLE_HOST_IP_VERSION |
90
|
|
|
|
|
|
|
$ERROR_NO_CONNECTION |
91
|
|
|
|
|
|
|
$IP_V4 |
92
|
|
|
|
|
|
|
$IP_V6 |
93
|
|
|
|
|
|
|
$KAFKA_SERVER_PORT |
94
|
|
|
|
|
|
|
$REQUEST_TIMEOUT |
95
|
15
|
|
|
15
|
|
101
|
); |
|
15
|
|
|
|
|
35
|
|
96
|
15
|
|
|
15
|
|
3822
|
use Kafka::Exceptions; |
|
15
|
|
|
|
|
50
|
|
|
15
|
|
|
|
|
955
|
|
97
|
15
|
|
|
|
|
47260
|
use Kafka::Internals qw( |
98
|
|
|
|
|
|
|
$MAX_SOCKET_REQUEST_BYTES |
99
|
|
|
|
|
|
|
debug_level |
100
|
|
|
|
|
|
|
format_message |
101
|
15
|
|
|
15
|
|
106
|
); |
|
15
|
|
|
|
|
32
|
|
102
|
|
|
|
|
|
|
|
103
|
|
|
|
|
|
|
|
104
|
|
|
|
|
|
|
|
105
|
|
|
|
|
|
|
=head1 SYNOPSIS |
106
|
|
|
|
|
|
|
|
107
|
|
|
|
|
|
|
use 5.010; |
108
|
|
|
|
|
|
|
use strict; |
109
|
|
|
|
|
|
|
use warnings; |
110
|
|
|
|
|
|
|
|
111
|
|
|
|
|
|
|
use Scalar::Util qw( |
112
|
|
|
|
|
|
|
blessed |
113
|
|
|
|
|
|
|
); |
114
|
|
|
|
|
|
|
use Try::Tiny; |
115
|
|
|
|
|
|
|
|
116
|
|
|
|
|
|
|
use Kafka::IO; |
117
|
|
|
|
|
|
|
|
118
|
|
|
|
|
|
|
my $io; |
119
|
|
|
|
|
|
|
try { |
120
|
|
|
|
|
|
|
$io = Kafka::IO->new( host => 'localhost' ); |
121
|
|
|
|
|
|
|
} catch { |
122
|
|
|
|
|
|
|
my $error = $_; |
123
|
|
|
|
|
|
|
if ( blessed( $error ) && $error->isa( 'Kafka::Exception' ) ) { |
124
|
|
|
|
|
|
|
warn 'Error: (', $error->code, ') ', $error->message, "\n"; |
125
|
|
|
|
|
|
|
exit; |
126
|
|
|
|
|
|
|
} else { |
127
|
|
|
|
|
|
|
die $error; |
128
|
|
|
|
|
|
|
} |
129
|
|
|
|
|
|
|
}; |
130
|
|
|
|
|
|
|
|
131
|
|
|
|
|
|
|
# Closes and cleans up |
132
|
|
|
|
|
|
|
$io->close; |
133
|
|
|
|
|
|
|
undef $io; |
134
|
|
|
|
|
|
|
|
135
|
|
|
|
|
|
|
=head1 DESCRIPTION |
136
|
|
|
|
|
|
|
|
137
|
|
|
|
|
|
|
This module is private and should not be used directly. |
138
|
|
|
|
|
|
|
|
139
|
|
|
|
|
|
|
In order to achieve better performance, methods of this module do not |
140
|
|
|
|
|
|
|
perform arguments validation. |
141
|
|
|
|
|
|
|
|
142
|
|
|
|
|
|
|
The main features of the C class are: |
143
|
|
|
|
|
|
|
|
144
|
|
|
|
|
|
|
=over 3 |
145
|
|
|
|
|
|
|
|
146
|
|
|
|
|
|
|
=item * |
147
|
|
|
|
|
|
|
|
148
|
|
|
|
|
|
|
Provides an object oriented API for communication with Kafka. |
149
|
|
|
|
|
|
|
|
150
|
|
|
|
|
|
|
=item * |
151
|
|
|
|
|
|
|
|
152
|
|
|
|
|
|
|
This class allows you to create Kafka 0.9+ clients. |
153
|
|
|
|
|
|
|
|
154
|
|
|
|
|
|
|
=back |
155
|
|
|
|
|
|
|
|
156
|
|
|
|
|
|
|
=cut |
157
|
|
|
|
|
|
|
|
158
|
|
|
|
|
|
|
# Hard limit of IO operation retry attempts, to prevent high CPU usage in IO retry loop |
159
|
|
|
|
|
|
|
const my $MAX_RETRIES => 30; |
160
|
|
|
|
|
|
|
|
161
|
|
|
|
|
|
|
our $_hdr; |
162
|
|
|
|
|
|
|
|
163
|
|
|
|
|
|
|
#-- constructor ---------------------------------------------------------------- |
164
|
|
|
|
|
|
|
|
165
|
|
|
|
|
|
|
=head2 CONSTRUCTOR |
166
|
|
|
|
|
|
|
|
167
|
|
|
|
|
|
|
=head3 C |
168
|
|
|
|
|
|
|
|
169
|
|
|
|
|
|
|
Establishes TCP connection to given host and port, creates and returns C IO object. |
170
|
|
|
|
|
|
|
|
171
|
|
|
|
|
|
|
C takes arguments in key-value pairs. The following arguments are currently recognized: |
172
|
|
|
|
|
|
|
|
173
|
|
|
|
|
|
|
=over 3 |
174
|
|
|
|
|
|
|
|
175
|
|
|
|
|
|
|
=item C $host> |
176
|
|
|
|
|
|
|
|
177
|
|
|
|
|
|
|
C<$host> is Kafka host to connect to. It can be a host name or an IP-address in |
178
|
|
|
|
|
|
|
IPv4 or IPv6 form (for example '127.0.0.1', '0:0:0:0:0:0:0:1' or '::1'). |
179
|
|
|
|
|
|
|
|
180
|
|
|
|
|
|
|
=item C $port> |
181
|
|
|
|
|
|
|
|
182
|
|
|
|
|
|
|
Optional, default = C<$KAFKA_SERVER_PORT>. |
183
|
|
|
|
|
|
|
|
184
|
|
|
|
|
|
|
C<$port> is integer attribute denoting the port number of to access Apache Kafka. |
185
|
|
|
|
|
|
|
|
186
|
|
|
|
|
|
|
C<$KAFKA_SERVER_PORT> is the default Apache Kafka server port that can be imported |
187
|
|
|
|
|
|
|
from the L module. |
188
|
|
|
|
|
|
|
|
189
|
|
|
|
|
|
|
=item C $timeout> |
190
|
|
|
|
|
|
|
|
191
|
|
|
|
|
|
|
C<$REQUEST_TIMEOUT> is the default timeout that can be imported from the L module. |
192
|
|
|
|
|
|
|
|
193
|
|
|
|
|
|
|
Special behavior when C is set to C: |
194
|
|
|
|
|
|
|
|
195
|
|
|
|
|
|
|
=back |
196
|
|
|
|
|
|
|
|
197
|
|
|
|
|
|
|
=over 3 |
198
|
|
|
|
|
|
|
|
199
|
|
|
|
|
|
|
=item * |
200
|
|
|
|
|
|
|
|
201
|
|
|
|
|
|
|
Alarms are not used internally (namely when performing C). |
202
|
|
|
|
|
|
|
|
203
|
|
|
|
|
|
|
=item * |
204
|
|
|
|
|
|
|
|
205
|
|
|
|
|
|
|
Default C<$REQUEST_TIMEOUT> is used for the rest of IO operations. |
206
|
|
|
|
|
|
|
|
207
|
|
|
|
|
|
|
=back |
208
|
|
|
|
|
|
|
|
209
|
|
|
|
|
|
|
=over 3 |
210
|
|
|
|
|
|
|
|
211
|
|
|
|
|
|
|
=item C $ip_version> |
212
|
|
|
|
|
|
|
|
213
|
|
|
|
|
|
|
Force version of IP protocol for resolving host name (or interpretation of passed address). |
214
|
|
|
|
|
|
|
|
215
|
|
|
|
|
|
|
Optional, undefined by default, which works in the following way: version of IP address |
216
|
|
|
|
|
|
|
is detected automatically, host name is resolved into IPv4 address. |
217
|
|
|
|
|
|
|
|
218
|
|
|
|
|
|
|
See description of L<$IP_V4|Kafka::IO/$IP_V4>, L<$IP_V6|Kafka::IO/$IP_V6> |
219
|
|
|
|
|
|
|
in C L. |
220
|
|
|
|
|
|
|
|
221
|
|
|
|
|
|
|
=back |
222
|
|
|
|
|
|
|
|
223
|
|
|
|
|
|
|
=cut |
224
|
|
|
|
|
|
|
sub new { |
225
|
13
|
|
|
13
|
1
|
16794
|
my ( $class, %p ) = @_; |
226
|
|
|
|
|
|
|
|
227
|
13
|
|
|
|
|
211
|
my $self = bless { |
228
|
|
|
|
|
|
|
host => '', |
229
|
|
|
|
|
|
|
timeout => $REQUEST_TIMEOUT, |
230
|
|
|
|
|
|
|
port => $KAFKA_SERVER_PORT, |
231
|
|
|
|
|
|
|
ip_version => undef, |
232
|
|
|
|
|
|
|
af => '', # Address family constant |
233
|
|
|
|
|
|
|
pf => '', # Protocol family constant |
234
|
|
|
|
|
|
|
ip => '', # Human-readable textual representation of the ip address |
235
|
|
|
|
|
|
|
}, $class; |
236
|
|
|
|
|
|
|
|
237
|
13
|
|
66
|
|
|
244
|
exists $p{$_} and $self->{$_} = $p{$_} foreach keys %$self; |
238
|
|
|
|
|
|
|
|
239
|
|
|
|
|
|
|
# we trust it: make it untainted |
240
|
13
|
|
|
|
|
160
|
( $self->{host} ) = $self->{host} =~ /\A(.+)\z/; |
241
|
13
|
|
|
|
|
124
|
( $self->{port} ) = $self->{port} =~ /\A(.+)\z/; |
242
|
|
|
|
|
|
|
|
243
|
13
|
|
|
|
|
74
|
$self->{socket} = undef; |
244
|
13
|
|
|
|
|
53
|
$self->{_io_select} = undef; |
245
|
13
|
|
|
|
|
38
|
my $error; |
246
|
|
|
|
|
|
|
try { |
247
|
13
|
|
|
13
|
|
2004
|
$self->_connect(); |
248
|
|
|
|
|
|
|
} catch { |
249
|
4
|
|
|
4
|
|
2027
|
$error = $_; |
250
|
13
|
|
|
|
|
212
|
}; |
251
|
|
|
|
|
|
|
|
252
|
13
|
100
|
|
|
|
550
|
$self->_error( $ERROR_CANNOT_BIND, format_message("Kafka::IO(%s:%s)->new: %s", $self->{host}, $self->{port}, $error ) ) |
253
|
|
|
|
|
|
|
if defined $error |
254
|
|
|
|
|
|
|
; |
255
|
9
|
|
|
|
|
4193
|
return $self; |
256
|
|
|
|
|
|
|
} |
257
|
|
|
|
|
|
|
|
258
|
|
|
|
|
|
|
#-- public attributes ---------------------------------------------------------- |
259
|
|
|
|
|
|
|
|
260
|
|
|
|
|
|
|
=head2 METHODS |
261
|
|
|
|
|
|
|
|
262
|
|
|
|
|
|
|
The following methods are provided by C class: |
263
|
|
|
|
|
|
|
|
264
|
|
|
|
|
|
|
=cut |
265
|
|
|
|
|
|
|
|
266
|
|
|
|
|
|
|
=head3 C<< send( $message <, $timeout> ) >> |
267
|
|
|
|
|
|
|
|
268
|
|
|
|
|
|
|
Sends a C<$message> to Kafka. |
269
|
|
|
|
|
|
|
|
270
|
|
|
|
|
|
|
The argument must be a bytes string. |
271
|
|
|
|
|
|
|
|
272
|
|
|
|
|
|
|
Use optional C<$timeout> argument to override default timeout for this request only. |
273
|
|
|
|
|
|
|
|
274
|
|
|
|
|
|
|
Returns the number of characters sent. |
275
|
|
|
|
|
|
|
|
276
|
|
|
|
|
|
|
=cut |
277
|
|
|
|
|
|
|
sub send { |
278
|
1
|
|
|
1
|
1
|
85
|
my ( $self, $message, $timeout ) = @_; |
279
|
1
|
50
|
|
|
|
8
|
$self->_error( $ERROR_MISMATCH_ARGUMENT, '->send' ) |
280
|
|
|
|
|
|
|
unless defined( _STRING( $message ) ) |
281
|
|
|
|
|
|
|
; |
282
|
1
|
|
|
|
|
3
|
my $length = length( $message ); |
283
|
1
|
50
|
|
|
|
5
|
$self->_error( $ERROR_MISMATCH_ARGUMENT, '->send' ) |
284
|
|
|
|
|
|
|
unless $length <= $MAX_SOCKET_REQUEST_BYTES |
285
|
|
|
|
|
|
|
; |
286
|
1
|
50
|
33
|
|
|
7
|
$timeout = $self->{timeout} // $REQUEST_TIMEOUT unless defined $timeout; |
287
|
1
|
50
|
|
|
|
6
|
$self->_error( $ERROR_MISMATCH_ARGUMENT, '->receive' ) |
288
|
|
|
|
|
|
|
unless $timeout > 0 |
289
|
|
|
|
|
|
|
; |
290
|
1
|
|
|
|
|
3
|
my $select = $self->{_io_select}; |
291
|
1
|
50
|
|
|
|
4
|
$self->_error( $ERROR_NO_CONNECTION, 'Attempt to work with a closed socket' ) unless $select; |
292
|
|
|
|
|
|
|
|
293
|
1
|
50
|
|
|
|
6
|
$self->_debug_msg( $message, 'Request to', 'green' ) |
294
|
|
|
|
|
|
|
if $self->debug_level >= 2 |
295
|
|
|
|
|
|
|
; |
296
|
1
|
|
|
|
|
4
|
my $sent = 0; |
297
|
|
|
|
|
|
|
|
298
|
1
|
|
|
|
|
5
|
my $started = Time::HiRes::time(); |
299
|
1
|
|
|
|
|
4
|
my $until = $started + $timeout; |
300
|
|
|
|
|
|
|
|
301
|
1
|
|
|
|
|
2
|
my $error_code; |
302
|
|
|
|
|
|
|
my $errno; |
303
|
1
|
|
|
|
|
3
|
my $retries = 0; |
304
|
1
|
|
|
|
|
2
|
my $interrupts = 0; |
305
|
1
|
|
66
|
|
|
15
|
ATTEMPT: while ( $sent < $length && $retries++ < $MAX_RETRIES ) { |
306
|
1
|
|
|
|
|
6
|
my $remaining_time = $until - Time::HiRes::time(); |
307
|
1
|
50
|
|
|
|
5
|
last ATTEMPT if $remaining_time <= 0; # timeout expired |
308
|
|
|
|
|
|
|
|
309
|
1
|
|
|
|
|
5
|
undef $!; |
310
|
1
|
|
|
|
|
7
|
my $can_write = $select->can_write( $remaining_time ); |
311
|
1
|
|
|
|
|
64
|
$errno = $!; |
312
|
1
|
50
|
|
|
|
5
|
if ( $errno ) { |
313
|
0
|
0
|
|
|
|
0
|
if ( $errno == EINTR ) { |
314
|
0
|
|
|
|
|
0
|
undef $errno; |
315
|
0
|
|
|
|
|
0
|
--$retries; # this attempt does not count |
316
|
0
|
|
|
|
|
0
|
++$interrupts; |
317
|
0
|
|
|
|
|
0
|
next ATTEMPT; |
318
|
|
|
|
|
|
|
} |
319
|
|
|
|
|
|
|
|
320
|
0
|
|
|
|
|
0
|
$self->close; |
321
|
|
|
|
|
|
|
|
322
|
0
|
|
|
|
|
0
|
last ATTEMPT; |
323
|
|
|
|
|
|
|
} |
324
|
|
|
|
|
|
|
|
325
|
1
|
50
|
|
|
|
84
|
if ( $can_write ) { |
326
|
|
|
|
|
|
|
# check for EOF on the first attempt only |
327
|
1
|
50
|
33
|
|
|
9
|
if ( $retries == 1 && $self->_is_close_wait ) { |
328
|
0
|
|
|
|
|
0
|
$self->close; |
329
|
0
|
|
|
|
|
0
|
$error_code = $ERROR_NO_CONNECTION; |
330
|
0
|
|
|
|
|
0
|
last ATTEMPT; |
331
|
|
|
|
|
|
|
} |
332
|
|
|
|
|
|
|
|
333
|
1
|
|
|
|
|
24
|
undef $!; |
334
|
1
|
|
|
|
|
130
|
my $wrote = CORE::send( $self->{socket}, $message, MSG_DONTWAIT ); |
335
|
1
|
|
|
|
|
58
|
$errno = $!; |
336
|
|
|
|
|
|
|
|
337
|
1
|
50
|
33
|
|
|
14
|
if( defined $wrote && $wrote > 0 ) { |
338
|
1
|
|
|
|
|
2
|
$sent += $wrote; |
339
|
1
|
50
|
|
|
|
3
|
if ( $sent < $length ) { |
340
|
|
|
|
|
|
|
# remove written data from message |
341
|
0
|
|
|
|
|
0
|
$message = substr( $message, $wrote ); |
342
|
|
|
|
|
|
|
} |
343
|
|
|
|
|
|
|
} |
344
|
|
|
|
|
|
|
|
345
|
1
|
50
|
|
|
|
4
|
if( $errno ) { |
346
|
0
|
0
|
0
|
|
|
0
|
if( $errno == EINTR ) { |
|
|
0
|
0
|
|
|
|
|
|
|
|
0
|
|
|
|
|
347
|
0
|
|
|
|
|
0
|
undef $errno; |
348
|
0
|
|
|
|
|
0
|
--$retries; # this attempt does not count |
349
|
0
|
|
|
|
|
0
|
++$interrupts; |
350
|
0
|
|
|
|
|
0
|
next ATTEMPT; |
351
|
|
|
|
|
|
|
} elsif ( |
352
|
|
|
|
|
|
|
$errno != EAGAIN |
353
|
|
|
|
|
|
|
&& $errno != EWOULDBLOCK |
354
|
|
|
|
|
|
|
## on freebsd, if we got ECONNRESET, it's a timeout from the other side |
355
|
|
|
|
|
|
|
&& !( $errno == ECONNRESET && $^O eq 'freebsd' ) |
356
|
|
|
|
|
|
|
) { |
357
|
0
|
|
|
|
|
0
|
$self->close; |
358
|
0
|
|
|
|
|
0
|
last ATTEMPT; |
359
|
|
|
|
|
|
|
} |
360
|
|
|
|
|
|
|
} |
361
|
|
|
|
|
|
|
|
362
|
1
|
50
|
|
|
|
5
|
last ATTEMPT unless defined $wrote; |
363
|
|
|
|
|
|
|
} |
364
|
|
|
|
|
|
|
} |
365
|
|
|
|
|
|
|
|
366
|
1
|
50
|
33
|
|
|
15
|
unless( !$errno && defined( $sent ) && $sent == $length ) |
|
|
|
33
|
|
|
|
|
367
|
|
|
|
|
|
|
{ |
368
|
|
|
|
|
|
|
$self->_error( |
369
|
|
|
|
|
|
|
$error_code // $ERROR_CANNOT_SEND, |
370
|
|
|
|
|
|
|
format_message( "Kafka::IO(%s)->send: ERRNO=%s ERROR='%s' (length=%s, sent=%s, timeout=%s, retries=%s, interrupts=%s, secs=%.6f)", |
371
|
|
|
|
|
|
|
$self->{host}, |
372
|
0
|
|
0
|
|
|
0
|
( $errno // 0 ) + 0, |
|
|
|
0
|
|
|
|
|
|
|
|
0
|
|
|
|
|
373
|
|
|
|
|
|
|
( $errno // '' ) . '', |
374
|
|
|
|
|
|
|
$length, |
375
|
|
|
|
|
|
|
$sent, |
376
|
|
|
|
|
|
|
$timeout, |
377
|
|
|
|
|
|
|
$retries, |
378
|
|
|
|
|
|
|
$interrupts, |
379
|
|
|
|
|
|
|
Time::HiRes::time() - $started, |
380
|
|
|
|
|
|
|
) |
381
|
|
|
|
|
|
|
); |
382
|
|
|
|
|
|
|
} |
383
|
|
|
|
|
|
|
|
384
|
1
|
|
|
|
|
8
|
return $sent; |
385
|
|
|
|
|
|
|
} |
386
|
|
|
|
|
|
|
|
387
|
|
|
|
|
|
|
=head3 C<< receive( $length <, $timeout> ) >> |
388
|
|
|
|
|
|
|
|
389
|
|
|
|
|
|
|
Receives a message up to C<$length> size from Kafka. |
390
|
|
|
|
|
|
|
|
391
|
|
|
|
|
|
|
C<$length> argument must be a positive number. |
392
|
|
|
|
|
|
|
|
393
|
|
|
|
|
|
|
Use optional C<$timeout> argument to override default timeout for this call only. |
394
|
|
|
|
|
|
|
|
395
|
|
|
|
|
|
|
Returns a reference to the received message. |
396
|
|
|
|
|
|
|
|
397
|
|
|
|
|
|
|
=cut |
398
|
|
|
|
|
|
|
sub receive { |
399
|
1
|
|
|
1
|
1
|
1663
|
my ( $self, $length, $timeout ) = @_; |
400
|
1
|
50
|
|
|
|
6
|
$self->_error( $ERROR_MISMATCH_ARGUMENT, '->receive' ) |
401
|
|
|
|
|
|
|
unless $length > 0 |
402
|
|
|
|
|
|
|
; |
403
|
1
|
50
|
33
|
|
|
8
|
$timeout = $self->{timeout} // $REQUEST_TIMEOUT unless defined $timeout; |
404
|
1
|
50
|
|
|
|
6
|
$self->_error( $ERROR_MISMATCH_ARGUMENT, '->receive' ) |
405
|
|
|
|
|
|
|
unless $timeout > 0 |
406
|
|
|
|
|
|
|
; |
407
|
1
|
|
|
|
|
11
|
my $select = $self->{_io_select}; |
408
|
1
|
50
|
|
|
|
4
|
$self->_error( $ERROR_NO_CONNECTION, 'Attempt to work with a closed socket' ) unless $select; |
409
|
|
|
|
|
|
|
|
410
|
1
|
|
|
|
|
9
|
my $message = ''; |
411
|
1
|
|
|
|
|
3
|
my $len_to_read = $length; |
412
|
|
|
|
|
|
|
|
413
|
1
|
|
|
|
|
6
|
my $started = Time::HiRes::time(); |
414
|
1
|
|
|
|
|
2
|
my $until = $started + $timeout; |
415
|
|
|
|
|
|
|
|
416
|
1
|
|
|
|
|
3
|
my $error_code; |
417
|
|
|
|
|
|
|
my $errno; |
418
|
1
|
|
|
|
|
2
|
my $retries = 0; |
419
|
1
|
|
|
|
|
2
|
my $interrupts = 0; |
420
|
1
|
|
66
|
|
|
15
|
ATTEMPT: while ( $len_to_read > 0 && $retries++ < $MAX_RETRIES ) { |
421
|
1
|
|
|
|
|
5
|
my $remaining_time = $until - Time::HiRes::time(); |
422
|
1
|
50
|
|
|
|
38
|
last if $remaining_time <= 0; # timeout expired |
423
|
|
|
|
|
|
|
|
424
|
1
|
|
|
|
|
5
|
undef $!; |
425
|
1
|
|
|
|
|
6
|
my $can_read = $select->can_read( $remaining_time ); |
426
|
1
|
|
|
|
|
48
|
$errno = $!; |
427
|
1
|
50
|
|
|
|
5
|
if ( $errno ) { |
428
|
0
|
0
|
|
|
|
0
|
if ( $errno == EINTR ) { |
429
|
0
|
|
|
|
|
0
|
undef $errno; |
430
|
0
|
|
|
|
|
0
|
--$retries; # this attempt does not count |
431
|
0
|
|
|
|
|
0
|
++$interrupts; |
432
|
0
|
|
|
|
|
0
|
next ATTEMPT; |
433
|
|
|
|
|
|
|
} |
434
|
|
|
|
|
|
|
|
435
|
0
|
|
|
|
|
0
|
$self->close; |
436
|
|
|
|
|
|
|
|
437
|
0
|
|
|
|
|
0
|
last ATTEMPT; |
438
|
|
|
|
|
|
|
} |
439
|
|
|
|
|
|
|
|
440
|
1
|
50
|
|
|
|
4
|
if ( $can_read ) { |
441
|
1
|
|
|
|
|
2
|
my $buf = ''; |
442
|
1
|
|
|
|
|
3
|
undef $!; |
443
|
1
|
|
|
|
|
21
|
my $from_recv = CORE::recv( $self->{socket}, $buf, $len_to_read, MSG_DONTWAIT ); |
444
|
1
|
|
|
|
|
5
|
$errno = $!; |
445
|
|
|
|
|
|
|
|
446
|
1
|
50
|
33
|
|
|
20
|
if ( defined( $from_recv ) && length( $buf ) ) { |
447
|
1
|
|
|
|
|
3
|
$message .= $buf; |
448
|
1
|
|
|
|
|
2
|
$len_to_read = $length - length( $message ); |
449
|
1
|
|
|
|
|
3
|
--$retries; # this attempt was successful, don't count as a retry |
450
|
|
|
|
|
|
|
} |
451
|
1
|
50
|
|
|
|
3
|
if ( $errno ) { |
452
|
0
|
0
|
0
|
|
|
0
|
if ( $errno == EINTR ) { |
|
|
0
|
0
|
|
|
|
|
|
|
|
0
|
|
|
|
|
453
|
0
|
|
|
|
|
0
|
undef $errno; |
454
|
0
|
|
|
|
|
0
|
--$retries; # this attempt does not count |
455
|
0
|
|
|
|
|
0
|
++$interrupts; |
456
|
0
|
|
|
|
|
0
|
next ATTEMPT; |
457
|
|
|
|
|
|
|
} elsif ( |
458
|
|
|
|
|
|
|
$errno != EAGAIN |
459
|
|
|
|
|
|
|
&& $errno != EWOULDBLOCK |
460
|
|
|
|
|
|
|
## on freebsd, if we got ECONNRESET, it's a timeout from the other side |
461
|
|
|
|
|
|
|
&& !( $errno == ECONNRESET && $^O eq 'freebsd' ) |
462
|
|
|
|
|
|
|
) { |
463
|
0
|
|
|
|
|
0
|
$self->close; |
464
|
0
|
|
|
|
|
0
|
last ATTEMPT; |
465
|
|
|
|
|
|
|
} |
466
|
|
|
|
|
|
|
} |
467
|
|
|
|
|
|
|
|
468
|
1
|
50
|
|
|
|
6
|
if ( length( $buf ) == 0 ) { |
469
|
0
|
0
|
0
|
|
|
0
|
if( defined( $from_recv ) && ! $errno ) { |
470
|
|
|
|
|
|
|
# no error and nothing received with select returning "can read" means EOF: other side closed socket |
471
|
0
|
0
|
|
|
|
0
|
$self->_debug_msg( 'EOF on receive attempt, closing socket' ) |
472
|
|
|
|
|
|
|
if $self->debug_level; |
473
|
0
|
|
|
|
|
0
|
$self->close; |
474
|
|
|
|
|
|
|
|
475
|
0
|
0
|
|
|
|
0
|
if( length( $message ) == 0 ) { |
476
|
|
|
|
|
|
|
# we did not receive anything yet, so we may (in some cases) reconnect and try again |
477
|
0
|
|
|
|
|
0
|
$error_code = $ERROR_NO_CONNECTION; |
478
|
|
|
|
|
|
|
} |
479
|
|
|
|
|
|
|
|
480
|
0
|
|
|
|
|
0
|
last ATTEMPT; |
481
|
|
|
|
|
|
|
} |
482
|
|
|
|
|
|
|
# we did not read anything on this attempt: wait a bit before the next one; should not happen, but just in case... |
483
|
0
|
0
|
|
|
|
0
|
if ( my $remaining_attempts = $MAX_RETRIES - $retries ) { |
484
|
0
|
|
|
|
|
0
|
$remaining_time = $until - Time::HiRes::time(); |
485
|
0
|
|
|
|
|
0
|
my $micro_seconds = int( $remaining_time * 1e6 / $remaining_attempts ); |
486
|
0
|
0
|
|
|
|
0
|
if ( $micro_seconds > 0 ) { |
487
|
0
|
0
|
|
|
|
0
|
$micro_seconds = 250_000 if $micro_seconds > 250_000; # prevent long sleeps if total remaining time is big |
488
|
0
|
0
|
|
|
|
0
|
$self->_debug_msg( format_message( 'sleeping (remaining attempts %d, time %.6f): %d microseconds', $remaining_attempts, $remaining_time, $micro_seconds ) ) |
489
|
|
|
|
|
|
|
if $self->debug_level; |
490
|
0
|
|
|
|
|
0
|
Time::HiRes::usleep( $micro_seconds ); |
491
|
|
|
|
|
|
|
} |
492
|
|
|
|
|
|
|
} |
493
|
|
|
|
|
|
|
} |
494
|
|
|
|
|
|
|
} |
495
|
|
|
|
|
|
|
} |
496
|
|
|
|
|
|
|
|
497
|
1
|
50
|
33
|
|
|
14
|
unless( !$errno && length( $message ) >= $length ) |
498
|
|
|
|
|
|
|
{ |
499
|
|
|
|
|
|
|
$self->_error( |
500
|
|
|
|
|
|
|
$error_code // $ERROR_CANNOT_RECV, |
501
|
|
|
|
|
|
|
format_message( "Kafka::IO(%s)->receive: ERRNO=%s ERROR='%s' (length=%s, received=%s, timeout=%s, retries=%s, interrupts=%s, secs=%.6f)", |
502
|
|
|
|
|
|
|
$self->{host}, |
503
|
0
|
|
0
|
|
|
0
|
( $errno // 0 ) + 0, |
|
|
|
0
|
|
|
|
|
|
|
|
0
|
|
|
|
|
504
|
|
|
|
|
|
|
( $errno // '' ) . '', |
505
|
|
|
|
|
|
|
$length, |
506
|
|
|
|
|
|
|
length( $message ), |
507
|
|
|
|
|
|
|
$timeout, |
508
|
|
|
|
|
|
|
$retries, |
509
|
|
|
|
|
|
|
$interrupts, |
510
|
|
|
|
|
|
|
Time::HiRes::time() - $started, |
511
|
|
|
|
|
|
|
), |
512
|
|
|
|
|
|
|
); |
513
|
|
|
|
|
|
|
} |
514
|
1
|
50
|
|
|
|
9
|
$self->_debug_msg( $message, 'Response from', 'yellow' ) |
515
|
|
|
|
|
|
|
if $self->debug_level >= 2; |
516
|
|
|
|
|
|
|
|
517
|
|
|
|
|
|
|
# returns tainted data |
518
|
1
|
|
|
|
|
7
|
return \$message; |
519
|
|
|
|
|
|
|
} |
520
|
|
|
|
|
|
|
|
521
|
|
|
|
|
|
|
=head3 C |
522
|
|
|
|
|
|
|
|
523
|
|
|
|
|
|
|
Closes connection to Kafka server. |
524
|
|
|
|
|
|
|
Returns true if those operations succeed and if no error was reported by any PerlIO layer. |
525
|
|
|
|
|
|
|
|
526
|
|
|
|
|
|
|
=cut |
527
|
|
|
|
|
|
|
sub close { |
528
|
1
|
|
|
1
|
1
|
4
|
my ( $self ) = @_; |
529
|
|
|
|
|
|
|
|
530
|
1
|
|
|
|
|
3
|
my $ret = 1; |
531
|
1
|
50
|
|
|
|
8
|
if ( $self->{socket} ) { |
532
|
1
|
|
|
|
|
89
|
$ret = CORE::close( $self->{socket} ); |
533
|
1
|
|
|
|
|
6
|
$self->{socket} = undef; |
534
|
1
|
|
|
|
|
8
|
$self->{_io_select} = undef; |
535
|
|
|
|
|
|
|
} |
536
|
|
|
|
|
|
|
|
537
|
1
|
|
|
|
|
4
|
return $ret; |
538
|
|
|
|
|
|
|
} |
539
|
|
|
|
|
|
|
|
540
|
|
|
|
|
|
|
sub _is_close_wait { |
541
|
1
|
|
|
1
|
|
3
|
my ( $self ) = @_; |
542
|
1
|
50
|
33
|
|
|
36
|
return 1 unless $self->{socket} && $self->{_io_select}; # closed already |
543
|
|
|
|
|
|
|
# http://stefan.buettcher.org/cs/conn_closed.html |
544
|
|
|
|
|
|
|
# socket is open; check if we can read, and if we can but recv() cannot peek, it means we got EOF |
545
|
1
|
50
|
|
|
|
7
|
return unless $self->{_io_select}->can_read( 0 ); # we cannot read, but may be able to write |
546
|
0
|
|
|
|
|
0
|
my $buf = ''; |
547
|
0
|
|
|
|
|
0
|
undef $!; |
548
|
0
|
|
|
|
|
0
|
my $status = CORE::recv( $self->{socket}, $buf, 1, MSG_DONTWAIT | MSG_PEEK ); # peek, do not remove data from queue |
549
|
|
|
|
|
|
|
# EOF when there is no error, status is defined, but result is empty |
550
|
0
|
|
0
|
|
|
0
|
return ! $! && defined $status && length( $buf ) == 0; |
551
|
|
|
|
|
|
|
} |
552
|
|
|
|
|
|
|
|
553
|
|
|
|
|
|
|
# The method verifies if we can connect to a Kafka broker. |
554
|
|
|
|
|
|
|
# This is evil: opens and immediately closes a NEW connection so do not use unless there is a strong reason for it. |
555
|
|
|
|
|
|
|
sub _is_alive { |
556
|
3
|
|
|
3
|
|
825775
|
my ( $self ) = @_; |
557
|
|
|
|
|
|
|
|
558
|
3
|
|
|
|
|
16
|
my $socket = $self->{socket}; |
559
|
3
|
100
|
|
|
|
21
|
return unless $socket; |
560
|
|
|
|
|
|
|
|
561
|
2
|
|
|
|
|
62
|
socket( my $tmp_socket, $self->{pf}, SOCK_STREAM, IPPROTO_TCP ); |
562
|
2
|
|
|
|
|
202
|
my $is_alive = connect( $tmp_socket, getpeername( $socket ) ); |
563
|
2
|
|
|
|
|
205
|
CORE::close( $tmp_socket ); |
564
|
|
|
|
|
|
|
|
565
|
2
|
|
|
|
|
28
|
return $is_alive; |
566
|
|
|
|
|
|
|
} |
567
|
|
|
|
|
|
|
|
568
|
|
|
|
|
|
|
#-- private attributes --------------------------------------------------------- |
569
|
|
|
|
|
|
|
|
570
|
|
|
|
|
|
|
#-- private methods ------------------------------------------------------------ |
571
|
|
|
|
|
|
|
|
572
|
|
|
|
|
|
|
# You need to have access to Kafka instance and be able to connect through TCP. |
573
|
|
|
|
|
|
|
# uses http://devpit.org/wiki/Connect%28%29_with_timeout_%28in_Perl%29 |
574
|
|
|
|
|
|
|
sub _connect { |
575
|
13
|
|
|
13
|
|
43
|
my ( $self ) = @_; |
576
|
|
|
|
|
|
|
|
577
|
13
|
|
|
|
|
39
|
$self->{socket} = undef; |
578
|
13
|
|
|
|
|
43
|
$self->{_io_select} = undef; |
579
|
|
|
|
|
|
|
|
580
|
13
|
|
|
|
|
37
|
my $name = $self->{host}; |
581
|
13
|
|
|
|
|
34
|
my $port = $self->{port}; |
582
|
13
|
|
|
|
|
37
|
my $timeout = $self->{timeout}; |
583
|
|
|
|
|
|
|
|
584
|
13
|
|
|
|
|
38
|
my $ip = ''; |
585
|
13
|
100
|
|
|
|
67
|
if ( $self->_get_family( $name ) ) { |
586
|
2
|
|
|
|
|
8
|
$ip = $self->{ip} = $name; |
587
|
|
|
|
|
|
|
} else { |
588
|
10
|
50
|
|
|
|
33
|
if ( defined $timeout ) { |
589
|
10
|
|
|
|
|
21
|
my $remaining; |
590
|
10
|
|
|
|
|
26
|
my $start = time(); |
591
|
|
|
|
|
|
|
|
592
|
10
|
50
|
|
|
|
67
|
$self->_debug_msg( format_message( "name = '%s', number of wallclock seconds = %s", $name, ceil( $timeout ) ) ) |
593
|
|
|
|
|
|
|
if $self->debug_level; |
594
|
|
|
|
|
|
|
|
595
|
|
|
|
|
|
|
# DNS lookup. |
596
|
10
|
|
|
|
|
23
|
local $@; |
597
|
0
|
|
|
0
|
|
0
|
my $h = set_sig_handler( 'ALRM', sub { die 'alarm clock restarted' }, |
598
|
|
|
|
|
|
|
{ |
599
|
10
|
|
|
|
|
159
|
mask => [ 'ALRM' ], |
600
|
|
|
|
|
|
|
safe => 0, # perl 5.8+ uses safe signal delivery so we need unsafe signal for timeout to work |
601
|
|
|
|
|
|
|
} |
602
|
|
|
|
|
|
|
); |
603
|
10
|
|
|
|
|
1998
|
eval { |
604
|
10
|
|
|
|
|
129
|
$remaining = alarm( ceil( $timeout ) ); |
605
|
10
|
|
|
|
|
58
|
$ip = $self->_gethostbyname( $name ); |
606
|
10
|
|
|
|
|
15000256
|
alarm 0; |
607
|
|
|
|
|
|
|
}; |
608
|
10
|
|
|
|
|
46
|
alarm 0; # race condition protection |
609
|
10
|
|
|
|
|
28
|
my $error = $@; |
610
|
10
|
|
|
|
|
82
|
undef $h; |
611
|
|
|
|
|
|
|
|
612
|
10
|
50
|
|
|
|
718
|
$self->_debug_msg( format_message( "_connect: ip = '%s', error = '%s', \$? = %s, \$! = '%s'", $ip, $error, $?, $! ) ) |
613
|
|
|
|
|
|
|
if $self->debug_level; |
614
|
|
|
|
|
|
|
|
615
|
10
|
50
|
|
|
|
42
|
die $error if $error; |
616
|
10
|
100
|
|
|
|
94
|
die( format_message( "gethostbyname %s: \$? = '%s', \$! = '%s'\n", $name, $?, $! ) ) unless $ip; |
617
|
|
|
|
|
|
|
|
618
|
8
|
|
|
|
|
26
|
my $elapsed = time() - $start; |
619
|
|
|
|
|
|
|
# $SIG{ALRM} restored automatically, but we need to restart previous alarm manually |
620
|
|
|
|
|
|
|
|
621
|
8
|
50
|
|
|
|
39
|
$self->_debug_msg( format_message( '_connect: %s (remaining) - %s (elapsed) = %s', $remaining, $elapsed, $remaining - $elapsed ) ) |
622
|
|
|
|
|
|
|
if $self->debug_level; |
623
|
8
|
100
|
|
|
|
90
|
if ( $remaining ) { |
624
|
2
|
100
|
|
|
|
13
|
if ( $remaining - $elapsed > 0 ) { |
625
|
1
|
50
|
|
|
|
6
|
$self->_debug_msg( '_connect: remaining - elapsed > 0 (to alarm restart)' ) |
626
|
|
|
|
|
|
|
if $self->debug_level; |
627
|
1
|
|
|
|
|
19
|
alarm( ceil( $remaining - $elapsed ) ); |
628
|
|
|
|
|
|
|
} else { |
629
|
1
|
50
|
|
|
|
7
|
$self->_debug_msg( '_connect: remaining - elapsed < 0 (to alarm function call)' ) |
630
|
|
|
|
|
|
|
if $self->debug_level; |
631
|
|
|
|
|
|
|
# $SIG{ALRM}->(); |
632
|
1
|
|
|
|
|
89
|
kill ALRM => $$; |
633
|
|
|
|
|
|
|
} |
634
|
2
|
50
|
|
|
|
18
|
$self->_debug_msg( "_connect: after alarm 'recalled'" ) |
635
|
|
|
|
|
|
|
if $self->debug_level; |
636
|
|
|
|
|
|
|
} |
637
|
|
|
|
|
|
|
} else { |
638
|
0
|
|
|
|
|
0
|
$ip = $self->_gethostbyname( $name ); |
639
|
0
|
0
|
|
|
|
0
|
die( format_message( "could not resolve host name to IP address: %s\n", $name ) ) unless $ip; |
640
|
|
|
|
|
|
|
} |
641
|
|
|
|
|
|
|
} |
642
|
|
|
|
|
|
|
|
643
|
|
|
|
|
|
|
# Create socket. |
644
|
10
|
50
|
|
|
|
1396
|
socket( my $connection, $self->{pf}, SOCK_STREAM, scalar getprotobyname( 'tcp' ) ) or die( "socket: $!\n" ); |
645
|
|
|
|
|
|
|
|
646
|
|
|
|
|
|
|
# Set autoflushing. |
647
|
10
|
|
|
|
|
86
|
my $file_handle = select( $connection ); $| = 1; select $file_handle; |
|
10
|
|
|
|
|
54
|
|
|
10
|
|
|
|
|
77
|
|
648
|
|
|
|
|
|
|
|
649
|
|
|
|
|
|
|
# Set FD_CLOEXEC. |
650
|
10
|
50
|
|
|
|
66
|
my $flags = fcntl( $connection, F_GETFL, 0 ) or die "fcntl: $!\n"; |
651
|
10
|
50
|
|
|
|
72
|
fcntl( $connection, F_SETFL, $flags | FD_CLOEXEC ) or die "fnctl: $!\n"; |
652
|
|
|
|
|
|
|
|
653
|
10
|
50
|
|
|
|
52
|
$flags = fcntl( $connection, F_GETFL, 0 ) or die "fcntl F_GETFL: $!\n"; # 0 for error, 0e0 for 0. |
654
|
10
|
50
|
|
|
|
56
|
fcntl( $connection, F_SETFL, $flags | O_NONBLOCK ) or die "fcntl F_SETFL O_NONBLOCK: $!\n"; # 0 for error, 0e0 for 0. |
655
|
|
|
|
|
|
|
|
656
|
|
|
|
|
|
|
# Connect returns immediately because of O_NONBLOCK. |
657
|
|
|
|
|
|
|
my $sockaddr = $self->{af} eq AF_INET |
658
|
|
|
|
|
|
|
? pack_sockaddr_in( $port, inet_aton( $ip ) ) |
659
|
10
|
100
|
|
|
|
165
|
: pack_sockaddr_in6( $port, inet_pton( $self->{af}, $ip ) ) |
660
|
|
|
|
|
|
|
; |
661
|
10
|
100
|
66
|
|
|
1667
|
connect( $connection, $sockaddr ) || $!{EINPROGRESS} || die( format_message( "connect ip = %s, port = %s: %s\n", $ip, $port, $! ) ); |
662
|
|
|
|
|
|
|
|
663
|
|
|
|
|
|
|
# Reset O_NONBLOCK. |
664
|
9
|
50
|
|
|
|
318
|
$flags = fcntl( $connection, F_GETFL, 0 ) or die "fcntl F_GETFL: $!\n"; # 0 for error, 0e0 for 0. |
665
|
9
|
50
|
|
|
|
53
|
fcntl( $connection, F_SETFL, $flags & ~ O_NONBLOCK ) or die "fcntl F_SETFL not O_NONBLOCK: $!\n"; # 0 for error, 0e0 for 0. |
666
|
|
|
|
|
|
|
|
667
|
|
|
|
|
|
|
# Use select() to poll for completion or error. When connect succeeds we can write. |
668
|
9
|
|
|
|
|
26
|
my $vec = ''; |
669
|
9
|
|
|
|
|
53
|
vec( $vec, fileno( $connection ), 1 ) = 1; |
670
|
9
|
|
33
|
|
|
135
|
select( undef, $vec, undef, $timeout // $REQUEST_TIMEOUT ); |
671
|
9
|
50
|
|
|
|
44
|
unless ( vec( $vec, fileno( $connection ), 1 ) ) { |
672
|
|
|
|
|
|
|
# If no response yet, impose our own timeout. |
673
|
0
|
|
|
|
|
0
|
$! = ETIMEDOUT; |
674
|
0
|
|
|
|
|
0
|
die( format_message( "connect ip = %s, port = %s: %s\n", $ip, $port, $! ) ); |
675
|
|
|
|
|
|
|
} |
676
|
|
|
|
|
|
|
|
677
|
|
|
|
|
|
|
# This is how we see whether it connected or there was an error. Document Unix, are you kidding?! |
678
|
9
|
|
|
|
|
123
|
$! = unpack( 'L', getsockopt( $connection, SOL_SOCKET, SO_ERROR ) ); |
679
|
9
|
50
|
|
|
|
55
|
die( format_message( "connect ip = %s, port = %s: %s\n", $ip, $port, $! ) ) if $!; |
680
|
|
|
|
|
|
|
|
681
|
|
|
|
|
|
|
# Set timeout on all reads and writes. |
682
|
|
|
|
|
|
|
# |
683
|
|
|
|
|
|
|
# Note the difference between Perl's sysread() and read() calls: sysread() |
684
|
|
|
|
|
|
|
# queries the kernel exactly once, with max delay specified here. read() |
685
|
|
|
|
|
|
|
# queries the kernel repeatedly until there's a read error (such as this |
686
|
|
|
|
|
|
|
# timeout), EOF, or a full buffer. So when using read() with a timeout of one |
687
|
|
|
|
|
|
|
# second, if the remote server sends 1 byte repeatedly at 1 second intervals, |
688
|
|
|
|
|
|
|
# read() will read the whole buffer very slowly and sysread() will return only |
689
|
|
|
|
|
|
|
# the first byte. The print() and syswrite() calls are similarly different. |
690
|
|
|
|
|
|
|
# <> is of course similar to read() but delimited by newlines instead of buffer |
691
|
|
|
|
|
|
|
# sizes. |
692
|
9
|
|
33
|
|
|
62
|
my $timeval = _get_timeval( $timeout // $REQUEST_TIMEOUT ); |
693
|
9
|
|
50
|
|
|
321
|
setsockopt( $connection, SOL_SOCKET, SO_SNDTIMEO, $timeval ) // die "setsockopt SOL_SOCKET, SO_SNDTIMEO: $!\n"; |
694
|
9
|
|
50
|
|
|
57
|
setsockopt( $connection, SOL_SOCKET, SO_RCVTIMEO, $timeval ) // die "setsockopt SOL_SOCKET, SO_RCVTIMEO: $!\n"; |
695
|
|
|
|
|
|
|
|
696
|
9
|
|
|
|
|
27
|
$self->{socket} = $connection; |
697
|
9
|
|
|
|
|
137
|
my $s = $self->{_io_select} = IO::Select->new; |
698
|
9
|
|
|
|
|
227
|
$s->add( $self->{socket} ); |
699
|
|
|
|
|
|
|
|
700
|
9
|
|
|
|
|
718
|
return $connection; |
701
|
|
|
|
|
|
|
} |
702
|
|
|
|
|
|
|
|
703
|
|
|
|
|
|
|
# Packing timeval |
704
|
|
|
|
|
|
|
# uses http://trinitum.org/wp/packing-timeval/ |
705
|
|
|
|
|
|
|
sub _get_timeval { |
706
|
9
|
|
|
9
|
|
74
|
my $timeout = shift; |
707
|
|
|
|
|
|
|
|
708
|
9
|
|
|
|
|
32
|
my $intval = int( $timeout ); # sec |
709
|
9
|
|
|
|
|
40
|
my $fraction = int( ( $timeout - $intval ) * 1_000_000 ); # ms |
710
|
|
|
|
|
|
|
|
711
|
9
|
50
|
33
|
|
|
217
|
if ( $Config{osname} eq 'netbsd' && _major_osvers() >= 6 && $Config{longsize} == 4 ) { |
|
|
|
33
|
|
|
|
|
712
|
0
|
0
|
|
|
|
0
|
if ( defined $Config{use64bitint} ) { |
713
|
0
|
|
|
|
|
0
|
$timeout = pack( 'QL', int( $timeout ), $fraction ); |
714
|
|
|
|
|
|
|
} else { |
715
|
|
|
|
|
|
|
$timeout = pack( |
716
|
|
|
|
|
|
|
'LLL', |
717
|
|
|
|
|
|
|
( |
718
|
0
|
0
|
|
|
|
0
|
$Config{byteorder} eq '1234' |
719
|
|
|
|
|
|
|
? ( $timeout, 0, $fraction ) |
720
|
|
|
|
|
|
|
: ( 0, $timeout, $fraction ) |
721
|
|
|
|
|
|
|
) |
722
|
|
|
|
|
|
|
); |
723
|
|
|
|
|
|
|
} |
724
|
|
|
|
|
|
|
} else { |
725
|
9
|
|
|
|
|
76
|
$timeout = pack( 'L!L!', $timeout, $fraction ); |
726
|
|
|
|
|
|
|
} |
727
|
|
|
|
|
|
|
|
728
|
9
|
|
|
|
|
36
|
return $timeout; |
729
|
|
|
|
|
|
|
} |
730
|
|
|
|
|
|
|
|
731
|
|
|
|
|
|
|
sub _major_osvers { |
732
|
0
|
|
|
0
|
|
0
|
my $osvers = $Config{osvers}; |
733
|
0
|
|
|
|
|
0
|
my ( $major_osvers ) = $osvers =~ /^(\d+)/; |
734
|
0
|
|
|
|
|
0
|
$major_osvers += 0; |
735
|
|
|
|
|
|
|
|
736
|
0
|
|
|
|
|
0
|
return $major_osvers; |
737
|
|
|
|
|
|
|
} |
738
|
|
|
|
|
|
|
|
739
|
|
|
|
|
|
|
sub _gethostbyname { |
740
|
8
|
|
|
8
|
|
31
|
my ( $self, $name ) = @_; |
741
|
|
|
|
|
|
|
|
742
|
8
|
|
|
|
|
26
|
my $is_v4_fqdn = 1; |
743
|
8
|
|
|
|
|
30
|
$self->{ip} = ''; |
744
|
|
|
|
|
|
|
|
745
|
8
|
|
|
|
|
23
|
my $ip_version = $self->{ip_version}; |
746
|
8
|
100
|
100
|
|
|
60
|
if ( defined( $ip_version ) && $ip_version == $IP_V6 ) { |
747
|
1
|
|
|
|
|
104
|
my ( $err, @addrs ) = getaddrinfo( |
748
|
|
|
|
|
|
|
$name, |
749
|
|
|
|
|
|
|
'', # not interested in the service name |
750
|
|
|
|
|
|
|
{ |
751
|
|
|
|
|
|
|
family => AF_INET6, |
752
|
|
|
|
|
|
|
socktype => SOCK_STREAM, |
753
|
|
|
|
|
|
|
protocol => IPPROTO_TCP, |
754
|
|
|
|
|
|
|
}, |
755
|
|
|
|
|
|
|
); |
756
|
1
|
50
|
|
|
|
8
|
return( $self->{ip} ) if $err; |
757
|
|
|
|
|
|
|
|
758
|
1
|
|
|
|
|
3
|
$is_v4_fqdn = 0; |
759
|
1
|
|
|
|
|
18
|
for my $addr ( @addrs ) { |
760
|
1
|
|
|
|
|
21
|
my ( $err, $ipaddr ) = getnameinfo( $addr->{addr}, NI_NUMERICHOST, NIx_NOSERV ); |
761
|
1
|
50
|
|
|
|
5
|
next if $err; |
762
|
|
|
|
|
|
|
|
763
|
1
|
|
|
|
|
4
|
$self->{af} = AF_INET6; |
764
|
1
|
|
|
|
|
4
|
$self->{pf} = PF_INET6; |
765
|
1
|
|
|
|
|
4
|
$self->{ip} = $ipaddr; |
766
|
1
|
|
|
|
|
6
|
last; |
767
|
|
|
|
|
|
|
} |
768
|
|
|
|
|
|
|
} |
769
|
|
|
|
|
|
|
|
770
|
8
|
50
|
66
|
|
|
60
|
if ( $is_v4_fqdn && ( !defined( $ip_version ) || $ip_version == $IP_V4 ) ) { |
|
|
|
66
|
|
|
|
|
771
|
7
|
100
|
|
|
|
240094
|
if ( my $ipaddr = gethostbyname( $name ) ) { |
772
|
5
|
|
|
|
|
62
|
$self->{ip} = inet_ntop( $self->{af}, $ipaddr ); |
773
|
|
|
|
|
|
|
} |
774
|
|
|
|
|
|
|
} |
775
|
|
|
|
|
|
|
|
776
|
8
|
|
|
|
|
50
|
return $self->{ip}; |
777
|
|
|
|
|
|
|
} |
778
|
|
|
|
|
|
|
|
779
|
|
|
|
|
|
|
sub _get_family { |
780
|
13
|
|
|
13
|
|
50
|
my ( $self, $name ) = @_; |
781
|
|
|
|
|
|
|
|
782
|
13
|
|
|
|
|
31
|
my $is_ip; |
783
|
13
|
|
100
|
|
|
102
|
my $ip_version = $self->{ip_version} // 0; |
784
|
13
|
100
|
33
|
|
|
113
|
if ( ( ( $is_ip = is_ipv6( $name ) ) && !$ip_version ) || $ip_version == $IP_V6 ) { |
|
|
100
|
66
|
|
|
|
|
|
|
50
|
100
|
|
|
|
|
|
|
|
100
|
|
|
|
|
785
|
2
|
100
|
66
|
|
|
116
|
$self->_error( $ERROR_INCOMPATIBLE_HOST_IP_VERSION, format_message( 'ip_version = %s, host = %s', $ip_version, $name ) ) |
|
|
|
33
|
|
|
|
|
786
|
|
|
|
|
|
|
if |
787
|
|
|
|
|
|
|
$ip_version |
788
|
|
|
|
|
|
|
&& ( |
789
|
|
|
|
|
|
|
( !$is_ip && is_ipv4( $name ) ) |
790
|
|
|
|
|
|
|
|| ( $is_ip && $ip_version == $IP_V4 ) |
791
|
|
|
|
|
|
|
) |
792
|
|
|
|
|
|
|
; |
793
|
|
|
|
|
|
|
|
794
|
1
|
|
|
|
|
30
|
$self->{af} = AF_INET6; |
795
|
1
|
|
|
|
|
5
|
$self->{pf} = PF_INET6; |
796
|
|
|
|
|
|
|
} elsif ( ( ( $is_ip = is_ipv4( $name ) ) && !$ip_version ) || $ip_version == $IP_V4 ) { |
797
|
3
|
50
|
33
|
|
|
270
|
$self->_error( $ERROR_INCOMPATIBLE_HOST_IP_VERSION, format_message( 'ip_version = %s, host = %s', $ip_version, $name ) ) |
|
|
|
66
|
|
|
|
|
798
|
|
|
|
|
|
|
if |
799
|
|
|
|
|
|
|
$ip_version |
800
|
|
|
|
|
|
|
&& ( |
801
|
|
|
|
|
|
|
( !$is_ip && is_ipv6( $name ) ) |
802
|
|
|
|
|
|
|
|| ( $is_ip && $ip_version == $IP_V6 ) |
803
|
|
|
|
|
|
|
) |
804
|
|
|
|
|
|
|
; |
805
|
|
|
|
|
|
|
|
806
|
3
|
|
|
|
|
31
|
$self->{af} = AF_INET; |
807
|
3
|
|
|
|
|
10
|
$self->{pf} = PF_INET; |
808
|
|
|
|
|
|
|
} elsif ( !$ip_version ) { |
809
|
8
|
|
|
|
|
727
|
$self->{af} = AF_INET; |
810
|
8
|
|
|
|
|
23
|
$self->{pf} = PF_INET; |
811
|
|
|
|
|
|
|
} |
812
|
|
|
|
|
|
|
|
813
|
12
|
|
|
|
|
57
|
return $is_ip; |
814
|
|
|
|
|
|
|
} |
815
|
|
|
|
|
|
|
|
816
|
|
|
|
|
|
|
# Show additional debugging information |
817
|
|
|
|
|
|
|
sub _debug_msg { |
818
|
0
|
|
|
0
|
|
0
|
my ( $self, $message, $header, $colour ) = @_; |
819
|
|
|
|
|
|
|
|
820
|
0
|
0
|
|
|
|
0
|
if ( $header ) { |
821
|
0
|
0
|
|
|
|
0
|
unless ( $_hdr ) { |
822
|
0
|
|
|
|
|
0
|
require Data::HexDump::Range; |
823
|
0
|
|
|
|
|
0
|
$_hdr = Data::HexDump::Range->new( |
824
|
|
|
|
|
|
|
FORMAT => 'ANSI', # 'ANSI'|'ASCII'|'HTML' |
825
|
|
|
|
|
|
|
COLOR => 'bw', # 'bw' | 'cycle' |
826
|
|
|
|
|
|
|
OFFSET_FORMAT => 'hex', # 'hex' | 'dec' |
827
|
|
|
|
|
|
|
DATA_WIDTH => 16, # 16 | 20 | ... |
828
|
|
|
|
|
|
|
DISPLAY_RANGE_NAME => 0, |
829
|
|
|
|
|
|
|
# MAXIMUM_RANGE_NAME_SIZE => 16, |
830
|
|
|
|
|
|
|
DISPLAY_COLUMN_NAMES => 1, |
831
|
|
|
|
|
|
|
DISPLAY_RULER => 1, |
832
|
|
|
|
|
|
|
DISPLAY_OFFSET => 1, |
833
|
|
|
|
|
|
|
# DISPLAY_CUMULATIVE_OFFSET => 1, |
834
|
|
|
|
|
|
|
DISPLAY_ZERO_SIZE_RANGE_WARNING => 0, |
835
|
|
|
|
|
|
|
DISPLAY_ZERO_SIZE_RANGE => 1, |
836
|
|
|
|
|
|
|
DISPLAY_RANGE_NAME => 0, |
837
|
|
|
|
|
|
|
# DISPLAY_RANGE_SIZE => 1, |
838
|
|
|
|
|
|
|
DISPLAY_ASCII_DUMP => 1, |
839
|
|
|
|
|
|
|
DISPLAY_HEX_DUMP => 1, |
840
|
|
|
|
|
|
|
# DISPLAY_DEC_DUMP => 1, |
841
|
|
|
|
|
|
|
# COLOR_NAMES => {}, |
842
|
|
|
|
|
|
|
ORIENTATION => 'horizontal', |
843
|
|
|
|
|
|
|
); |
844
|
|
|
|
|
|
|
} |
845
|
|
|
|
|
|
|
|
846
|
|
|
|
|
|
|
say STDERR |
847
|
0
|
|
|
|
|
0
|
"# $header ", $self->{host}, ':', $self->{port}, "\n", |
848
|
|
|
|
|
|
|
'# Hex Stream: ', unpack( 'H*', $message ), "\n", |
849
|
|
|
|
|
|
|
$_hdr->dump( |
850
|
|
|
|
|
|
|
[ |
851
|
|
|
|
|
|
|
[ 'data', length( $message ), $colour ], |
852
|
|
|
|
|
|
|
], |
853
|
|
|
|
|
|
|
$message |
854
|
|
|
|
|
|
|
) |
855
|
|
|
|
|
|
|
; |
856
|
|
|
|
|
|
|
} else { |
857
|
0
|
|
|
|
|
0
|
say STDERR format_message( '[%s] %s', scalar( localtime ), $message ); |
858
|
|
|
|
|
|
|
} |
859
|
|
|
|
|
|
|
|
860
|
0
|
|
|
|
|
0
|
return; |
861
|
|
|
|
|
|
|
} |
862
|
|
|
|
|
|
|
|
863
|
|
|
|
|
|
|
# Handler for errors |
864
|
|
|
|
|
|
|
sub _error { |
865
|
88
|
|
|
88
|
|
134842
|
my $self = shift; |
866
|
88
|
|
|
|
|
436
|
my %args = throw_args( @_ ); |
867
|
88
|
50
|
|
|
|
422
|
$self->_debug_msg( format_message( 'throwing IO error %s: %s', $args{code}, $args{message} ) ) |
868
|
|
|
|
|
|
|
if $self->debug_level; |
869
|
88
|
|
|
|
|
859
|
Kafka::Exception::IO->throw( %args ); |
870
|
|
|
|
|
|
|
} |
871
|
|
|
|
|
|
|
|
872
|
|
|
|
|
|
|
|
873
|
|
|
|
|
|
|
|
874
|
|
|
|
|
|
|
1; |
875
|
|
|
|
|
|
|
|
876
|
|
|
|
|
|
|
__END__ |