File Coverage

lib/Kafka/Internals.pm
Criterion Covered Total %
statement 75 75 100.0
branch 17 20 85.0
condition 12 17 70.5
subroutine 17 17 100.0
pod 3 3 100.0
total 124 132 93.9


line stmt bran cond sub pod time code
1             package Kafka::Internals;
2              
3             =head1 NAME
4              
5             Kafka::Internals - Constants and functions used internally.
6              
7             =head1 VERSION
8              
9             This documentation refers to C<Kafka::Internals> version 1.08 .
10              
11             =cut
12              
13              
14              
15 17     17   341622 use 5.010;
  17         56  
16 17     17   85 use strict;
  17         28  
  17         345  
17 17     17   75 use warnings;
  17         39  
  17         856  
18              
19             our $VERSION = 'v1.08';
20              
21 17         1319 use Exporter qw(
22             import
23 17     17   114 );
  17         37  
24              
25             our @EXPORT_OK = qw(
26             $APIKEY_PRODUCE
27             $APIKEY_FETCH
28             $APIKEY_OFFSET
29             $APIKEY_METADATA
30             $APIKEY_FINDCOORDINATOR
31             $APIKEY_APIVERSIONS
32             $APIKEY_OFFSETCOMMIT
33             $APIKEY_OFFSETFETCH
34             $APIKEY_SASLHANDSHAKE
35             $DEFAULT_RAISE_ERROR
36             $MAX_CORRELATIONID
37             $MAX_INT16
38             $MAX_INT32
39             $MAX_SOCKET_REQUEST_BYTES
40             $PRODUCER_ANY_OFFSET
41             debug_level
42             _isbig
43             _get_CorrelationId
44             format_message
45             format_reference
46             );
47              
48              
49              
50 17     17   120 use overload;
  17         40  
  17         125  
51 17     17   1133 use Carp;
  17         31  
  17         1129  
52 17     17   113 use Const::Fast;
  17         42  
  17         144  
53 17     17   12221 use Data::Dumper ();
  17         116402  
  17         593  
54 17         915 use Params::Util qw(
55             _INSTANCE
56 17     17   1731 );
  17         8399  
57 17     17   2788 use Try::Tiny;
  17         10615  
  17         860  
58              
59 17         7585 use Kafka qw(
60             %ERROR
61             $ERROR_MISMATCH_ARGUMENT
62 17     17   590 );
  17         34  
