File Coverage

lib/Kafka/IO/Async.pm
Criterion Covered Total %
statement 41 164 25.0
branch 0 48 0.0
condition 0 21 0.0
subroutine 14 36 38.8
pod 5 5 100.0
total 60 274 21.9


line stmt bran cond sub pod time code
1             package Kafka::IO::Async;
2              
3             =head1 NAME
4              
5             Kafka::IO::Async - Pseudo async interface to nonblocking network communication with the Apache Kafka server with Coro.
6             This module implements the same interface that usual Kafka::IO module
7              
8             =head1 VERSION
9              
10             Read documentation for C<Kafka::IO> version 1.08 .
11              
12             =cut
13              
14              
15              
16 9     9   190 use 5.010;
  9         32  
17 9     9   46 use strict;
  9         17  
  9         204  
18 9     9   44 use warnings;
  9         17  
  9         492  
19              
20              
21              
22             our $DEBUG = 0;
23              
24             our $VERSION = 'v1.08';
25              
26 9     9   56 use Carp;
  9         35  
  9         563  
27 9     9   57 use Config;
  9         18  
  9         383  
28 9     9   62 use Const::Fast;
  9         15  
  9         77  
29 9     9   860 use Fcntl;
  9         17  
  9         2331  
30 9         528 use Params::Util qw(
31             _STRING
32 9     9   109 );
  9         22  
33 9         479 use Scalar::Util qw(
34             dualvar
35 9     9   51 );
  9         16  
36 9     9   53 use Try::Tiny;
  9         17  
  9         622  
37              
38 9         1262 use Kafka qw(
39             $ERROR_CANNOT_BIND
40             $ERROR_CANNOT_RECV
41             $ERROR_CANNOT_SEND
42             $ERROR_MISMATCH_ARGUMENT
43             $ERROR_INCOMPATIBLE_HOST_IP_VERSION
44             $ERROR_NO_CONNECTION
45             $IP_V4
46             $IP_V6
47             $KAFKA_SERVER_PORT
48             $REQUEST_TIMEOUT
49 9     9   58 );
  9         17  
50 9     9   63 use Kafka::Exceptions;
  9         63  
  9         490  
51 9         863 use Kafka::Internals qw(
52             $MAX_SOCKET_REQUEST_BYTES
53             debug_level
54             format_message
55 9     9   57 );
  9         13  
56              
57 9     9   6978 use AnyEvent::Handle;
  9         188491  
  9         16443  
