File Coverage

lib/Kafka/IO.pm
Criterion Covered Total %
statement 261 332 78.6
branch 80 170 47.0
condition 47 128 36.7
subroutine 34 37 91.8
pod 4 4 100.0
total 426 671 63.4


line stmt bran cond sub pod time code
1             package Kafka::IO;
2              
3             =head1 NAME
4              
5             Kafka::IO - Interface to network communication with the Apache Kafka server.
6              
7             =head1 VERSION
8              
9             This documentation refers to C<Kafka::IO> version 1.08.
10              
11             =cut
12              
13              
14              
15 16     16   171274 use 5.010;
  16         55  
16 16     16   86 use strict;
  16         123  
  16         338  
17 16     16   137 use warnings;
  16         30  
  16         997  
18              
19             our $DEBUG = 0;
20              
21             our $VERSION = 'v1.08';
22              
23 16     16   104 use Carp;
  16         48  
  16         924  
24 16     16   91 use Config;
  16         33  
  16         745  
25 16     16   90 use Const::Fast;
  16         29  
  16         127  
26 16         982 use Data::Validate::Domain qw(
27             is_hostname
28 16     16   6795 );
  16         152696  
29 16         1301 use Data::Validate::IP qw(
30             is_ipv4
31             is_ipv6
32 16     16   6942 );
  16         383299  
33 16         1820 use Errno qw(
34             EAGAIN
35             ECONNRESET
36             EINTR
37             EWOULDBLOCK
38             ETIMEDOUT
39 16     16   1251 );
  16         2767  
40 16     16   106 use Fcntl;
  16         31  
  16         3753  
41 16     16   9131 use IO::Select;
  16         27323  
  16         804  
42 16         809 use Params::Util qw(
43             _STRING
44 16     16   1662 );
  16         8185  
45 16         145 use POSIX qw(
46             ceil
47 16     16   93 );
  16         28  
48 16         999 use Scalar::Util qw(
49             dualvar
50 16     16   12279 );
  16         26  
51 16         1744 use Socket qw(
52             AF_INET
53             AF_INET6
54             IPPROTO_TCP
55             MSG_DONTWAIT
56             MSG_PEEK
57             NI_NUMERICHOST
58             NIx_NOSERV
59             PF_INET
60             PF_INET6
61             SOCK_STREAM
62             SOL_SOCKET
63             SO_ERROR
64             SO_RCVTIMEO
65             SO_SNDTIMEO
66             getaddrinfo
67             getnameinfo
68             inet_aton
69             inet_pton
70             inet_ntop
71             pack_sockaddr_in
72             pack_sockaddr_in6
73 16     16   96 );
  16         27  
74 16         1494 use Sys::SigAction qw(
75             set_sig_handler
76 16     16   7830 );
  16         52839  
77 16     16   117 use Time::HiRes ();
  16         27  
  16         315  
78 16     16   1691 use Try::Tiny;
  16         6402  
  16         1095  
79              
80 16         2237 use Kafka qw(
81             $ERROR_CANNOT_BIND
82             $ERROR_CANNOT_RECV
83             $ERROR_CANNOT_SEND
84             $ERROR_MISMATCH_ARGUMENT
85             $ERROR_INCOMPATIBLE_HOST_IP_VERSION
86             $ERROR_NO_CONNECTION
87             $IP_V4
88             $IP_V6
89             $KAFKA_SERVER_PORT
90             $REQUEST_TIMEOUT
91 16     16   105 );
  16         96  
92 16     16   4123 use Kafka::Exceptions;
  16         44  
  16         758  
93 16         55599 use Kafka::Internals qw(
94             $MAX_SOCKET_REQUEST_BYTES
95             debug_level
96             format_message
97 16     16   103 );
  16         30  
98              
99              
100              
101             =head1 SYNOPSIS
102              
103             use 5.010;
104             use strict;
105             use warnings;
106              
107             use Scalar::Util qw(
108             blessed
109             );
110             use Try::Tiny;
111              
112             use Kafka::IO;
113              
114             my $io;
115             try {
116             $io = Kafka::IO->new( host => 'localhost' );
117             } catch {
118             my $error = $_;
119             if ( blessed( $error ) && $error->isa( 'Kafka::Exception' ) ) {
120             warn 'Error: (', $error->code, ') ', $error->message, "\n";
121             exit;
122             } else {
123             die $error;
124             }
125             };
126              
127             # Closes and cleans up
128             $io->close;
129             undef $io;
130              
131             =head1 DESCRIPTION
132              
133             This module is private and should not be used directly.
134              
135             In order to achieve better performance, methods of this module do not
136             perform arguments validation.
137              
138             The main features of the C<Kafka::IO> class are:
139              
140             =over 3
141              
142             =item *
143              
144             Provides an object oriented API for communication with Kafka.
145              
146             =item *
147              
148             This class allows you to create Kafka 0.9+ clients.
149              
150             =back
151              
152             =cut
153              
154             # Hard limit of IO operation retry attempts, to prevent high CPU usage in IO retry loop
155             const my $MAX_RETRIES => 30;
156              
157             our $_hdr;
158              
159             #-- constructor ----------------------------------------------------------------
160              
161             =head2 CONSTRUCTOR
162              
163             =head3 C<new>
164              
165             Establishes a TCP connection to the given host and port; creates and returns a C<Kafka::IO> object.
166              
167             C<new()> takes arguments in key-value pairs. The following arguments are currently recognized:
168              
169             =over 3
170              
171             =item C<host =E<gt> $host>
172              
173             C<$host> is the Kafka host to connect to. It can be a host name or an IP address in
174             IPv4 or IPv6 form (for example '127.0.0.1', '0:0:0:0:0:0:0:1' or '::1').
175              
176             =item C<port =E<gt> $port>
177              
178             Optional, default = C<$KAFKA_SERVER_PORT>.
179              
180             C<$port> is an integer attribute denoting the port number used to access Apache Kafka.
181              
182             C<$KAFKA_SERVER_PORT> is the default Apache Kafka server port that can be imported
183             from the L<Kafka|Kafka> module.
184              
185             =item C<timeout =E<gt> $timeout>
186              
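187             Optional, default = C<$REQUEST_TIMEOUT>. C<$timeout> is the timeout in seconds (may be fractional) used when connecting and for IO operations. C<$REQUEST_TIMEOUT> is the default timeout that can be imported from the L<Kafka|Kafka> module.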
188              
189             Special behavior when C<timeout> is set to C<undef>:
190              
191             =back
192              
193             =over 3
194              
195             =item *
196              
197             Alarms are not used internally (namely when performing C<gethostbyname>).
198              
199             =item *
200              
201             Default C<$REQUEST_TIMEOUT> is used for the rest of IO operations.
202              
203             =back
204              
205             =over 3
206              
207             =item C<ip_version =E<gt> $ip_version>
208              
209             Force version of IP protocol for resolving host name (or interpretation of passed address).
210              
211             Optional, undefined by default. When it is undefined, the version of a literal IP address
212             is detected automatically, and a host name is resolved into an IPv4 address.
213              
214             See the description of L<$IP_V4|Kafka/$IP_V4> and L<$IP_V6|Kafka/$IP_V6>
215             in the C<Kafka> L<EXPORT|Kafka/EXPORT> section.
216              
217             =back
218              
219             =cut
220             sub new {
221 13     13 1 11958 my ( $class, %p ) = @_;
222              
223 13         169 my $self = bless {
224             host => '',
225             timeout => $REQUEST_TIMEOUT,
226             port => $KAFKA_SERVER_PORT,
227             ip_version => undef,
228             af => '', # Address family constant
229             pf => '', # Protocol family constant
230             ip => '', # Human-readable textual representation of the ip address
231             }, $class;
232              
233 13   66     176 exists $p{$_} and $self->{$_} = $p{$_} foreach keys %$self;
234              
235             # we trust it: make it untainted
236 13         122 ( $self->{host} ) = $self->{host} =~ /\A(.+)\z/;
237 13         59 ( $self->{port} ) = $self->{port} =~ /\A(.+)\z/;
238              
239 13         47 $self->{socket} = undef;
240 13         29 $self->{_io_select} = undef;
241 13         25 my $error;
242             try {
243 13     13   1312 $self->_connect();
244             } catch {
245 4     4   1449 $error = $_;
246 13         178 };
247              
248 13 100       230 $self->_error( $ERROR_CANNOT_BIND, format_message("Kafka::IO(%s:%s)->new: %s", $self->{host}, $self->{port}, $error ) )
249             if defined $error
250             ;
251 9         555 return $self;
252             }
253              
254             #-- public attributes ----------------------------------------------------------
255              
256             =head2 METHODS
257              
258             The following methods are provided by C<Kafka::IO> class:
259              
260             =cut
261              
262             =head3 C<< send( $message <, $timeout> ) >>
263              
264             Sends a C<$message> to Kafka.
265              
266             The argument must be a byte string.
267              
268             Use optional C<$timeout> argument to override default timeout for this request only.
269              
270             Returns the number of bytes sent.
271              
272             =cut
273             sub send {
274 1     1 1 99 my ( $self, $message, $timeout ) = @_;
275 1 50       65 $self->_error( $ERROR_MISMATCH_ARGUMENT, '->send' )
276             unless defined( _STRING( $message ) )
277             ;
278 1         6 my $length = length( $message );
279 1 50       4 $self->_error( $ERROR_MISMATCH_ARGUMENT, '->send' )
280             unless $length <= $MAX_SOCKET_REQUEST_BYTES
281             ;
282 1 50 33     7 $timeout = $self->{timeout} // $REQUEST_TIMEOUT unless defined $timeout;
283 1 50       4 $self->_error( $ERROR_MISMATCH_ARGUMENT, '->receive' )
284             unless $timeout > 0
285             ;
286 1         4 my $select = $self->{_io_select};
287 1 50       3 $self->_error( $ERROR_NO_CONNECTION, 'Attempt to work with a closed socket' ) unless $select;
288              
289 1 50       5 $self->_debug_msg( $message, 'Request to', 'green' )
290             if $self->debug_level >= 2
291             ;
292 1         3 my $sent = 0;
293              
294 1         11 my $started = Time::HiRes::time();
295 1         5 my $until = $started + $timeout;
296              
297 1         2 my $error_code;
298             my $errno;
299 1         2 my $retries = 0;
300 1         3 my $interrupts = 0;
301 1   66     17 ATTEMPT: while ( $sent < $length && $retries++ < $MAX_RETRIES ) {
302 1         6 my $remaining_time = $until - Time::HiRes::time();
303 1 50       3 last ATTEMPT if $remaining_time <= 0; # timeout expired
304              
305 1         3 undef $!;
306 1         7 my $can_write = $select->can_write( $remaining_time );
307 1         64 $errno = $!;
308 1 50       5 if ( $errno ) {
309 0 0       0 if ( $errno == EINTR ) {
310 0         0 undef $errno;
311 0         0 --$retries; # this attempt does not count
312 0         0 ++$interrupts;
313 0         0 next ATTEMPT;
314             }
315              
316 0         0 $self->close;
317              
318 0         0 last ATTEMPT;
319             }
320              
321 1 50       4 if ( $can_write ) {
322             # check for EOF on the first attempt only
323 1 50 33     18 if ( $retries == 1 && $self->_is_close_wait ) {
324 0         0 $self->close;
325 0         0 $error_code = $ERROR_NO_CONNECTION;
326 0         0 last ATTEMPT;
327             }
328              
329 1         25 undef $!;
330 1         62 my $wrote = CORE::send( $self->{socket}, $message, MSG_DONTWAIT );
331 1         6 $errno = $!;
332              
333 1 50 33     18 if( defined $wrote && $wrote > 0 ) {
334 1         3 $sent += $wrote;
335 1 50       4 if ( $sent < $length ) {
336             # remove written data from message
337 0         0 $message = substr( $message, $wrote );
338             }
339             }
340              
341 1 50       2 if( $errno ) {
342 0 0 0     0 if( $errno == EINTR ) {
    0 0        
      0        
343 0         0 undef $errno;
344 0         0 --$retries; # this attempt does not count
345 0         0 ++$interrupts;
346 0         0 next ATTEMPT;
347             } elsif (
348             $errno != EAGAIN
349             && $errno != EWOULDBLOCK
350             ## on freebsd, if we got ECONNRESET, it's a timeout from the other side
351             && !( $errno == ECONNRESET && $^O eq 'freebsd' )
352             ) {
353 0         0 $self->close;
354 0         0 last ATTEMPT;
355             }
356             }
357              
358 1 50       5 last ATTEMPT unless defined $wrote;
359             }
360             }
361              
362 1 50 33     36 unless( !$errno && defined( $sent ) && $sent == $length )
      33        
363             {
364             $self->_error(
365             $error_code // $ERROR_CANNOT_SEND,
366             format_message( "Kafka::IO(%s)->send: ERRNO=%s ERROR='%s' (length=%s, sent=%s, timeout=%s, retries=%s, interrupts=%s, secs=%.6f)",
367             $self->{host},
368 0   0     0 ( $errno // 0 ) + 0,
      0        
      0        
369             ( $errno // '<none>' ) . '',
370             $length,
371             $sent,
372             $timeout,
373             $retries,
374             $interrupts,
375             Time::HiRes::time() - $started,
376             )
377             );
378             }
379              
380 1         8 return $sent;
381             }
382              
383             =head3 C<< receive( $length <, $timeout> ) >>
384              
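385             Receives a message of C<$length> bytes from Kafka. An error is reported if fewer than C<$length> bytes were received before the timeout or the retry limit was reached.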
386              
387             C<$length> argument must be a positive number.
388              
389             Use optional C<$timeout> argument to override default timeout for this call only.
390              
391             Returns a reference to the received message.
392              
393             =cut
394             sub receive {
395 1     1 1 1356 my ( $self, $length, $timeout ) = @_;
396 1 50       5 $self->_error( $ERROR_MISMATCH_ARGUMENT, '->receive' )
397             unless $length > 0
398             ;
399 1 50 33     9 $timeout = $self->{timeout} // $REQUEST_TIMEOUT unless defined $timeout;
400 1 50       5 $self->_error( $ERROR_MISMATCH_ARGUMENT, '->receive' )
401             unless $timeout > 0
402             ;
403 1         3 my $select = $self->{_io_select};
404 1 50       4 $self->_error( $ERROR_NO_CONNECTION, 'Attempt to work with a closed socket' ) unless $select;
405              
406 1         12 my $message = '';
407 1         3 my $len_to_read = $length;
408              
409 1         13 my $started = Time::HiRes::time();
410 1         3 my $until = $started + $timeout;
411              
412 1         2 my $error_code;
413             my $errno;
414 1         3 my $retries = 0;
415 1         3 my $interrupts = 0;
416 1   66     22 ATTEMPT: while ( $len_to_read > 0 && $retries++ < $MAX_RETRIES ) {
417 1         5 my $remaining_time = $until - Time::HiRes::time();
418 1 50       4 last if $remaining_time <= 0; # timeout expired
419              
420 1         2 undef $!;
421 1         5 my $can_read = $select->can_read( $remaining_time );
422 1         42 $errno = $!;
423 1 50       4 if ( $errno ) {
424 0 0       0 if ( $errno == EINTR ) {
425 0         0 undef $errno;
426 0         0 --$retries; # this attempt does not count
427 0         0 ++$interrupts;
428 0         0 next ATTEMPT;
429             }
430              
431 0         0 $self->close;
432              
433 0         0 last ATTEMPT;
434             }
435              
436 1 50       3 if ( $can_read ) {
437 1         12 my $buf = '';
438 1         4 undef $!;
439 1         21 my $from_recv = CORE::recv( $self->{socket}, $buf, $len_to_read, MSG_DONTWAIT );
440 1         4 $errno = $!;
441              
442 1 50 33     15 if ( defined( $from_recv ) && length( $buf ) ) {
443 1         4 $message .= $buf;
444 1         2 $len_to_read = $length - length( $message );
445 1         2 --$retries; # this attempt was successful, don't count as a retry
446             }
447 1 50       3 if ( $errno ) {
448 0 0 0     0 if ( $errno == EINTR ) {
    0 0        
      0        
449 0         0 undef $errno;
450 0         0 --$retries; # this attempt does not count
451 0         0 ++$interrupts;
452 0         0 next ATTEMPT;
453             } elsif (
454             $errno != EAGAIN
455             && $errno != EWOULDBLOCK
456             ## on freebsd, if we got ECONNRESET, it's a timeout from the other side
457             && !( $errno == ECONNRESET && $^O eq 'freebsd' )
458             ) {
459 0         0 $self->close;
460 0         0 last ATTEMPT;
461             }
462             }
463              
464 1 50       5 if ( length( $buf ) == 0 ) {
465 0 0 0     0 if( defined( $from_recv ) && ! $errno ) {
466             # no error and nothing received with select returning "can read" means EOF: other side closed socket
467 0 0       0 $self->_debug_msg( 'EOF on receive attempt, closing socket' )
468             if $self->debug_level;
469 0         0 $self->close;
470              
471 0 0       0 if( length( $message ) == 0 ) {
472             # we did not receive anything yet, so we may (in some cases) reconnect and try again
473 0         0 $error_code = $ERROR_NO_CONNECTION;
474             }
475              
476 0         0 last ATTEMPT;
477             }
478             # we did not read anything on this attempt: wait a bit before the next one; should not happen, but just in case...
479 0 0       0 if ( my $remaining_attempts = $MAX_RETRIES - $retries ) {
480 0         0 $remaining_time = $until - Time::HiRes::time();
481 0         0 my $micro_seconds = int( $remaining_time * 1e6 / $remaining_attempts );
482 0 0       0 if ( $micro_seconds > 0 ) {
483 0 0       0 $micro_seconds = 250_000 if $micro_seconds > 250_000; # prevent long sleeps if total remaining time is big
484 0 0       0 $self->_debug_msg( format_message( 'sleeping (remaining attempts %d, time %.6f): %d microseconds', $remaining_attempts, $remaining_time, $micro_seconds ) )
485             if $self->debug_level;
486 0         0 Time::HiRes::usleep( $micro_seconds );
487             }
488             }
489             }
490             }
491             }
492              
493 1 50 33     14 unless( !$errno && length( $message ) >= $length )
494             {
495             $self->_error(
496             $error_code // $ERROR_CANNOT_RECV,
497             format_message( "Kafka::IO(%s)->receive: ERRNO=%s ERROR='%s' (length=%s, received=%s, timeout=%s, retries=%s, interrupts=%s, secs=%.6f)",
498             $self->{host},
499 0   0     0 ( $errno // 0 ) + 0,
      0        
      0        
500             ( $errno // '<none>' ) . '',
501             $length,
502             length( $message ),
503             $timeout,
504             $retries,
505             $interrupts,
506             Time::HiRes::time() - $started,
507             ),
508             );
509             }
510 1 50       5 $self->_debug_msg( $message, 'Response from', 'yellow' )
511             if $self->debug_level >= 2;
512              
513             # returns tainted data
514 1         7 return \$message;
515             }
516              
517             =head3 C<< try_receive( $length <, $timeout> ) >>
518             Receives a message up to C<$length> size from Kafka.
519              
520             C<$length> argument must be a positive number.
521              
522             Use optional C<$timeout> argument to override default timeout for this call only.
523              
524             Returns a reference to the received message.
525              
526             =cut
527              
528             *try_receive = \&receive;
529              
530             =head3 C<close>
531              
532             Closes connection to Kafka server.
533             Returns true if the socket was closed successfully and no error was reported by any PerlIO layer, or if the socket was already closed.
534              
535             =cut
536             sub close {
537 1     1 1 3 my ( $self ) = @_;
538              
539 1         2 my $ret = 1;
540 1 50       4 if ( $self->{socket} ) {
541 1         60 $ret = CORE::close( $self->{socket} );
542 1         117 $self->{socket} = undef;
543 1         7 $self->{_io_select} = undef;
544             }
545              
546 1         4 return $ret;
547             }
548              
549             sub _is_close_wait {
550 1     1   3 my ( $self ) = @_;
551 1 50 33     22 return 1 unless $self->{socket} && $self->{_io_select}; # closed already
552             # http://stefan.buettcher.org/cs/conn_closed.html
553             # socket is open; check if we can read, and if we can but recv() cannot peek, it means we got EOF
554 1 50       6 return unless $self->{_io_select}->can_read( 0 ); # we cannot read, but may be able to write
555 0         0 my $buf = '';
556 0         0 undef $!;
557 0         0 my $status = CORE::recv( $self->{socket}, $buf, 1, MSG_DONTWAIT | MSG_PEEK ); # peek, do not remove data from queue
558             # EOF when there is no error, status is defined, but result is empty
559 0   0     0 return ! $! && defined $status && length( $buf ) == 0;
560             }
561              
562             # The method verifies if we can connect to a Kafka broker.
563             # This is evil: opens and immediately closes a NEW connection so do not use unless there is a strong reason for it.
564             sub _is_alive {
565 3     3   624897 my ( $self ) = @_;
566              
567 3         9 my $socket = $self->{socket};
568 3 100       20 return unless $socket;
569              
570 2         98 socket( my $tmp_socket, $self->{pf}, SOCK_STREAM, IPPROTO_TCP );
571 2         219 my $is_alive = connect( $tmp_socket, getpeername( $socket ) );
572 2         55 CORE::close( $tmp_socket );
573              
574 2         25 return $is_alive;
575             }
576              
577             #-- private attributes ---------------------------------------------------------
578              
579             #-- private methods ------------------------------------------------------------
580              
581             # You need to have access to Kafka instance and be able to connect through TCP.
582             # uses http://devpit.org/wiki/Connect%28%29_with_timeout_%28in_Perl%29
583             sub _connect {
584 13     13   27 my ( $self ) = @_;
585              
586 13         38 $self->{socket} = undef;
587 13         20 $self->{_io_select} = undef;
588              
589 13         32 my $name = $self->{host};
590 13         24 my $port = $self->{port};
591 13         22 my $timeout = $self->{timeout};
592              
593 13         30 my $ip = '';
594 13 100       56 if ( $self->_get_family( $name ) ) {
595 2         3 $ip = $self->{ip} = $name;
596             } else {
597 10 50       23 if ( defined $timeout ) {
598 10         18 my $remaining;
599 10         13 my $start = time();
600              
601 10 50       38 $self->_debug_msg( format_message( "name = '%s', number of wallclock seconds = %s", $name, ceil( $timeout ) ) )
602             if $self->debug_level;
603              
604             # DNS lookup.
605 10         14 local $@;
606 0     0   0 my $h = set_sig_handler( 'ALRM', sub { die 'alarm clock restarted' },
607             {
608 10         107 mask => [ 'ALRM' ],
609             safe => 0, # perl 5.8+ uses safe signal delivery so we need unsafe signal for timeout to work
610             }
611             );
612 10         1437 eval {
613 10         106 $remaining = alarm( ceil( $timeout ) );
614 10         41 $ip = $self->_gethostbyname( $name );
615 10         15000260 alarm 0;
616             };
617 10         81 alarm 0; # race condition protection
618 10         29 my $error = $@;
619 10         93 undef $h;
620              
621 10 50       816 $self->_debug_msg( format_message( "_connect: ip = '%s', error = '%s', \$? = %s, \$! = '%s'", $ip, $error, $?, $! ) )
622             if $self->debug_level;
623              
624 10 50       28 die $error if $error;
625 10 100       35 die( format_message( "gethostbyname %s: \$? = '%s', \$! = '%s'\n", $name, $?, $! ) ) unless $ip;
626              
627 8         15 my $elapsed = time() - $start;
628             # $SIG{ALRM} restored automatically, but we need to restart previous alarm manually
629              
630 8 50       16 $self->_debug_msg( format_message( '_connect: %s (remaining) - %s (elapsed) = %s', $remaining, $elapsed, $remaining - $elapsed ) )
631             if $self->debug_level;
632 8 100       21 if ( $remaining ) {
633 2 100       9 if ( $remaining - $elapsed > 0 ) {
634 1 50       3 $self->_debug_msg( '_connect: remaining - elapsed > 0 (to alarm restart)' )
635             if $self->debug_level;
636 1         9 alarm( ceil( $remaining - $elapsed ) );
637             } else {
638 1 50       32 $self->_debug_msg( '_connect: remaining - elapsed < 0 (to alarm function call)' )
639             if $self->debug_level;
640             # $SIG{ALRM}->();
641 1         52 kill ALRM => $$;
642             }
643 2 50       18 $self->_debug_msg( "_connect: after alarm 'recalled'" )
644             if $self->debug_level;
645             }
646             } else {
647 0         0 $ip = $self->_gethostbyname( $name );
648 0 0       0 die( format_message( "could not resolve host name to IP address: %s\n", $name ) ) unless $ip;
649             }
650             }
651              
652             # Create socket.
653 10 50       1077 socket( my $connection, $self->{pf}, SOCK_STREAM, scalar getprotobyname( 'tcp' ) ) or die( "socket: $!\n" );
654              
655             # Set autoflushing.
656 10         73 my $file_handle = select( $connection ); $| = 1; select $file_handle;
  10         38  
  10         27  
657              
658             # Set FD_CLOEXEC.
659 10 50       71 my $flags = fcntl( $connection, F_GETFL, 0 ) or die "fcntl: $!\n";
660 10 50       93 fcntl( $connection, F_SETFL, $flags | FD_CLOEXEC ) or die "fnctl: $!\n";
661              
662 10 50       71 $flags = fcntl( $connection, F_GETFL, 0 ) or die "fcntl F_GETFL: $!\n"; # 0 for error, 0e0 for 0.
663 10 50       68 fcntl( $connection, F_SETFL, $flags | O_NONBLOCK ) or die "fcntl F_SETFL O_NONBLOCK: $!\n"; # 0 for error, 0e0 for 0.
664              
665             # Connect returns immediately because of O_NONBLOCK.
666             my $sockaddr = $self->{af} eq AF_INET
667             ? pack_sockaddr_in( $port, inet_aton( $ip ) )
668 10 100       124 : pack_sockaddr_in6( $port, inet_pton( $self->{af}, $ip ) )
669             ;
670 10 100 66     1024 connect( $connection, $sockaddr ) || $!{EINPROGRESS} || die( format_message( "connect ip = %s, port = %s: %s\n", $ip, $port, $! ) );
671              
672             # Reset O_NONBLOCK.
673 9 50       259 $flags = fcntl( $connection, F_GETFL, 0 ) or die "fcntl F_GETFL: $!\n"; # 0 for error, 0e0 for 0.
674 9 50       65 fcntl( $connection, F_SETFL, $flags & ~ O_NONBLOCK ) or die "fcntl F_SETFL not O_NONBLOCK: $!\n"; # 0 for error, 0e0 for 0.
675              
676             # Use select() to poll for completion or error. When connect succeeds we can write.
677 9         39 my $vec = '';
678 9         44 vec( $vec, fileno( $connection ), 1 ) = 1;
679 9   33     96 select( undef, $vec, undef, $timeout // $REQUEST_TIMEOUT );
680 9 50       38 unless ( vec( $vec, fileno( $connection ), 1 ) ) {
681             # If no response yet, impose our own timeout.
682 0         0 $! = ETIMEDOUT;
683 0         0 die( format_message( "connect ip = %s, port = %s: %s\n", $ip, $port, $! ) );
684             }
685              
686             # This is how we see whether it connected or there was an error. Document Unix, are you kidding?!
687 9         103 $! = unpack( 'L', getsockopt( $connection, SOL_SOCKET, SO_ERROR ) );
688 9 50       40 die( format_message( "connect ip = %s, port = %s: %s\n", $ip, $port, $! ) ) if $!;
689              
690             # Set timeout on all reads and writes.
691             #
692             # Note the difference between Perl's sysread() and read() calls: sysread()
693             # queries the kernel exactly once, with max delay specified here. read()
694             # queries the kernel repeatedly until there's a read error (such as this
695             # timeout), EOF, or a full buffer. So when using read() with a timeout of one
696             # second, if the remote server sends 1 byte repeatedly at 1 second intervals,
697             # read() will read the whole buffer very slowly and sysread() will return only
698             # the first byte. The print() and syswrite() calls are similarly different.
699             # <> is of course similar to read() but delimited by newlines instead of buffer
700             # sizes.
701 9   33     33 my $timeval = _get_timeval( $timeout // $REQUEST_TIMEOUT );
702 9   50     77 setsockopt( $connection, SOL_SOCKET, SO_SNDTIMEO, $timeval ) // die "setsockopt SOL_SOCKET, SO_SNDTIMEO: $!\n";
703 9   50     65 setsockopt( $connection, SOL_SOCKET, SO_RCVTIMEO, $timeval ) // die "setsockopt SOL_SOCKET, SO_RCVTIMEO: $!\n";
704              
705 9         23 $self->{socket} = $connection;
706 9         95 my $s = $self->{_io_select} = IO::Select->new;
707 9         141 $s->add( $self->{socket} );
708              
709 9         452 return $connection;
710             }
711              
712             # Packing timeval
713             # uses http://trinitum.org/wp/packing-timeval/
714             sub _get_timeval {
715 9     9   16 my $timeout = shift;
716              
717 9         23 my $intval = int( $timeout ); # sec
718 9         21 my $fraction = int( ( $timeout - $intval ) * 1_000_000 ); # ms
719              
720 9 50 33     157 if ( $Config{osname} eq 'netbsd' && _major_osvers() >= 6 && $Config{longsize} == 4 ) {
      33        
721 0 0       0 if ( defined $Config{use64bitint} ) {
722 0         0 $timeout = pack( 'QL', int( $timeout ), $fraction );
723             } else {
724             $timeout = pack(
725             'LLL',
726             (
727 0 0       0 $Config{byteorder} eq '1234'
728             ? ( $timeout, 0, $fraction )
729             : ( 0, $timeout, $fraction )
730             )
731             );
732             }
733             } else {
734 9         39 $timeout = pack( 'L!L!', $timeout, $fraction );
735             }
736              
737 9         23 return $timeout;
738             }
739              
740             sub _major_osvers {
741 0     0   0 my $osvers = $Config{osvers};
742 0         0 my ( $major_osvers ) = $osvers =~ /^(\d+)/;
743 0         0 $major_osvers += 0;
744              
745 0         0 return $major_osvers;
746             }
747              
748             sub _gethostbyname {
                # Resolves $name to a textual IP address, stores it in $self->{ip} and returns it.
                # $self->{ip} is reset to an empty string first, so an empty string is returned
                # when no address could be obtained.
749 8     8   20 my ( $self, $name ) = @_;
750              
                # Stays true while the plain gethostbyname() lookup further down is still applicable.
751 8         12 my $is_v4_fqdn = 1;
752 8         30 $self->{ip} = '';
753              
754 8         12 my $ip_version = $self->{ip_version};
                # IPv6 explicitly requested: ask getaddrinfo() for AF_INET6 stream/TCP addresses only.
755 8 100 100     31 if ( defined( $ip_version ) && $ip_version == $IP_V6 ) {
756 1         98 my ( $err, @addrs ) = getaddrinfo(
757             $name,
758             '', # not interested in the service name
759             {
760             family => AF_INET6,
761             socktype => SOCK_STREAM,
762             protocol => IPPROTO_TCP,
763             },
764             );
                # Lookup failed: return the (still empty) $self->{ip}.
765 1 50       7 return( $self->{ip} ) if $err;
766              
767 1         3 $is_v4_fqdn = 0;
                # Take the first address that getnameinfo() can convert with NI_NUMERICHOST,
                # and record the IPv6 address/protocol family alongside it.
768 1         4 for my $addr ( @addrs ) {
769 1         21 my ( $err, $ipaddr ) = getnameinfo( $addr->{addr}, NI_NUMERICHOST, NIx_NOSERV );
770 1 50       6 next if $err;
771              
772 1         2 $self->{af} = AF_INET6;
773 1         3 $self->{pf} = PF_INET6;
774 1         3 $self->{ip} = $ipaddr;
775 1         3 last;
776             }
777             }
778              
                # IPv4 path: taken when no IP version was requested or $IP_V4 was requested.
                # $self->{af} is not assigned in this sub on this path - presumably it was set
                # beforehand (e.g. by _get_family); verify against the caller.
779 8 50 66     46 if ( $is_v4_fqdn && ( !defined( $ip_version ) || $ip_version == $IP_V4 ) ) {
      66        
780 7 100       1582825 if ( my $ipaddr = gethostbyname( $name ) ) {
781 5         51 $self->{ip} = inet_ntop( $self->{af}, $ipaddr );
782             }
783             }
784              
785 8         64 return $self->{ip};
786             }
787              
788             sub _get_family {
                # Chooses the address family for $name: sets $self->{af} and $self->{pf} to the
                # IPv6 or IPv4 constants, and reports $ERROR_INCOMPATIBLE_HOST_IP_VERSION through
                # _error() when a literal IP address in $name contradicts the configured ip_version.
                # Returns $is_ip, i.e. the result of the last is_ipv6()/is_ipv4() check assigned to it.
789 13     13   25 my ( $self, $name ) = @_;
790              
791 13         19 my $is_ip;
                # An undefined ip_version is treated as 0 ("not specified") in the checks below.
792 13   100     52 my $ip_version = $self->{ip_version} // 0;
                # IPv6 branch: $name is an IPv6 literal and no version was requested, or IPv6 was requested.
793 13 100 33     83 if ( ( ( $is_ip = is_ipv6( $name ) ) && !$ip_version ) || $ip_version == $IP_V6 ) {
    100 66        
    50 100        
      100        
794 2 100 66     70 $self->_error( $ERROR_INCOMPATIBLE_HOST_IP_VERSION, format_message( 'ip_version = %s, host = %s', $ip_version, $name ) )
      33        
795             if
796             $ip_version
797             && (
798             ( !$is_ip && is_ipv4( $name ) )
799             || ( $is_ip && $ip_version == $IP_V4 )
800             )
801             ;
802              
803 1         16 $self->{af} = AF_INET6;
804 1         3 $self->{pf} = PF_INET6;
                # IPv4 branch: same checks as above with the roles of IPv4 and IPv6 swapped.
805             } elsif ( ( ( $is_ip = is_ipv4( $name ) ) && !$ip_version ) || $ip_version == $IP_V4 ) {
806 3 50 33     167 $self->_error( $ERROR_INCOMPATIBLE_HOST_IP_VERSION, format_message( 'ip_version = %s, host = %s', $ip_version, $name ) )
      66        
807             if
808             $ip_version
809             && (
810             ( !$is_ip && is_ipv6( $name ) )
811             || ( $is_ip && $ip_version == $IP_V6 )
812             )
813             ;
814              
815 3         20 $self->{af} = AF_INET;
816 3         6 $self->{pf} = PF_INET;
                # No version requested and $name is not an IP literal: default to IPv4.
                # NOTE(review): when ip_version holds a value other than $IP_V4/$IP_V6, no branch
                # runs and af/pf are left untouched.
817             } elsif ( !$ip_version ) {
818 8         453 $self->{af} = AF_INET;
819 8         12 $self->{pf} = PF_INET;
820             }
821              
822 12         48 return $is_ip;
823             }
824              
825             # Show additional debugging information on STDERR.
                # With a true $header, $message is printed as a hex stream and as a Data::HexDump::Range
                # dump, prefixed by $header and host:port ($colour is passed in the dump's range
                # description); otherwise $message is printed as a single timestamped line.
                # Returns nothing.
826             sub _debug_msg {
827 0     0   0 my ( $self, $message, $header, $colour ) = @_;
828              
829 0 0       0 if ( $header ) {
                # Load Data::HexDump::Range and build the dumper on first use only; it is kept in
                # $_hdr, which is not declared inside this sub.
830 0 0       0 unless ( $_hdr ) {
831 0         0 require Data::HexDump::Range;
832 0         0 $_hdr = Data::HexDump::Range->new(
833             FORMAT => 'ANSI', # 'ANSI'|'ASCII'|'HTML'
834             COLOR => 'bw', # 'bw' | 'cycle'
835             OFFSET_FORMAT => 'hex', # 'hex' | 'dec'
836             DATA_WIDTH => 16, # 16 | 20 | ...
837             DISPLAY_RANGE_NAME => 0,
838             # MAXIMUM_RANGE_NAME_SIZE => 16,
839             DISPLAY_COLUMN_NAMES => 1,
840             DISPLAY_RULER => 1,
841             DISPLAY_OFFSET => 1,
842             # DISPLAY_CUMULATIVE_OFFSET => 1,
843             DISPLAY_ZERO_SIZE_RANGE_WARNING => 0,
844             DISPLAY_ZERO_SIZE_RANGE => 1,
                # NOTE(review): DISPLAY_RANGE_NAME is already passed above with the same value;
                # this repeated entry is redundant.
845             DISPLAY_RANGE_NAME => 0,
846             # DISPLAY_RANGE_SIZE => 1,
847             DISPLAY_ASCII_DUMP => 1,
848             DISPLAY_HEX_DUMP => 1,
849             # DISPLAY_DEC_DUMP => 1,
850             # COLOR_NAMES => {},
851             ORIENTATION => 'horizontal',
852             );
853             }
854              
855             say STDERR
856 0         0 "# $header ", $self->{host}, ':', $self->{port}, "\n",
857             '# Hex Stream: ', unpack( 'H*', $message ), "\n",
858             $_hdr->dump(
859             [
860             [ 'data', length( $message ), $colour ],
861             ],
862             $message
863             )
864             ;
865             } else {
                # Plain text message: prefix it with the current local time.
866 0         0 say STDERR format_message( '[%s] %s', scalar( localtime ), $message );
867             }
868              
869 0         0 return;
870             }
871              
872             # Handler for errors: logs the error when the debug level is non-zero, then throws
                # a Kafka::Exception::IO exception built from the given arguments.
873             sub _error {
874 88     88   91681 my $self = shift;
                # throw_args() is defined outside this sub; its result is used as a key/value list
                # from which the 'code' and 'message' entries are read below.
875 88         350 my %args = throw_args( @_ );
876 88 50       294 $self->_debug_msg( format_message( 'throwing IO error %s: %s', $args{code}, $args{message} ) )
877             if $self->debug_level;
878 88         587 Kafka::Exception::IO->throw( %args );
879             }
880              
881              
882              
883             1;
884              
885             __END__
886              
887             =head1 DIAGNOSTICS
888              
889             When an error is detected, an exception represented by an object of the C<Kafka::Exception::IO> class
890             is thrown (see L<Kafka::Exceptions|Kafka::Exceptions>).
891              
892             L<code|Kafka::Exceptions/code> and a more descriptive L<message|Kafka::Exceptions/message> provide
893             information about the thrown exception. Consult the documentation of L<Kafka::Exceptions|Kafka::Exceptions>
894             for the list of all available methods.
895              
896             The authors suggest using L<Try::Tiny|Try::Tiny>'s C<try> and C<catch> to handle exceptions while
897             working with the L<Kafka|Kafka> package.
898              
899             Here is the list of possible error messages that C<Kafka::IO> may produce:
900              
901             =over 3
902              
903             =item C<Invalid argument>
904              
905             Invalid arguments were passed to a method.
906              
907             =item C<Cannot send>
908              
909             Message cannot be sent on the socket of a C<Kafka::IO> object.
910              
911             =item C<Cannot receive>
912              
913             Message cannot be received.
914              
915             =item C<Cannot bind>
916              
917             A TCP connection cannot be established on the given host and port.
918              
919             =back
920              
921             =head2 Debug mode
922              
923             Debug output can be enabled by passing the desired level via an environment variable
924             in one of the following ways:
925              
926             C<PERL_KAFKA_DEBUG=1> - debug is enabled for the whole L<Kafka|Kafka> package.
927              
928             C<PERL_KAFKA_DEBUG=IO:1> - enable debug for C<Kafka::IO> only.
929              
930             C<Kafka::IO> supports two debug levels (level 2 includes debug output of 1):
931              
932             =over 3
933              
934             =item 1
935              
936             Additional information about processing events/alarms.
937              
938             =item 2
939              
940             Dump of the binary messages exchanged with the Kafka server.
941              
942             =back
943              
944             =head1 SEE ALSO
945              
946             The basic operation of the Kafka package modules:
947              
948             L<Kafka|Kafka> - constants and messages used by the Kafka package modules.
949              
950             L<Kafka::Connection|Kafka::Connection> - interface to connect to a Kafka cluster.
951              
952             L<Kafka::Producer|Kafka::Producer> - interface for producing client.
953              
954             L<Kafka::Consumer|Kafka::Consumer> - interface for consuming client.
955              
956             L<Kafka::Message|Kafka::Message> - interface to access Kafka message
957             properties.
958              
959             L<Kafka::Int64|Kafka::Int64> - functions to work with 64 bit elements of the
960             protocol on 32 bit systems.
961              
962             L<Kafka::Protocol|Kafka::Protocol> - functions to process messages in the
963             Apache Kafka's Protocol.
964              
965             L<Kafka::IO|Kafka::IO> - low-level interface for communication with Kafka server.
966              
967             L<Kafka::Exceptions|Kafka::Exceptions> - module designated to handle Kafka exceptions.
968              
969             L<Kafka::Internals|Kafka::Internals> - internal constants and functions used
970             by several package modules.
971              
972             A wealth of detail about Apache Kafka and the Kafka Protocol:
973              
974             Main page at L<http://kafka.apache.org/>
975              
976             Kafka Protocol at L<https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol>
977              
978             =head1 SOURCE CODE
979              
980             The Kafka package is hosted on GitHub:
981             L<https://github.com/TrackingSoft/Kafka>
982              
983             =head1 AUTHOR
984              
985             Sergey Gladkov
986              
987             Please use the GitHub project link above to report problems or contact the authors.
988              
989             =head1 CONTRIBUTORS
990              
991             Alexander Solovey
992              
993             Jeremy Jordan
994              
995             Sergiy Zuban
996              
997             Vlad Marchenko
998              
999             Damien Krotkine
1000              
1001             =head1 COPYRIGHT AND LICENSE
1002              
1003             Copyright (C) 2012-2017 by TrackingSoft LLC.
1004              
1005             This package is free software; you can redistribute it and/or modify it under
1006             the same terms as Perl itself. See I<perlartistic> at
1007             L<http://dev.perl.org/licenses/artistic.html>.
1008              
1009             This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
1010             without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
1011             PARTICULAR PURPOSE.
1012              
1013             =cut