63              
64              
65              
66             =head1 SYNOPSIS
67              
68             use 5.010;
69             use strict;
70             use warnings;
71              
72             use Kafka::Internals qw(
73             $MAX_SOCKET_REQUEST_BYTES
74             );
75              
76             my $bin_stream_size = $MAX_SOCKET_REQUEST_BYTES;
77              
78             =head1 DESCRIPTION
79              
80             This module is private and should not be used directly.
81              
82             In order to achieve better performance, functions of this module do
83             not perform arguments validation.
84              
85             =head2 EXPORT
86              
87             The following constants are available for export
88              
89             =cut
90              
91             #-- Api Keys
92              
93             =head3 C<$APIKEY_PRODUCE>
94              
95             The numeric code that the C<ApiKey> in the request take for the C<ProduceRequest> request type.
96              
97             =cut
98             const our $APIKEY_PRODUCE => 0;
99              
100             =head3 C<$APIKEY_FETCH>
101              
102             The numeric code that the C<ApiKey> in the request take for the C<FetchRequest> request type.
103              
104             =cut
105             const our $APIKEY_FETCH => 1;
106              
107             =head3 C<$APIKEY_OFFSET>
108              
109             The numeric code that the C<ApiKey> in the request take for the C<OffsetRequest> request type.
110              
111             =cut
112             const our $APIKEY_OFFSET => 2;
113              
114             =head3 C<$APIKEY_METADATA>
115              
116             The numeric code that the C<ApiKey> in the request take for the C<MetadataRequest> request type.
117              
118             =cut
119             const our $APIKEY_METADATA => 3;
120             # The numeric code that the ApiKey in the request take for the C<LeaderAndIsrRequest> request type.
121             const our $APIKEY_LEADERANDISR => 4; # Not used now
122             # The numeric code that the ApiKey in the request take for the C<StopReplicaRequest> request type.
123             const our $APIKEY_STOPREPLICA => 5; # Not used now
124              
125             =head3 C<$APIKEY_OFFSETCOMMIT>
126              
127             The numeric code that the ApiKey in the request take for the C<OffsetCommitRequest> request type.
128              
129             =cut
130             const our $APIKEY_OFFSETCOMMIT => 8;
131              
132             =head3 C<$APIKEY_OFFSETFETCH>
133              
134             The numeric code that the ApiKey in the request take for the C<OffsetFetchRequest> request type.
135              
136             =cut
137             const our $APIKEY_OFFSETFETCH => 9;
138              
139             =head3 C<$APIKEY_FINDCOORDINATOR>
140              
141             The numeric code that the C<ApiKey> in the request take for the C<FindCoordinator> request type.
142              
143             =cut
144             const our $APIKEY_FINDCOORDINATOR => 10;
145              
146             =head3 C<$APIKEY_SASLHANDSHAKE>
147              
148             The numeric code that the C<ApiKey> in the request take for the C<SaslHandshake> request type.
149              
150             =cut
151             const our $APIKEY_SASLHANDSHAKE => 17;
152              
153             =head3 C<$APIKEY_APIVERSIONS>
154              
155             The numeric code that the C<ApiKey> in the request take for the C<ApiVersions> request type.
156              
157             =cut
158             const our $APIKEY_APIVERSIONS => 18;
159              
160             # Important configuration properties
161              
162             =head3 C<$MAX_SOCKET_REQUEST_BYTES>
163              
164             The maximum number of bytes in a socket request.
165              
166             The maximum size of a request that the socket server will accept.
167             Default limit (as configured in F<server.properties>) is 104857600.
168              
169             =cut
170             const our $MAX_SOCKET_REQUEST_BYTES => 100 * 1024 * 1024;
171              
172             =head3 C<$PRODUCER_ANY_OFFSET>
173              
174             According to Apache Kafka documentation: 'When the producer is sending messages it doesn't actually know the offset and can fill in any
175             value here it likes.'
176              
177             =cut
178             const our $PRODUCER_ANY_OFFSET => 0;
179              
180             =head3 C<$MAX_CORRELATIONID>
181              
182             Largest positive integer on 32-bit machines.
183              
184             =cut
185             const our $MAX_CORRELATIONID => 0x7fffffff;
186              
187             =head3 C<$MAX_INT32>
188              
189             Largest positive integer on 32-bit machines.
190              
191             =cut
192             const our $MAX_INT32 => 0x7fffffff;
193              
194             =head3 C<$MAX_INT16>
195              
196             Largest positive int16 value.
197              
198             =cut
199             const our $MAX_INT16 => 0x7fff;
200              
201             #-- public functions -----------------------------------------------------------
202              
203             #-- private functions ----------------------------------------------------------
204              
205             # Used to generate a CorrelationId.
206             sub _get_CorrelationId {
207 10175     10175   78049 return( -int( rand( $MAX_CORRELATIONID ) ) );
208             }
209              
210             # Verifies that the argument is of Math::BigInt type
211             sub _isbig {
212 10386     10386   13538 my ( $num ) = @_;
213              
214 10386         195156 return defined _INSTANCE( $num, 'Math::BigInt' );
215             }
216              
217             #-- public attributes ----------------------------------------------------------
218              
219             =head2 METHODS
220              
221             The following methods are defined in the C<Kafka::Internals>:
222              
223             =cut
224              
225             #-- public methods -------------------------------------------------------------
226              
227             =head3 C<debug_level( $flags )>
228              
229             Gets or sets debug level for a particular L<Kafka|Kafka> module, based on environment variable C<PERL_KAFKA_DEBUG> or flags.
230              
231             $flags - (string) argument that can be used to pass coma delimited module names (omit C<Kafka::>).
232              
233             Returns C<$DEBUG> level for the module from which C<debug_level> was called.
234              
235             =cut
236             our %_debug_levels; # per-module levels cache to speed-up multiple calls to debug_level()
237             sub debug_level {
238 17     17   145 no strict 'refs'; ## no critic
  17         33  
  17         12309  
239              
240 36566   66 36566 1 10472416 my $class = ref( $_[0] ) || $_[0];
241              
242 36566 100 100     94554 return ${ $_debug_levels{ $class } } if @_ == 1 && exists $_debug_levels{ $class };
  36533         68319  
243              
244 33   100     196 my $flags = $_[1] // $ENV{PERL_KAFKA_DEBUG};
245 33 100       110 if( defined $flags ) {
246 9         43 foreach my $spec ( split /\s*,\s*/, $flags ) {
247 12         46 my @elements = split( /\s*:\s*/, $spec, 2 );
248 12         18 my( $module_name, $level );
249 12 100       27 if ( scalar( @elements ) > 1 ) {
250 8         17 ( $module_name, $level ) = @elements;
251             } else {
252 4         25 $module_name = ( $class =~ /([^:]+)$/ )[0];
253 4         12 $level = $spec;
254             }
255              
256 12         18 *{ "Kafka::${module_name}::DEBUG" } = \$level; ## no critic
  12         60  
257             }
258             }
259 33         54 $_debug_levels{ $class } = \${ "${class}::DEBUG" }; ## no critic
  33         241  
260              
261 33         69 return ${ $_debug_levels{ $class } };
  33         186  
262             }
263              
264             =head2 format_reference
265              
266             say format_reference( $object );
267              
268             Dumps reference using preconfigured L<Data::Dumper>. Produces less verbose
269             output than default L<Data::Dumper> settings.
270              
271             =cut
272              
273             my $dumper;
274             my $empty_array = [];
275              
276             sub format_reference {
277 62     62 1 121 my ( $value ) = @_;
278              
279 62 100       133 unless( $dumper ) {
280 5         110 $dumper = Data::Dumper->new( $empty_array )
281             ->Indent( 0 )
282             ->Terse( 1 )
283             ->Quotekeys( 0 )
284             ->Sortkeys( 1 )
285             ->Useperl( 1 ) # XS version seems to have a bug which sometimes results in modification of original object
286             # ->Sparseseen( 1 ) # speed up since we don't use "Seen" hash
287             ;
288             }
289              
290 62         787 my $r;
291 62 100 66     182 if (
292             overload::Overloaded( $value ) &&
293             overload::Method( $value, '""' )
294             ) {
295 8         1797 $r = "$value"; # force stringification
296             } else {
297 54         2569 $r = $dumper->Values( [ $value ] )->Dump;
298 54         10030 $dumper->Reset->Values( $empty_array );
299             }
300              
301 62         19365 return $r;
302             }
303              
304             =head2 format_message
305              
306             $string = format_message( 'Object %d loaded. Status: %s', $id, $message );
307              
308             Returns string formatted using printf-style syntax.
309              
310             If there are more than one argument and the first argument contains C<%...>
311             conversions, arguments are converted to a string message using C<sprintf()>. In this case, undefined
312             values are printed as C<< <undef> >> and references are converted to strings using L</format_reference>.
313              
314             =cut
315             sub format_message {
316 823   50 823 1 2632 my $format = shift // return;
317              
318 823         1114 my $got = scalar @_;
319              
320 823 50 33     3799 return $format unless $got && $format =~ /\%/;
321              
322 823         1071 my $expected = 0;
323 823         5125 while ( $format =~ /(%%|%[^%])/g ) {
324 1818 50       4220 next if $1 eq '%%'; # don't count escape sequence
325 1818         4871 ++$expected;
326             }
327              
328 823 50       1607 Carp::cluck "Wrong number of arguments: $expected vs $got" unless $got == $expected;
329              
330             return sprintf $format, map {
331 823 100       1382 !defined $_
  1818 100       9933  
332             ? '<undef>'
333             : ref $_
334             ? format_reference( $_ )
335             : $_
336             } @_;
337             }
338              
339             #-- private attributes ---------------------------------------------------------
340              
341             #-- private methods ------------------------------------------------------------
342              
343             1;
344              
345             __END__
346              
347             =head1 SEE ALSO
348              
349             The basic operation of the Kafka package modules:
350              
351             L<Kafka|Kafka> - constants and messages used by the Kafka package modules.
352              
353             L<Kafka::Connection|Kafka::Connection> - interface to connect to a Kafka cluster.
354              
355             L<Kafka::Producer|Kafka::Producer> - interface for producing client.
356              
357             L<Kafka::Consumer|Kafka::Consumer> - interface for consuming client.
358              
359             L<Kafka::Message|Kafka::Message> - interface to access Kafka message
360             properties.
361              
362             L<Kafka::Int64|Kafka::Int64> - functions to work with 64 bit elements of the
363             protocol on 32 bit systems.
364              
365             L<Kafka::Protocol|Kafka::Protocol> - functions to process messages in the
366             Apache Kafka's Protocol.
367              
368             L<Kafka::IO|Kafka::IO> - low-level interface for communication with Kafka server.
369              
370             L<Kafka::Exceptions|Kafka::Exceptions> - module designated to handle Kafka exceptions.
371              
372             L<Kafka::Internals|Kafka::Internals> - internal constants and functions used
373             by several package modules.
374              
375             A wealth of detail about the Apache Kafka and the Kafka Protocol:
376              
377             Main page at L<http://kafka.apache.org/>
378              
379             Kafka Protocol at L<https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol>
380              
381             =head1 SOURCE CODE
382              
383             Kafka package is hosted on GitHub:
384             L<https://github.com/TrackingSoft/Kafka>
385              
386             =head1 AUTHOR
387              
388             Sergey Gladkov
389              
390             Please use GitHub project link above to report problems or contact authors.
391              
392             =head1 CONTRIBUTORS
393              
394             Alexander Solovey
395              
396             Jeremy Jordan
397              
398             Sergiy Zuban
399              
400             Vlad Marchenko
401              
402             =head1 COPYRIGHT AND LICENSE
403              
404             Copyright (C) 2012-2017 by TrackingSoft LLC.
405              
406             This package is free software; you can redistribute it and/or modify it under
407             the same terms as Perl itself. See I<perlartistic> at
408             L<http://dev.perl.org/licenses/artistic.html>.
409              
410             This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
411             without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
412             PARTICULAR PURPOSE.
413              
414             =cut