58              
59              
60              
61             =head1 SYNOPSIS
62              
63             use 5.010;
64             use strict;
65             use warnings;
66              
67             use Scalar::Util qw(
68             blessed
69             );
70             use Try::Tiny;
71              
72             use Kafka::IO::Async;
73              
74             my $io;
75             try {
76             $io = Kafka::IO::Async->new( host => 'localhost' );
77             } catch {
78             my $error = $_;
79             if ( blessed( $error ) && $error->isa( 'Kafka::Exception' ) ) {
80             warn 'Error: (', $error->code, ') ', $error->message, "\n";
81             exit;
82             } else {
83             die $error;
84             }
85             };
86              
87             # Closes and cleans up
88             $io->close;
89             undef $io;
90              
91             =head1 DESCRIPTION
92              
93             This module is private and should not be used directly.
94              
95             In order to achieve better performance, methods of this module do not
96             perform arguments validation.
97              
98             The main features of the C<Kafka::IO::Async> class are:
99              
100             =over 3
101              
102             =item *
103              
104             Provides an object oriented API for communication with Kafka.
105              
106             =item *
107              
108             This class allows you to create Kafka 0.9+ clients.
109              
110             =back
111              
112             =cut
113              
114             our $_hdr;
115              
116             #-- constructor ----------------------------------------------------------------
117              
118             =head2 CONSTRUCTOR
119              
120             =head3 C<new>
121              
122             Establishes TCP connection to given host and port, creates and returns C<Kafka::IO::Async> IO object.
123              
124             C<new()> takes arguments in key-value pairs. The following arguments are currently recognized:
125              
126             =over 3
127              
128             =item C<host =E<gt> $host>
129              
130             C<$host> is Kafka host to connect to. It can be a host name or an IP-address in
131             IPv4 or IPv6 form (for example '127.0.0.1', '0:0:0:0:0:0:0:1' or '::1').
132              
133             =item C<port =E<gt> $port>
134              
135             Optional, default = C<$KAFKA_SERVER_PORT>.
136              
137             C<$port> is integer attribute denoting the port number of to access Apache Kafka.
138              
139             C<$KAFKA_SERVER_PORT> is the default Apache Kafka server port that can be imported
140             from the L<Kafka|Kafka> module.
141              
142             =item C<timeout =E<gt> $timeout>
143              
144             C<$REQUEST_TIMEOUT> is the default timeout that can be imported from the L<Kafka|Kafka> module.
145              
146             Special behavior when C<timeout> is set to C<undef>:
147              
148             =back
149              
150             =over 3
151              
152             =item *
153              
154             Alarms are not used internally (namely when performing C<gethostbyname>).
155              
156             =item *
157              
158             Default C<$REQUEST_TIMEOUT> is used for the rest of IO operations.
159              
160             =back
161              
162             =over 3
163              
164             =item C<ip_version =E<gt> $ip_version>
165              
166             Force version of IP protocol for resolving host name (or interpretation of passed address).
167              
168             Optional, undefined by default, which works in the following way: version of IP address
169             is detected automatically, host name is resolved into IPv4 address.
170              
171             See description of L<$IP_V4|Kafka::IO/$IP_V4>, L<$IP_V6|Kafka::IO/$IP_V6>
172             in C<Kafka> L<EXPORT|Kafka/EXPORT>.
173              
174             =back
175              
176             =cut
177             sub new {
178 0     0 1   my ( $class, %p ) = @_;
179              
180 0           my $self = bless {
181             host => '',
182             timeout => $REQUEST_TIMEOUT,
183             port => $KAFKA_SERVER_PORT,
184             ip_version => undef,
185             af => '', # Address family constant
186             pf => '', # Protocol family constant
187             ip => '', # Human-readable textual representation of the ip address
188             }, $class;
189              
190 0   0       exists $p{$_} and $self->{$_} = $p{$_} foreach keys %$self;
191              
192             # we trust it: make it untainted
193 0           ( $self->{host} ) = $self->{host} =~ /\A(.+)\z/;
194 0           ( $self->{port} ) = $self->{port} =~ /\A(.+)\z/;
195              
196 0           $self->{socket} = undef;
197 0           $self->{_io_select} = undef;
198 0           my $error;
199             try {
200 0     0     $self->_connect();
201             } catch {
202 0     0     $error = $_;
203 0           };
204              
205 0 0         $self->_error( $ERROR_CANNOT_BIND, format_message("Kafka::IO::Async(%s:%s)->new: %s", $self->{host}, $self->{port}, $error ) )
206             if defined $error
207             ;
208 0           return $self;
209             }
210              
211             #-- public attributes ----------------------------------------------------------
212              
213             =head2 METHODS
214              
215             The following methods are provided by C<Kafka::IO::Async> class:
216              
217             =cut
218              
219             =head3 C<< send( $message <, $timeout> ) >>
220              
221             Sends a C<$message> to Kafka.
222              
223             The argument must be a bytes string.
224              
225             Use optional C<$timeout> argument to override default timeout for this request only.
226              
227             Returns the number of characters sent.
228              
229             =cut
230             sub send {
231 0     0 1   my ( $self, $message, $timeout ) = @_;
232 0 0         $self->_error( $ERROR_MISMATCH_ARGUMENT, '->send' )
233             unless defined( _STRING( $message ) )
234             ;
235 0           my $length = length( $message );
236 0 0         $self->_error( $ERROR_MISMATCH_ARGUMENT, '->send' )
237             unless $length <= $MAX_SOCKET_REQUEST_BYTES
238             ;
239 0 0 0       $timeout = $self->{timeout} // $REQUEST_TIMEOUT unless defined $timeout;
240 0 0         $self->_error( $ERROR_MISMATCH_ARGUMENT, '->send' )
241             unless $timeout > 0
242             ;
243              
244 0           my $socket = $self->{socket};
245 0 0 0       $self->_error( $ERROR_NO_CONNECTION, 'Attempt to work with a closed socket' ) if !$socket || $socket->destroyed;
246              
247 0           $socket->wtimeout_reset;
248 0           $socket->wtimeout($timeout);
249             $socket->on_wtimeout(sub {
250             #my ($h) = @_;
251 0     0     $self->close;
252             $self->_error(
253             $ERROR_CANNOT_SEND,
254             format_message( "Kafka::IO::Async(%s)->send: ERROR='%s' (length=%s, timeout=%s)",
255             $self->{host},
256 0           'Write timeout fired',
257             $length,
258             $timeout,
259             )
260             );
261 0           });
262              
263 0           $socket->push_write($message);
264              
265 0           return length($message);
266             }
267              
268             =head3 C<< receive( $length <, $timeout> ) >>
269              
270             Receives a message up to C<$length> size from Kafka.
271              
272             C<$length> argument must be a positive number.
273              
274             Use optional C<$timeout> argument to override default timeout for this call only.
275              
276             Returns a reference to the received message.
277              
278             =cut
279             sub receive {
280 0     0 1   my ( $self, $length, $timeout ) = @_;
281 0 0         $self->_error( $ERROR_MISMATCH_ARGUMENT, '->receive' )
282             unless $length > 0
283             ;
284 0 0 0       $timeout = $self->{timeout} // $REQUEST_TIMEOUT unless defined $timeout;
285 0 0         $self->_error( $ERROR_MISMATCH_ARGUMENT, '->receive' )
286             unless $timeout > 0
287             ;
288 0           my $socket = $self->{socket};
289 0 0 0       $self->_error( $ERROR_NO_CONNECTION, 'Attempt to work with a closed socket' ) if !$socket || $socket->destroyed;
290              
291 0           my $message = '';
292 0           my $error;
293 0           my $cv = AnyEvent->condvar;
294              
295 0           $socket->rtimeout_reset;
296 0           $socket->rtimeout($timeout);
297             $socket->on_rtimeout(sub {
298             #my ($h) = @_;
299 0     0     $self->close;
300             $error = format_message( "Kafka::IO::Async(%s)->receive: ERROR='%s' (timeout=%s)",
301             $self->{host},
302 0           'Read timeout fired',
303             $timeout,
304             );
305 0           $cv->send;
306 0           });
307              
308             $socket->on_error(sub {
309 0     0     my ($h, $fatal, $message) = @_;
310 0 0         $self->close if $fatal;
311             $error = format_message( "Kafka::IO::Async(%s)->handle: ERROR='%s' FATAL=%s",
312             $self->{host},
313 0           $message,
314             $fatal,
315             );
316 0           $cv->send;
317 0           });
318              
319             $socket->push_read(chunk => $length, sub {
320 0     0     my ($h, $data) = @_;
321 0           $message = $data;
322 0           $cv->send;
323 0           });
324              
325 0           $cv->recv;
326 0           $socket->rtimeout(0);
327 0 0         die $error if $error;
328              
329 0           return \$message;
330             }
331              
332             =head3 C<< try_receive( $length <, $timeout> ) >>
333             Receives a message up to C<$length> size from Kafka.
334              
335             C<$length> argument must be a positive number.
336              
337             Use optional C<$timeout> argument to override default timeout for this call only.
338              
339             Returns a reference to the received message.
340              
341             =cut
342              
343             sub try_receive {
344 0     0 1   my ( $self, $length, $timeout ) = @_;
345 0 0         $self->_error( $ERROR_MISMATCH_ARGUMENT, '->receive' )
346             unless $length > 0
347             ;
348 0 0 0       $timeout = $self->{timeout} // $REQUEST_TIMEOUT unless defined $timeout;
349 0 0         $self->_error( $ERROR_MISMATCH_ARGUMENT, '->receive' )
350             unless $timeout > 0
351             ;
352 0           my $socket = $self->{socket};
353 0 0 0       $self->_error( $ERROR_NO_CONNECTION, 'Attempt to work with a closed socket' ) if !$socket || $socket->destroyed;
354              
355 0           my $message = '';
356 0           my $error;
357 0           my $cv = AnyEvent->condvar;
358              
359 0           $socket->rtimeout($timeout);
360             $socket->on_rtimeout(sub {
361             #my ($h) = @_;
362 0     0     $self->close;
363             $error = format_message( "Kafka::IO::Async(%s)->receive: ERROR='%s' (timeout=%s)",
364             $self->{host},
365 0           'Read timeout fired',
366             $timeout,
367             );
368 0           $cv->send;
369 0           });
370              
371             $socket->on_eof(sub {
372 0     0     $message = undef;
373 0           $cv->send;
374 0           });
375              
376             $socket->on_read(sub {
377             #my ($h, $data) = @_;
378 0     0     my ($h) = @_;
379 0           $message = substr $h->{rbuf}, 0, $length, '';
380 0           $h->on_read();
381 0           $h->on_eof();
382 0           $cv->send;
383 0           });
384              
385 0           $cv->recv;
386 0           $socket->rtimeout(0);
387 0 0         die $error if $error;
388              
389 0           return \$message;
390             }
391              
392             =head3 C<close>
393              
394             Closes connection to Kafka server.
395             Returns true if those operations succeed and if no error was reported by any PerlIO layer.
396              
397             =cut
398             sub close {
399 0     0 1   my ( $self ) = @_;
400              
401 0           my $ret = 1;
402 0 0         if ( $self->{socket} ) {
403 0           $self->{socket}->destroy;
404 0           $self->{socket} = undef;
405             }
406              
407 0           return $ret;
408             }
409              
410              
411             #-- private attributes ---------------------------------------------------------
412              
413             #-- private methods ------------------------------------------------------------
414              
415             # You need to have access to Kafka instance and be able to connect through TCP.
416             # uses http://devpit.org/wiki/Connect%28%29_with_timeout_%28in_Perl%29
417             sub _connect {
418 0     0     my ( $self ) = @_;
419              
420 0           my $error;
421 0           $self->{socket} = undef;
422              
423 0           my $name = $self->{host};
424 0           my $port = $self->{port};
425 0           my $timeout = $self->{timeout};
426              
427 0           my $cv = AnyEvent->condvar;
428              
429             my $connection = AnyEvent::Handle->new(
430             connect => [$name, $port],
431 0     0     ($timeout ? (on_prepare => sub { $timeout } ) : ()),
432             on_connect => sub {
433             #my ($h, $host, $port, $retry) = @_;
434 0     0     $cv->send;
435             },
436             on_connect_error => sub {
437 0     0     my ($h, $message) = @_;
438 0           $error = format_message( "connect host = %s, port = %s: %s\n", $name, $port, $message );
439 0           $cv->send;
440             },
441             on_error => sub {
442 0     0     my ($h, $fatal, $message) = @_;
443 0 0         $self->close if $fatal;
444             $self->_error(
445             $ERROR_NO_CONNECTION,
446             format_message( "Kafka::IO::Async(%s)->handle: ERROR='%s' FATAL=%s",
447             $self->{host},
448 0           $message,
449             $fatal,
450             )
451             );
452             },
453             on_drain => sub {
454 0     0     my ($h) = @_;
455 0           $h->wtimeout(0);
456             }
457 0 0         );
458              
459 0           $cv->recv;
460              
461 0 0         die $error if $error;
462              
463 0           $self->{socket} = $connection;
464 0           return $connection;
465             }
466              
467              
468             # Show additional debugging information
469             sub _debug_msg {
470 0     0     my ( $self, $message, $header, $colour ) = @_;
471              
472 0 0         if ( $header ) {
473 0 0         unless ( $_hdr ) {
474 0           require Data::HexDump::Range;
475 0           $_hdr = Data::HexDump::Range->new(
476             FORMAT => 'ANSI', # 'ANSI'|'ASCII'|'HTML'
477             COLOR => 'bw', # 'bw' | 'cycle'
478             OFFSET_FORMAT => 'hex', # 'hex' | 'dec'
479             DATA_WIDTH => 16, # 16 | 20 | ...
480             DISPLAY_RANGE_NAME => 0,
481             # MAXIMUM_RANGE_NAME_SIZE => 16,
482             DISPLAY_COLUMN_NAMES => 1,
483             DISPLAY_RULER => 1,
484             DISPLAY_OFFSET => 1,
485             # DISPLAY_CUMULATIVE_OFFSET => 1,
486             DISPLAY_ZERO_SIZE_RANGE_WARNING => 0,
487             DISPLAY_ZERO_SIZE_RANGE => 1,
488             DISPLAY_RANGE_NAME => 0,
489             # DISPLAY_RANGE_SIZE => 1,
490             DISPLAY_ASCII_DUMP => 1,
491             DISPLAY_HEX_DUMP => 1,
492             # DISPLAY_DEC_DUMP => 1,
493             # COLOR_NAMES => {},
494             ORIENTATION => 'horizontal',
495             );
496             }
497              
498             say STDERR
499 0           "# $header ", $self->{host}, ':', $self->{port}, "\n",
500             '# Hex Stream: ', unpack( 'H*', $message ), "\n",
501             $_hdr->dump(
502             [
503             [ 'data', length( $message ), $colour ],
504             ],
505             $message
506             )
507             ;
508             } else {
509 0           say STDERR format_message( '[%s] %s', scalar( localtime ), $message );
510             }
511              
512 0           return;
513             }
514              
515             # Handler for errors
516             sub _error {
517 0     0     my $self = shift;
518 0           my %args = throw_args( @_ );
519 0 0         $self->_debug_msg( format_message( 'throwing IO error %s: %s', $args{code}, $args{message} ) )
520             if $self->debug_level;
521 0           Kafka::Exception::IO->throw( %args );
522             }
523              
524              
525              
526             1;
527              
528             __END__
529              
530             =head1 DIAGNOSTICS
531              
532             When error is detected, an exception, represented by object of C<Kafka::Exception::IO> class,
533             is thrown (see L<Kafka::Exceptions|Kafka::Exceptions>).
534              
535             L<code|Kafka::Exceptions/code> and a more descriptive L<message|Kafka::Exceptions/message> provide
536             information about thrown exception. Consult documentation of the L<Kafka::Exceptions|Kafka::Exceptions>
537             for the list of all available methods.
538              
539             Authors suggest using of L<Try::Tiny|Try::Tiny>'s C<try> and C<catch> to handle exceptions while
540             working with L<Kafka|Kafka> package.
541              
542             Here is the list of possible error messages that C<Kafka::IO::Async> may produce:
543              
544             =over 3
545              
546             =item C<Invalid argument>
547              
548             Invalid arguments were passed to a method.
549              
550             =item C<Cannot send>
551              
552             Message cannot be sent on a C<Kafka::IO::Async> object socket.
553              
554             =item C<Cannot receive>
555              
556             Message cannot be received.
557              
558             =item C<Cannot bind>
559              
560             TCP connection cannot be established on given host and port.
561              
562             =back
563              
564             =head2 Debug mode
565              
566             Debug output can be enabled by passing desired level via environment variable
567             using one of the following ways:
568              
569             C<PERL_KAFKA_DEBUG=1> - debug is enabled for the whole L<Kafka|Kafka> package.
570              
571             C<PERL_KAFKA_DEBUG=IO:1> - enable debug for C<Kafka::IO::Async> only.
572              
573             C<Kafka::IO::Async> supports two debug levels (level 2 includes debug output of 1):
574              
575             =over 3
576              
577             =item 1
578              
579             Additional information about processing events/alarms.
580              
581             =item 2
582              
583             Dump of binary messages exchange with Kafka server.
584              
585             =back
586              
587             =head1 SEE ALSO
588              
589             The basic operation of the Kafka package modules:
590              
591             L<Kafka|Kafka> - constants and messages used by the Kafka package modules.
592              
593             L<Kafka::Connection|Kafka::Connection> - interface to connect to a Kafka cluster.
594              
595             L<Kafka::Producer|Kafka::Producer> - interface for producing client.
596              
597             L<Kafka::Consumer|Kafka::Consumer> - interface for consuming client.
598              
599             L<Kafka::Message|Kafka::Message> - interface to access Kafka message
600             properties.
601              
602             L<Kafka::Int64|Kafka::Int64> - functions to work with 64 bit elements of the
603             protocol on 32 bit systems.
604              
605             L<Kafka::Protocol|Kafka::Protocol> - functions to process messages in the
606             Apache Kafka's Protocol.
607              
608             L<Kafka::IO::Async|Kafka::IO::Async> - low-level interface for communication with Kafka server.
609              
610             L<Kafka::Exceptions|Kafka::Exceptions> - module designated to handle Kafka exceptions.
611              
612             L<Kafka::Internals|Kafka::Internals> - internal constants and functions used
613             by several package modules.
614              
615             A wealth of detail about the Apache Kafka and the Kafka Protocol:
616              
617             Main page at L<http://kafka.apache.org/>
618              
619             Kafka Protocol at L<https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol>
620              
621             =head1 SOURCE CODE
622              
623             Kafka package is hosted on GitHub:
624             L<https://github.com/TrackingSoft/Kafka>
625              
626             =head1 AUTHOR
627              
628             Sergey Gladkov
629              
630             Please use GitHub project link above to report problems or contact authors.
631              
632             =head1 CONTRIBUTORS
633              
634             Alexander Solovey
635              
636             Jeremy Jordan
637              
638             Sergiy Zuban
639              
640             Vlad Marchenko
641              
642             Damien Krotkine
643              
644             =head1 COPYRIGHT AND LICENSE
645              
646             Copyright (C) 2012-2017 by TrackingSoft LLC.
647              
648             This package is free software; you can redistribute it and/or modify it under
649             the same terms as Perl itself. See I<perlartistic> at
650             L<http://dev.perl.org/licenses/artistic.html>.
651              
652             This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
653             without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
654             PARTICULAR PURPOSE.
655              
656             =cut