File Coverage

lib/Kafka/Protocol.pm
Criterion Covered Total %
statement 433 592 73.1
branch 69 126 54.7
condition 26 63 41.2
subroutine 40 48 83.3
pod 16 16 100.0
total 584 845 69.1


line stmt bran cond sub pod time code
1             package Kafka::Protocol;
2              
3             =head1 NAME
4              
5             Kafka::Protocol - Functions to process messages in the Apache Kafka protocol.
6              
7             =head1 VERSION
8              
This documentation refers to C<Kafka::Protocol> version 1.07.
10              
11             =cut
12              
13 12     12   14372 use 5.010;
  12         35  
14 12     12   60 use strict;
  12         18  
  12         251  
15 12     12   48 use warnings;
  12         24  
  12         501  
16              
17             our $VERSION = '1.07';
18              
19 12         720 use Exporter qw(
20             import
21 12     12   57 );
  12         18  
22             our @EXPORT_OK = qw(
23             decode_fetch_response
24             decode_metadata_response
25             decode_offset_response
26             decode_produce_response
27             decode_offsetcommit_response
28             decode_offsetfetch_response
29             encode_fetch_request
30             encode_metadata_request
31             encode_offset_request
32             encode_produce_request
33             encode_api_versions_request
34             decode_api_versions_response
35             encode_find_coordinator_request
36             decode_find_coordinator_response
37             encode_offsetcommit_request
38             encode_offsetfetch_request
39             _decode_MessageSet_template
40             _decode_MessageSet_array
41             _encode_Message
42             _encode_MessageSet_array
43             _encode_string
44             _pack64
45             _unpack64
46             _verify_string
47             $DEFAULT_APIVERSION
48             $IMPLEMENTED_APIVERSIONS
49             $BAD_OFFSET
50             $COMPRESSION_CODEC_MASK
51             $CONSUMERS_REPLICAID
52             $NULL_BYTES_LENGTH
53             $_int64_template
54             );
55              
56 12     12   3380 use Compress::Snappy ();
  12         5051  
  12         250  
57 12     12   2857 use Compress::LZ4Frame ();
  12         5456  
  12         257  
58 12     12   60 use Const::Fast;
  12         22  
  12         79  
59 12     12   3382 use Gzip::Faster qw( gzip gunzip );
  12         10659  
  12         687  
60 12         665 use Params::Util qw(
61             _ARRAY
62             _HASH
63             _SCALAR
64             _STRING
65 12     12   72 );
  12         26  
66 12         404 use Scalar::Util qw(
67             dualvar
68 12     12   58 );
  12         16  
69 12     12   2774 use String::CRC32;
  12         4242  
  12         494  
70 12     12   95 use Try::Tiny;
  12         18  
  12         607  
71              
72 12         1711 use Kafka qw(
73             $BITS64
74             $BLOCK_UNTIL_IS_COMMITTED
75             $COMPRESSION_GZIP
76             $COMPRESSION_NONE
77             $COMPRESSION_SNAPPY
78             $COMPRESSION_LZ4
79             $DEFAULT_MAX_WAIT_TIME
80             %ERROR
81             $ERROR_COMPRESSION
82             $ERROR_MISMATCH_ARGUMENT
83             $ERROR_NOT_BINARY_STRING
84             $ERROR_REQUEST_OR_RESPONSE
85             $NOT_SEND_ANY_RESPONSE
86             $WAIT_WRITTEN_TO_LOCAL_LOG
87 12     12   60 );
  12         24  
88 12     12   554 use Kafka::Exceptions;
  12         19  
  12         567  
89 12         65053 use Kafka::Internals qw(
90             $APIKEY_FETCH
91             $APIKEY_METADATA
92             $APIKEY_OFFSET
93             $APIKEY_PRODUCE
94             $APIKEY_APIVERSIONS
95             $APIKEY_FINDCOORDINATOR
96             $APIKEY_OFFSETCOMMIT
97             $APIKEY_OFFSETFETCH
98             $PRODUCER_ANY_OFFSET
99             format_message
100             debug_level
101 12     12   63 );
  12         22  
102              
103             =head1 SYNOPSIS
104              
105             use 5.010;
106             use strict;
107             use warnings;
108              
109             use Data::Compare;
110             use Kafka qw(
111             $COMPRESSION_NONE
112             $ERROR_NO_ERROR
113             $REQUEST_TIMEOUT
114             $WAIT_WRITTEN_TO_LOCAL_LOG
115             );
116             use Kafka::Internals qw(
117             $PRODUCER_ANY_OFFSET
118             );
119             use Kafka::Protocol qw(
120             decode_produce_response
121             encode_produce_request
122             );
123              
124             # a encoded produce request hex stream
125             my $encoded = pack( q{H*}, '00000049000000000000000400000001000005dc0000000100076d79746f7069630000000100000000000000200000000000000000000000148dc795a20000ffffffff0000000648656c6c6f21' );
126              
127             # a decoded produce request
128             my $decoded = {
129             CorrelationId => 4,
130             ClientId => q{},
131             RequiredAcks => $WAIT_WRITTEN_TO_LOCAL_LOG,
132             Timeout => $REQUEST_TIMEOUT * 100, # ms
133             topics => [
134             {
135             TopicName => 'mytopic',
136             partitions => [
137             {
138             Partition => 0,
139             MessageSet => [
140             {
141             Offset => $PRODUCER_ANY_OFFSET,
142             MagicByte => 0,
143             Attributes => $COMPRESSION_NONE,
144             Key => q{},
145             Value => 'Hello!',
146             },
147             ],
148             },
149             ],
150             },
151             ],
152             };
153              
154             my $encoded_request = encode_produce_request( $decoded );
155             say 'encoded correctly' if $encoded_request eq $encoded;
156              
157             # a encoded produce response hex stream
158             $encoded = pack( q{H*}, '00000023000000040000000100076d79746f706963000000010000000000000000000000000000' );
159              
160             # a decoded produce response
161             $decoded = {
162             CorrelationId => 4,
163             topics => [
164             {
165             TopicName => 'mytopic',
166             partitions => [
167             {
168             Partition => 0,
169             ErrorCode => $ERROR_NO_ERROR,
170             Offset => 0,
171             },
172             ],
173             },
174             ],
175             };
176              
177             my $decoded_response = decode_produce_response( \$encoded );
178             say 'decoded correctly' if Compare( $decoded_response, $decoded );
179              
180             # more examples, see t/*_decode_encode.t
181              
182             =head1 DESCRIPTION
183              
184             This module is not a user module.
185              
186             In order to achieve better performance,
187             functions of this module do not perform arguments validation.
188              
The main features of the C<Kafka::Protocol> module are:
190              
191             =over 3
192              
193             =item *
194              
195             Supports parsing the Apache Kafka protocol.
196              
197             =item *
198              
Supports Apache Kafka Requests and Responses.
Within this package we currently support access to PRODUCE, FETCH, OFFSET,
METADATA, APIVERSIONS, FINDCOORDINATOR, OFFSETCOMMIT and OFFSETFETCH
Requests and Responses.
202              
203             =item *
204              
205             Support for working with 64 bit elements of the Kafka protocol on 32 bit systems.
206              
207             =back
208              
209             =cut
210              
211             # A Guide To The Kafka Protocol 0.8:
212             # https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
213             #
214             # -- Protocol Primitive Types
215             # int8, int16, int32, int64
216             # Signed integers
217             # stored in big endian order.
218             # bytes, string
219             # consist of a signed integer
220             # giving a length N
221             # followed by N bytes of content.
222             # A length of -1 indicates null.
223             # string uses an int16 for its size,
224             # and bytes uses an int32.
225             # Arrays
226             # These will always be encoded as an int32 size containing the length N
227             # followed by N repetitions of the structure
228             # which can itself be made up of other primitive types.
229             #
230             # -- N.B.
231             # - The response will always match the paired request
232             # - One structure common to both the produce and fetch requests is the message set format.
233             # - MessageSets are not preceded by an int32 like other array elements in the protocol.
234             # - A message set is also the unit of compression in Kafka,
235             # and we allow messages to recursively contain compressed message sets.
236             #
237             # -- Protocol Fields
238             # ApiKey => int16 That identifies the API being invoked
239             # ApiVersion => int16 This is a numeric version number for this api.
#                           The version used for each API is taken from $IMPLEMENTED_APIVERSIONS,
#                           falling back to $DEFAULT_APIVERSION (0).
241             # Attributes => int8 Metadata attributes about the message.
242             # The lowest 2 bits contain the compression codec used for the message.
243             # ClientId => string This is a user supplied identifier for the client application.
244             # CorrelationId => int32 This is a user-supplied integer.
245             # It will be passed back in the response by the server, unmodified.
246             # It is useful for matching request and response between the client and server.
247             # Crc => int32 The CRC32 of the remainder of the message bytes.
248             # ErrorCode => int16 The error from this partition, if any.
249             # Errors are given on a per-partition basis
250             # because a given partition may be unavailable or maintained on a different host,
251             # while others may have successfully accepted the produce request.
252             # FetchOffset => int64 The offset to begin this fetch from.
253             # HighwaterMarkOffset => int64 The offset at the end of the log for this partition.
254             # This can be used by the client to determine how many messages behind the end of the log they are.
255             # - 0.8 documents: Replication design
256             # The high watermark is the offset of the last committed message.
257             # Each log is periodically synced to disks.
258             # Data before the flushed offset is guaranteed to be persisted on disks.
259             # As we will see, the flush offset can be before or after high watermark.
260             # - 0.7 documents: Wire protocol
261             # If the last segment file for the partition is not empty and was modified earlier than TIME,
262             # it will return both the first offset for that segment and the high water mark.
263             # The high water mark is not the offset of the last message,
264             # but rather the offset that the next message sent to the partition will be written to.
# Host                  => string   The broker's hostname
# Isr                   => [ReplicaId]  The subset of the replicas that are "caught up" to the leader - the set of in-sync replicas (ISR)
267             # Key => bytes An optional message key
268             # The key can be null.
269             # Leader => int32 The node id for the kafka broker currently acting as leader for this partition.
270             # If no leader exists because we are in the middle of a leader election this id will be -1.
271             # MagicByte => int8 A version id used to allow backwards compatible evolution of the message binary format.
272             # MaxBytes => int32 The maximum bytes to include in the message set for this partition.
# MaxNumberOfOffsets    => int32    Kafka returns up to 'MaxNumberOfOffsets' offsets
274             # MaxWaitTime => int32 The maximum amount of time (ms)
275             # to block waiting
276             # if insufficient data is available at the time the request is issued.
277             # MessageSetSize => int32 The size in bytes of the message set for this partition
278             # MessageSize => int32 The size of the subsequent request or response message in bytes
279             # MinBytes => int32 The minimum number of bytes of messages that must be available to give a response.
280             # If the client sets this to 0 the server will always respond immediately.
281             # If this is set to 1,
282             # the server will respond as soon
283             # as at least one partition
284             # has at least 1 byte of data
285             # or the specified timeout occurs.
#                           By setting higher values in combination with the timeout
#                           the consumer can trade a little additional latency
#                           for reading only large chunks of data.
289             # NodeId => int32 The id of the broker.
290             # This must be set to a unique integer for each broker.
291             # Offset => int64 The offset used in kafka as the log sequence number.
292             # When the producer is sending messages it doesn't actually know the offset
293             # and can fill in any value here it likes.
294             # Partition => int32 The id of the partition the fetch is for
295             # or the partition that data is being published to
296             # or the partition this response entry corresponds to.
# Port                  => int32    The broker's port
298             # ReplicaId => int32 Indicates the node id of the replica initiating this request.
299             # Normal client consumers should always specify this as -1 as they have no node id.
300             # Replicas => [ReplicaId] The set of alive nodes that currently acts as slaves for the leader for this partition.
301             # RequiredAcks => int16 Indicates how many acknowledgements the servers should receive
302             # before responding to the request.
303             # If it is 0 the server does not send any response.
#                           If it is 1, the server will wait until the data is written to the local log before sending a response.
305             # If it is -1 the server will block until the message is committed by all in sync replicas before sending a response.
306             # Size => int32 The size of the subsequent request or response message in bytes
307             # Time => int64 Used to ask for all messages before a certain time (ms).
308             # There are two special values.
309             # Specify -1 to receive the latest offset (this will only ever return one offset).
310             # Specify -2 to receive the earliest available offsets.
311             # Timeout => int32 This provides a maximum time (ms) the server can await the receipt
312             # of the number of acknowledgements in RequiredAcks.
313             # The timeout is not an exact limit on the request time for a few reasons:
314             # (1) it does not include network latency,
315             # (2) the timer begins at the beginning of the processing of this request
316             # so if many requests are queued due to server overload
317             # that wait time will not be included,
318             # (3) we will not terminate a local write
319             # so if the local write time exceeds this timeout it will not be respected.
320             # To get a hard timeout of this type the client should use the socket timeout.
321             # TopicName => string The name of the topic.
322             # Value => bytes The actual message contents
323             # Kafka supports recursive messages in which case this may itself contain a message set.
324             # The message can be null.
325              
# Template used for 64-bit protocol fields (e.g. Offset and Timestamp in the
# message templates), plus the matching _pack64/_unpack64 helpers.
#
# Perls with native 64-bit integers unpack 'q>' straight to a number, so
# _unpack64 is the identity there. Other perls carry the field as a raw
# 8-byte string ('a[8]') and convert it with Kafka::Int64.
our $_int64_template;
if ( !$BITS64 ) {
    eval q{ require Kafka::Int64; }     ## no critic
        or die "Cannot load Kafka::Int64 : $@";

    $_int64_template = q{a[8]};
    # big-endian signed quad (64-bit) <-> value, 32 bit systems
    *_unpack64 = \&Kafka::Int64::unpackq;
    *_pack64   = \&Kafka::Int64::packq;
}
else {
    $_int64_template = q{q>};
    # value already decoded by the 'q>' template
    *_unpack64 = sub { $_[0] };
    # big-endian signed quad (64-bit), 64 bit systems
    *_pack64   = sub { pack( q{q>}, $_[0] ) };
}
343              
344             =head2 EXPORT
345              
346             The following constants are available for export
347              
348             =cut
349              
350             =head3 C<$DEFAULT_APIVERSION>
351              
352             The default API version that will be used as fallback, if it's not possible to
detect what the Kafka server supports. Only Kafka servers 0.10.0.0 and later can be
queried for the API versions they implement. On Kafka servers 0.8.x and
355             0.9.x, the protocol will default to use $DEFAULT_APIVERSION. Currently its
356             value is '0'
357              
358             =cut
359              
const our $DEFAULT_APIVERSION => 0;

# HashRef, keys are APIKEYs, values are the api version that this protocol
# implements. If not populated, the connection will use the $DEFAULT_APIVERSION
# Entries are registered further down, next to the corresponding encoders
# (for example APIVERSIONS => 0, FINDCOORDINATOR => 1). It is a plain hashref
# rather than a const because it is filled in after declaration.
our $IMPLEMENTED_APIVERSIONS = {};

# Attributes

# According to Apache Kafka documentation:
# Attributes - Metadata attributes about the message.
# The lowest 2 bits contain the compression codec used for the message.
# (0b11 selects codec values 0..3.)
const our $COMPRESSION_CODEC_MASK => 0b11;

=head3 C<$CONSUMERS_REPLICAID>

According to Apache Kafka documentation: 'ReplicaId - Normal client consumers should always specify this as -1 as they have no node id.'

Its value is -1.

=cut
const our $CONSUMERS_REPLICAID => -1;

=head3 C<$NULL_BYTES_LENGTH>

According to Apache Kafka documentation: 'Protocol Primitive Types: ... bytes, string - A length of -1 indicates null.'

Its value is -1.

=cut
const our $NULL_BYTES_LENGTH => -1;

=head3 C<$BAD_OFFSET>

According to Apache Kafka documentation: 'Offset - When the producer is sending messages it doesn't actually know the offset
and can fill in any value here it likes.'

Its value is -1.

=cut
const our $BAD_OFFSET => -1;
394              
# Fixed-layout pack/unpack templates used by the encoders and decoders.
# Most declarations pair a template with a byte length; the comment next to
# each length states exactly which fields it counts.
#
# The templates built with qq() interpolate $_int64_template, so they must
# stay below the code that initializes it.
#
# Several request templates use 'a[8]' for a 64-bit field instead of
# $_int64_template (see the commented-out alternatives).
# NOTE(review): this presumably expects the caller to pass that field
# already packed to 8 bytes (e.g. via _pack64, which returns an 8-byte
# string) - confirm at the call sites.

my ( $_Request_header_template, $_Request_header_length ) = (
    q{l>s>s>l>s>},  # Size
                    # 2 ApiKey
                    # 2 ApiVersion
                    # 4 CorrelationId
                    # 2 ClientId length
    10              # 'Size' is not included in the calculation of length
);
my ( $_ProduceRequest_header_template, $_ProduceRequest_header_length ) = (
    q{s>l>l>},      # 2 RequiredAcks
                    # 4 Timeout
                    # 4 topics array size
    10
);
my ( $_MessageSet_template, $_MessageSet_length ) = (
#    qq{${_int64_template}l>},
    q{a[8]l>},
                    # 8 Offset
                    # 4 MessageSize
    12
);
my ( $_FetchRequest_header_template, $_FetchRequest_header_length ) = (
    q{l>l>l>l>},
                    # 4 ReplicaId
                    # 4 MaxWaitTime
                    # 4 MinBytes
                    # 4 topics array size
    16
);
my ( $_FetchRequest_header_template_v3, $_FetchRequest_header_length_v3 ) = (
    q{l>l>l>l>l>},
                    # 4 ReplicaId
                    # 4 MaxWaitTime
                    # 4 MinBytes
                    # 4 MaxBytes (the field this layout adds over the one above)
                    # 4 topics array size
    20
);
my ( $_FetchRequest_body_template, $_FetchRequest_body_length ) = (
#    qq{l>${_int64_template}l>},
    q{l>a[8]l>},
                    # 4 Partition
                    # 8 FetchOffset
                    # 4 MaxBytes
    16
);
my ( $_OffsetRequest_header_template, $_OffsetRequest_header_length ) = (
    q{l>l>},        # 4 ReplicaId
                    # 4 topics array size
    8
);
my ( $_OffsetRequest_body_template, $_OffsetRequest_body_length ) = (
#    qq{l>${_int64_template}l>},
    q{l>a[8]l>},
                    # 4 Partition
                    # 8 Time
                    # 4 MaxNumberOfOffsets
    16
);
my ( $_OffsetRequest_body_template_v1, $_OffsetRequest_body_length_v1 ) = (
#    qq{l>${_int64_template}},
    q{l>a[8]},
                    # 4 Partition
                    # 8 Time
                    # (no MaxNumberOfOffsets in this layout)
    12
);
my ( $_FetchResponse_header_template, $_FetchResponse_header_length ) = (
    q{x[l]l>l>},    # Size (skip)
                    # 4 CorrelationId
                    # 4 topics array size
    8               # the skipped Size is not counted
);
my ( $_FetchResponse_header_template_v1, $_FetchResponse_header_length_v1 ) = (
    q{x[l]l>l>l>},  # Size (skip)
                    # 4 CorrelationId
                    # 4 throttle_time_ms
                    # 4 topics array size
    12              # the skipped Size is not counted
);
my ( $_Message_template, $_Message_length ) = (
    qq(${_int64_template}l>l>ccl>),
                    # 8 Offset
                    # 4 MessageSize
                    # 4 Crc
                    # 1 MagicByte
                    # 1 Attributes
                    # 4 Key length
    8               # Only Offset length
);
my ( $_Message_template_with_timestamp, $_Message_length_with_timestamp ) = (
    qq(${_int64_template}l>l>cc${_int64_template}l>),
                    # 8 Offset
                    # 4 MessageSize
                    # 4 Crc
                    # 1 MagicByte
                    # 1 Attributes
                    # 8 Timestamp
                    # 4 Key length
    8               # Only Offset length
);

my ( $_FetchResponse_topic_body_template, $_FetchResponse_topic_body_length )= (
    qq(s>/al>l>s>${_int64_template}),
                    # TopicName
                    # partitions array size
                    # 4 Partition
                    # 2 ErrorCode
                    # 8 HighwaterMarkOffset
    14              # without TopicName and partitions array size
);
# Backs up 4 bytes (X[l]) and re-reads them as the int32 count for 'a'.
# This presumably re-reads a length that the preceding template already
# returned (e.g. the 'Key length' ending $_Message_template).
# NOTE(review): unpack's '/' croaks on a negative count, so a null (-1)
# Key/Value presumably has to be detected before this template is applied
# - confirm at the call sites.
my $_Key_or_Value_template = q{X[l]l>/a}; # Key or Value
506              
507             #-- public functions -----------------------------------------------------------
508              
509             =head2 FUNCTIONS
510              
The following functions are available for C<Kafka::Protocol> module.
512              
513             =cut
514              
=head3 C<encode_api_versions_request( $ApiVersions_Request )>
516              
Encodes the argument and returns the encoded binary string
518             representing a Request buffer.
519              
520             This function takes the following arguments:
521              
522             =over 3
523              
524             =item C<$ApiVersions_Request>
525              
526             C<$ApiVersions_Request> is a reference to the hash representing the structure
of the APIVERSIONS Request. It contains CorrelationId, ClientId (can be empty
528             string), and ApiVersion (must be 0)
529              
530             =back
531              
532             =cut
533              
$IMPLEMENTED_APIVERSIONS->{$APIKEY_APIVERSIONS} = 0;
sub encode_api_versions_request {
    my ( $ApiVersions_Request ) = @_;

    # As encoded here, an APIVERSIONS request consists of the request header
    # only: Size, ApiKey, ApiVersion, CorrelationId, ClientId.
    # _encode_request_header is expected to set the 'template' and 'len'
    # entries and to add the header values to the array behind 'data'.
    my @fields;
    my %request = ( data => \@fields );
    _encode_request_header( \%request, $APIKEY_APIVERSIONS, $ApiVersions_Request );

    my ( $template, $length ) = @request{ qw( template len ) };
    return pack( $template, $length, @fields );
}
554              
# Unpack template for the whole APIVERSIONS response as decoded by
# decode_api_versions_response. unpack's 'l>/(...)' consumes its count
# without returning it, so the array size is first read as a value of its
# own (so the decoder can see it) and then re-read, after backing up 4 bytes
# with X[l], as the repeat count for the (ApiKey, MinVersion, MaxVersion)
# group.
my $_decode_api_version_response_template = q{x[l]l>s>l>X[l]l>/(s>s>s>)};
                                    # x[l]    # Size (skip)
                                    # l>      # CorrelationId
                                    # s>      # ErrorCode
                                    # l>      # ApiVersions array size
                                    # X[l]    # back up over the array size
                                    # l>/(    # ApiVersions array
                                    #     s>  # ApiKey
                                    #     s>  # MinVersion
                                    #     s>  # MaxVersion
                                    # )
566              
=head3 C<decode_api_versions_response( $bin_stream_ref )>
568              
569             Decodes the argument and returns a reference to the hash representing
570             the structure of the APIVERSIONS Response.
571              
572             This function takes the following arguments:
573              
574             =over 3
575              
576             =item C<$bin_stream_ref>
577              
578             C<$bin_stream_ref> is a reference to the encoded Response buffer. The buffer
579             must be a non-empty binary string.
580              
581             =back
582              
583             =cut
584              
sub decode_api_versions_response {
    my ( $bin_stream_ref ) = @_;

    # unpack yields: CorrelationId, ErrorCode, array size, then one
    # (ApiKey, MinVersion, MaxVersion) triple per array element.
    my ( $correlation_id, $error_code, $remaining, @fields ) =
        unpack( $_decode_api_version_response_template, $$bin_stream_ref );

    my @api_versions;
    while ( $remaining-- ) {
        my ( $api_key, $min_version, $max_version ) = splice( @fields, 0, 3 );
        push @api_versions, {
            ApiKey     => $api_key,
            MinVersion => $min_version,
            MaxVersion => $max_version,
        };
    }

    return {
        CorrelationId => $correlation_id,
        ErrorCode     => $error_code,
        ApiVersions   => \@api_versions,
    };
}
609              
=head3 C<encode_find_coordinator_request( $FindCoordinator_Request )>
611              
Encodes the argument and returns the encoded binary string
613             representing a Request buffer.
614              
615             This function takes the following arguments:
616              
617             =over 3
618              
619             =item C<$FindCoordinator_Request>
620              
621             C<$FindCoordinator_Request> is a reference to the hash representing the structure
of the FINDCOORDINATOR Request. It contains CorrelationId, ClientId (can be empty
623             string), CoordinatorKey and CoordinatorType (for version 1 of protocol)
624              
625             =back
626              
627             =cut
628              
$IMPLEMENTED_APIVERSIONS->{$APIKEY_FINDCOORDINATOR} = 1;
sub encode_find_coordinator_request {
    my ( $FindCoordinator_Request ) = @_;

    my @data;
    my $request = {
        # template  => '...',
        # len       => ...,
        data        => \@data,
    };

    # Returns the ApiVersion the header was encoded with; it decides below
    # whether the v1-only CoordinatorType field is written.
    my $api_version =
        _encode_request_header( $request, $APIKEY_FINDCOORDINATOR, $FindCoordinator_Request );
                                    # Size
                                    # ApiKey
                                    # ApiVersion
                                    # CorrelationId
                                    # ClientId

    my $coordinator_key  = $FindCoordinator_Request->{CoordinatorKey};
    # CoordinatorType is an int8 (0 for groups). Defaulting a missing value
    # to 0 writes the same byte pack() would produce for undef, without the
    # "uninitialized value in pack" warning.
    my $coordinator_type = $FindCoordinator_Request->{CoordinatorType} // 0;

    $request->{template} .= q{s>};  # string length
    $request->{len}      += 2;
    _encode_string( $request, $coordinator_key );   # CoordinatorKey (GroupId for version 0)

    if ( $api_version >= 1 ) {
        # CoordinatorType (0 for groups).
        # Plain 'c': pack() accepts the '<'/'>' byte-order modifiers only
        # after multi-byte types (sSiIlLqQjJfFdDpP), so the former 'c>'
        # made every v1 request die.
        $request->{template} .= q{c};
        $request->{len}      += 1;
        push( @{ $request->{data} }, $coordinator_type );
    }

    return pack( $request->{template}, $request->{len}, @data );
}
663              
my $_decode_find_protocol_response_template = q{x[l]l>s>l>s>/al>};
                                    # x[l]    # Size (skip)
                                    # l>      # CorrelationId
                                    # s>      # ErrorCode
                                    # l>      # NodeId
                                    # s>/a    # Host
                                    # l>      # Port

# Version 1 adds throttle_time_ms and ErrorMessage. ErrorMessage is a
# NULLABLE_STRING, and unpack's 's>/a' croaks on its null marker (length -1),
# so there is one template for a present message and one for a null message.
# The decoder chooses between them after peeking at the message length.
my $_decode_find_protocol_response_template_v1 = q{x[l]l>l>s>s>/al>s>/al>};
                                    # x[l]    # Size (skip)
                                    # l>      # CorrelationId
                                    # l>      # throttle_time_ms
                                    # s>      # ErrorCode
                                    # s>/a    # ErrorMessage (non-null)
                                    # l>      # NodeId
                                    # s>/a    # Host
                                    # l>      # Port

my $_decode_find_protocol_response_template_v1_null_message = q{x[l]l>l>s>x[s]l>s>/al>};
                                    # x[l]    # Size (skip)
                                    # l>      # CorrelationId
                                    # l>      # throttle_time_ms
                                    # s>      # ErrorCode
                                    # x[s]    # ErrorMessage length -1 (null, skip)
                                    # l>      # NodeId
                                    # s>/a    # Host
                                    # l>      # Port

my $_decode_find_protocol_response_error_message_length_template_v1 = q{x[l]x[l]x[l]x[s]s>};
                                    # x[l]x[l]x[l]x[s] # skip Size, CorrelationId, throttle_time_ms, ErrorCode
                                    # s>               # ErrorMessage length

=head3 C<decode_find_coordinator_response( $bin_stream_ref, $api_version )>

Decodes the argument and returns a reference to the hash representing
the structure of the FINDCOORDINATOR Response.

This function takes the following arguments:

=over 3

=item C<$bin_stream_ref>

C<$bin_stream_ref> is a reference to the encoded Response buffer. The buffer
must be a non-empty binary string.

=item C<$api_version>

Optional. The API version the response was encoded with; defaults to
C<$DEFAULT_APIVERSION>. When it is 1 or greater the response is decoded
with the version 1 layout, and the returned hash additionally contains
C<ThrottleTimeMs> and C<ErrorMessage>. C<ErrorMessage> is C<undef> when the
broker sent a null message.

=back

=cut

sub decode_find_coordinator_response {
    my ( $bin_stream_ref, $api_version ) = @_;

    $api_version //= $DEFAULT_APIVERSION;
    # Same test as encode_find_coordinator_request uses for the v1 fields.
    my $is_v1 = $api_version >= 1;

    my @data;
    if ( !$is_v1 ) {
        @data = unpack( $_decode_find_protocol_response_template, $$bin_stream_ref );
    }
    else {
        my $error_message_length = unpack(
            $_decode_find_protocol_response_error_message_length_template_v1,
            $$bin_stream_ref,
        );
        if ( defined( $error_message_length ) && $error_message_length < 0 ) {
            @data = unpack( $_decode_find_protocol_response_template_v1_null_message, $$bin_stream_ref );
            # Keep the field positions of the non-null layout: ErrorMessage is index 3.
            splice( @data, 3, 0, undef );
        }
        else {
            @data = unpack( $_decode_find_protocol_response_template_v1, $$bin_stream_ref );
        }
    }

    my $i = 0;
    my $FindCoordinator_Response = {};

    $FindCoordinator_Response->{CorrelationId}      = $data[ $i++ ];
    if ( $is_v1 ) {
        $FindCoordinator_Response->{ThrottleTimeMs} = $data[ $i++ ];   # only v1
    }
    $FindCoordinator_Response->{ErrorCode}          = $data[ $i++ ];
    if ( $is_v1 ) {
        $FindCoordinator_Response->{ErrorMessage}   = $data[ $i++ ];   # only v1, undef when null
    }
    $FindCoordinator_Response->{NodeId}             = $data[ $i++ ];
    $FindCoordinator_Response->{Host}               = $data[ $i++ ];
    $FindCoordinator_Response->{Port}               = $data[ $i++ ];

    return $FindCoordinator_Response;
}
726              
727             # PRODUCE Request --------------------------------------------------------------
728              
729             =head3 C
730              
731             Encodes the argument and returns a reference to the encoded binary string
732             representing a Request buffer.
733              
734             This function takes the following arguments:
735              
736             =over 3
737              
738             =item C<$Produce_Request>
739              
740             C<$Produce_Request> is a reference to the hash representing
741             the structure of the PRODUCE Request (examples see C).
742              
743             =item C<$compression_codec>
744              
745             Optional.
746              
747             C<$compression_codec> sets the required type of C<$messages> compression,
748             if the compression is desirable.
749              
750             Supported codecs:
751             L<$COMPRESSION_NONE|Kafka/$COMPRESSION_NONE>,
752             L<$COMPRESSION_GZIP|Kafka/$COMPRESSION_GZIP>,
753             L<$COMPRESSION_SNAPPY|Kafka/$COMPRESSION_SNAPPY>,
754             L<$COMPRESSION_LZ4|Kafka/$COMPRESSION_LZ4>.
755              
756             NOTE: $COMPRESSION_LZ4 requires Kafka 0.10 or higher, as initial implementation of LZ4 in Kafka did not follow the standard LZ4 framing specification.
757              
758             =back
759              
760             =cut
761              
762             $IMPLEMENTED_APIVERSIONS->{$APIKEY_PRODUCE} = 2;
763             sub encode_produce_request {
764 5111     5111 1 11499 my ( $Produce_Request, $compression_codec ) = @_;
765              
766 5111         5169 my @data;
767 5111         8073 my $request = {
768             # template => '...',
769             # len => ...,
770             data => \@data,
771             };
772              
773 5111         9050 _encode_request_header( $request, $APIKEY_PRODUCE, $Produce_Request );
774             # Size
775             # ApiKey
776             # ApiVersion
777             # CorrelationId
778             # ClientId
779              
780 5111         5150 my $topics_array = $Produce_Request->{topics};
781             push( @data,
782             $Produce_Request->{RequiredAcks}, # RequiredAcks
783             $Produce_Request->{Timeout}, # Timeout
784 5111         6644 scalar( @$topics_array ), # topics array size
785             );
786 5111         5151 $request->{template} .= $_ProduceRequest_header_template;
787 5111         4903 $request->{len} += $_ProduceRequest_header_length;
788              
789 5111         6176 foreach my $topic ( @$topics_array ) {
790 5111         4905 $request->{template} .= q{s>}; # string length
791 5111         4575 $request->{len} += 2;
792 5111         7099 _encode_string( $request, $topic->{TopicName} ); # TopicName
793              
794 5111         4818 my $partitions_array = $topic->{partitions};
795 5111         4942 push( @data, scalar( @$partitions_array ) );
796 5111         5075 $request->{template} .= q{l>}; # partitions array size
797 5111         4457 $request->{len} += 4;
798 5111         5662 foreach my $partition ( @$partitions_array ) {
799 5111         6028 push( @data, $partition->{Partition} );
800 5111         4902 $request->{template} .= q{l>}; # Partition
801 5111         4260 $request->{len} += 4;
802              
803 5111         6719 _encode_MessageSet_array( $request, $partition->{MessageSet}, $compression_codec );
804             }
805             }
806              
807 5111         31532 return pack( $request->{template}, $request->{len}, @data );
808             }
809              
810             # PRODUCE Response -------------------------------------------------------------
811              
812             my $_decode_produce_response_template = qq{x[l]l>l>X[l]l>/(s>/al>X[l]l>/(l>s>${_int64_template}))};
813             # x[l] # Size (skip)
814             # l> # CorrelationId
815              
816             # l> # topics array size
817             # X[l]
818             # l>/( # topics array
819             # s>/a # TopicName
820              
821             # l> # partitions array size
822             # X[l]
823             # l>/( # partitions array
824             # l> # Partition
825             # s> # ErrorCode
826             # $_int64_template # Offset
827             # )
828             # )
829              
830             my $_decode_produce_response_template_v1 = qq{x[l]l>l>X[l]l>/(s>/al>X[l]l>/(l>s>${_int64_template}))l>};
831             # x[l] # Size (skip)
832             # l> # CorrelationId
833              
834             # l> # topics array size
835             # X[l]
836             # l>/( # topics array
837             # s>/a # TopicName
838              
839             # l> # partitions array size
840             # X[l]
841             # l>/( # partitions array
842             # l> # Partition
843             # s> # ErrorCode
844             # $_int64_template # Offset
845             # )
846             # )
847             # l> # Throttle_Time_Ms
848              
849             my $_decode_produce_response_template_v2 = qq{x[l]l>l>X[l]l>/(s>/al>X[l]l>/(l>s>${_int64_template}${_int64_template}))l>};
850             # x[l] # Size (skip)
851             # l> # CorrelationId
852              
853             # l> # topics array size
854             # X[l]
855             # l>/( # topics array
856             # s>/a # TopicName
857              
858             # l> # partitions array size
859             # X[l]
860             # l>/( # partitions array
861             # l> # Partition
862             # s> # ErrorCode
863             # $_int64_template # Offset
864             # $_int64_template # Log_Append_Time
865             # )
866             # )
867             # l> # Throttle_Time_Ms
868              
869             =head3 C
870              
871             Decodes the argument and returns a reference to the hash representing
872             the structure of the PRODUCE Response (examples see C).
873              
874             This function takes the following arguments:
875              
876             =over 3
877              
878             =item C<$bin_stream_ref>
879              
880             C<$bin_stream_ref> is a reference to the encoded Response buffer. The buffer
881             must be a non-empty binary string.
882              
883             =back
884              
885             =cut
886             sub decode_produce_response {
887 98     98 1 619 my ( $bin_stream_ref, $api_version ) = @_;
888              
889 98   33     828 $api_version //= $DEFAULT_APIVERSION;
890 98         198 my $is_v1 = $api_version == 1;
891 98         172 my $is_v2 = $api_version == 2;
892              
893 98 50       810 my @data = unpack( $is_v1 ? $_decode_produce_response_template_v1
    50          
894             : $is_v2 ? $_decode_produce_response_template_v2
895             : $_decode_produce_response_template, $$bin_stream_ref );
896              
897 98         231 my $i = 0;
898 98         205 my $Produce_Response = {};
899              
900 98         291 $Produce_Response->{CorrelationId} = $data[ $i++ ]; # CorrelationId
901              
902 98         296 my $topics_array = $Produce_Response->{topics} = [];
903 98         190 my $topics_array_size = $data[ $i++ ]; # topics array size
904 98         287 while ( $topics_array_size-- ) {
905 98         271 my $topic = {
906             TopicName => $data[ $i++ ],
907             };
908              
909 98         276 my $partitions_array = $topic->{partitions} = [];
910 98         174 my $partitions_array_size = $data[ $i++ ]; # partitions array size
911 98         267 while ( $partitions_array_size-- ) {
912 98 50       406 my $partition = {
913             Partition => $data[ $i++ ], # Partition
914             ErrorCode => $data[ $i++ ], # ErrorCode
915             Offset => _unpack64( $data[ $i++ ] ), # Offset
916             ( $is_v2 ? (Log_Append_Time => _unpack64( $data[ $i++ ] )) : () ), # Log_Append_Time
917             };
918              
919 98         354 push( @$partitions_array, $partition );
920             }
921              
922 98         259 push( @$topics_array, $topic );
923             }
924             defined $data[ $i ]
925 98 50       303 and $Produce_Response->{Throttle_Time_Ms} = $data[ $i++ ];
926              
927 98         289 return $Produce_Response;
928             }
929              
930             # FETCH Request ----------------------------------------------------------------
931              
932             =head3 C
933              
934             Encodes the argument and returns a reference to the encoded binary string
935             representing a Request buffer.
936              
937             This function takes the following arguments:
938              
939             =over 3
940              
941             =item C<$Fetch_Request>
942              
943             C<$Fetch_Request> is a reference to the hash representing
944             the structure of the FETCH Request (examples see C).
945              
946             =back
947              
948             =cut
949              
950             # version 0, 1, 2 have the same request protocol.
951             $IMPLEMENTED_APIVERSIONS->{$APIKEY_FETCH} = 3;
952             sub encode_fetch_request {
953 5015     5015 1 9101 my ( $Fetch_Request ) = @_;
954              
955 5015         5393 my @data;
956 5015         9177 my $request = {
957             # template => '...',
958             # len => ...,
959             data => \@data,
960             };
961 5015         9137 my $api_version = _encode_request_header( $request, $APIKEY_FETCH, $Fetch_Request );
962             # Size
963             # ApiKey
964             # ApiVersion
965             # CorrelationId
966             # ClientId
967 5015         5887 my $is_v3 = $api_version == 3;
968              
969 5015         5592 push( @data, $CONSUMERS_REPLICAID ); # ReplicaId
970 5015         5260 my $topics_array = $Fetch_Request->{topics};
971             push( @data,
972             $Fetch_Request->{MaxWaitTime}, # MaxWaitTime
973             $Fetch_Request->{MinBytes}, # MinBytes
974 5015 50       8801 ( $is_v3 ? $Fetch_Request->{MaxBytes} : () ), # MaxBytes (version 3 only)
975             scalar( @$topics_array ), # topics array size
976             );
977 5015 50       6121 if ($is_v3) {
978 0         0 $request->{template} .= $_FetchRequest_header_template_v3;
979 0         0 $request->{len} += $_FetchRequest_header_length_v3;
980             } else {
981 5015         5567 $request->{template} .= $_FetchRequest_header_template;
982 5015         4920 $request->{len} += $_FetchRequest_header_length;
983             }
984              
985 5015         6747 foreach my $topic ( @$topics_array ) {
986 5015         6505 $request->{template} .= q{s>}; # string length
987 5015         5461 $request->{len} += 2;
988 5015         7981 _encode_string( $request, $topic->{TopicName} ); # TopicName
989              
990 5015         5167 my $partitions_array = $topic->{partitions};
991 5015         5171 push( @data, scalar( @$partitions_array ) );
992 5015         5052 $request->{template} .= q{l>}; # partitions array size
993 5015         4723 $request->{len} += 4;
994 5015         5509 foreach my $partition ( @$partitions_array ) {
995             push( @data,
996             $partition->{Partition}, # Partition
997             _pack64( $partition->{FetchOffset} ), # FetchOffset
998             $partition->{MaxBytes}, # MaxBytes
999 5015         9345 );
1000 5015         6514 $request->{template} .= $_FetchRequest_body_template;
1001 5015         7670 $request->{len} += $_FetchRequest_body_length;
1002             }
1003             }
1004              
1005 5015         26228 return pack( $request->{template}, $request->{len}, @data );
1006             }
1007              
1008             # FETCH Response ---------------------------------------------------------------
1009              
1010             =head3 C
1011              
1012             Decodes the argument and returns a reference to the hash representing
1013             the structure of the FETCH Response (examples see C).
1014              
1015             This function takes the following arguments:
1016              
1017             =over 3
1018              
1019             =item C<$bin_stream_ref>
1020              
1021             C<$bin_stream_ref> is a reference to the encoded Response buffer. The buffer
1022             must be a non-empty binary string.
1023              
1024             =back
1025              
1026             =cut
1027             sub decode_fetch_response {
1028 5021     5021 1 11740 my ( $bin_stream_ref, $api_version ) = @_;
1029              
1030 5021   33     14764 $api_version //= $DEFAULT_APIVERSION;
1031 5021         5411 my $is_v1 = $api_version == 1;
1032 5021         5142 my $is_v2 = $api_version == 2;
1033 5021         4893 my $is_v3 = $api_version == 3;
1034              
1035              
1036             # According to Apache Kafka documentation:
1037             # As an optimization the server is allowed to return a partial message at the end of the message set.
1038             # Clients should handle this case.
1039             # NOTE: look inside _decode_MessageSet_template and _decode_MessageSet_array
1040              
1041 5021         5236 my @data;
1042 5021         9874 my $response = {
1043             # template => '...',
1044             # stream_offset => ...,
1045             data => \@data,
1046             bin_stream => $bin_stream_ref,
1047             };
1048              
1049 5021         9521 _decode_fetch_response_template( $response, $api_version );
1050 5021         33262 @data = unpack( $response->{template}, $$bin_stream_ref );
1051              
1052 5021         7245 my $i = 0;
1053 5021         6710 my $Fetch_Response = {};
1054              
1055 5021         7648 $Fetch_Response->{CorrelationId} = $data[ $i++ ]; # CorrelationId
1056 5021 50 33     19657 if ($is_v1 || $is_v2 || $is_v3) {
      33        
1057 0         0 $Fetch_Response->{ThrottleTimeMs} = $data[ $i++ ]; # ( ThrottleTimeMs ) only v1 and above
1058             }
1059              
1060 5021         8092 my $topics_array = $Fetch_Response->{topics} = [];
1061 5021         6654 my $topics_array_size = $data[ $i++ ]; # topics array size
1062 5021         7661 while ( $topics_array_size-- ) {
1063 5021         8131 my $topic = {
1064             TopicName => $ data[ $i++ ], # TopicName
1065             };
1066              
1067 5021         6576 my $partitions_array = $topic->{partitions} = [];
1068 5021         5458 my $partitions_array_size = $data[ $i++ ]; # partitions array size
1069 5021         5727 my ( $MessageSetSize, $MessageSet_array );
1070 5021         7608 while ( $partitions_array_size-- ) {
1071 5021         8836 my $partition = {
1072             Partition => $data[ $i++ ], # Partition
1073             ErrorCode => $data[ $i++ ], # ErrorCode
1074             HighwaterMarkOffset => _unpack64( $data[ $i++ ] ), # HighwaterMarkOffset
1075             };
1076              
1077 5021         5649 $MessageSetSize = $data[ $i++ ]; # MessageSetSize
1078 5021         6492 $MessageSet_array = $partition->{MessageSet} = [];
1079              
1080 5021         9376 _decode_MessageSet_array( $response, $MessageSetSize, \$i, $MessageSet_array );
1081              
1082 5021         9524 push( @$partitions_array, $partition );
1083             }
1084              
1085 5021         8918 push( @$topics_array, $topic );
1086             }
1087              
1088 5021         17723 return $Fetch_Response;
1089             }
1090              
1091             # OFFSET Request ---------------------------------------------------------------
1092              
1093             =head3 C
1094              
1095             Encodes the argument and returns a reference to the encoded binary string
1096             representing a Request buffer.
1097              
1098             This function takes the following arguments:
1099              
1100             =over 3
1101              
1102             =item C<$Offset_Request>
1103              
1104             C<$Offset_Request> is a reference to the hash representing
1105             the structure of the OFFSET Request (examples see C).
1106              
1107             =back
1108              
1109             =cut
1110             $IMPLEMENTED_APIVERSIONS->{$APIKEY_OFFSET} = 1;
1111             sub encode_offset_request {
1112 27     27 1 1039 my ( $Offset_Request ) = @_;
1113              
1114 27         35 my @data;
1115 27         67 my $request = {
1116             # template => '...',
1117             # len => ...,
1118             data => \@data,
1119             };
1120 27         71 my $api_version = _encode_request_header( $request, $APIKEY_OFFSET, $Offset_Request );
1121             # Size
1122             # ApiKey
1123             # ApiVersion
1124             # CorrelationId
1125             # ClientId
1126              
1127 27         50 my $is_v1 = $api_version == 1;
1128              
1129 27         45 my $topics_array = $Offset_Request->{topics};
1130 27         52 push( @data,
1131             $CONSUMERS_REPLICAID, # ReplicaId
1132             scalar( @$topics_array ), # topics array size
1133             );
1134 27         42 $request->{template} .= $_OffsetRequest_header_template;
1135 27         53 $request->{len} += $_OffsetRequest_header_length;
1136              
1137 27 50       60 my $body_template = $is_v1 ? $_OffsetRequest_body_template_v1 : $_OffsetRequest_body_template;
1138 27 50       51 my $body_length = $is_v1 ? $_OffsetRequest_body_length_v1 : $_OffsetRequest_body_length;
1139 27         52 foreach my $topic ( @$topics_array ) {
1140 27         46 $request->{template} .= q{s>}; # string length
1141 27         40 $request->{len} += 2;
1142 27         64 _encode_string( $request, $topic->{TopicName} ); # TopicName
1143              
1144 27         40 my $partitions_array = $topic->{partitions};
1145 27         54 push( @data, scalar( @$partitions_array ) );
1146 27         50 $request->{template} .= q{l>}; # partitions array size
1147 27         51 $request->{len} += 4; # [l] partitions array size
1148 27         42 foreach my $partition ( @$partitions_array ) {
1149             push( @data,
1150             $partition->{Partition}, # Partition
1151             _pack64( $partition->{Time} ), # Time
1152             $is_v1 ? () : $partition->{MaxNumberOfOffsets}, # MaxNumberOfOffsets
1153 27 50       72 );
1154 27         48 $request->{template} .= $body_template;
1155 27         55 $request->{len} += $body_length;
1156             }
1157             }
1158              
1159             # say STDERR $request->{template};
1160             # say STDERR HexDump($_) foreach @data; use Data::HexDump;
1161 27         312 return pack( $request->{template}, $request->{len}, @data );
1162             }
1163              
1164             # OFFSET Response --------------------------------------------------------------
1165              
1166             my $_decode_offset_response_template = qq{x[l]l>l>X[l]l>/(s>/al>X[l]l>/(l>s>l>X[l]l>/(${_int64_template})))};
1167             # x[l] # Size (skip)
1168             # l> # CorrelationId
1169              
1170             # l> # topics array size
1171             # X[l]
1172             # l>/( # topics array
1173             # s>/a # TopicName
1174              
1175             # l> # PartitionOffsets array size
1176             # X[l]
1177             # l>/( # PartitionOffsets array
1178             # l> # Partition
1179             # s> # ErrorCode
1180              
1181             # l> # Offset array size
1182             # X[l]
1183             # l>/( # Offset array
1184             # $_int64_template # Offset
1185             # )
1186             # )
1187             # )
1188              
1189             my $_decode_offset_response_template_v1 = qq{x[l]l>l>X[l]l>/(s>/al>X[l]l>/(l>s>${_int64_template}${_int64_template}))};
1190             # x[l] # Size (skip)
1191             # l> # CorrelationId
1192              
1193             # l> # topics array size
1194             # X[l]
1195             # l>/( # topics array
1196             # s>/a # TopicName
1197              
1198             # l> # PartitionOffsets array size
1199             # X[l]
1200             # l>/( # PartitionOffsets array
1201             # l> # Partition
1202             # s> # ErrorCode
1203             # $_int64_template # Timestamp
1204             # $_int64_template # Offset
1205             # )
1206             # )
1207              
1208             =head3 C
1209              
1210             Decodes the argument and returns a reference to the hash representing
1211             the structure of the OFFSET Response (examples see C).
1212              
1213             This function takes the following arguments:
1214              
1215             =over 3
1216              
1217             =item C<$bin_stream_ref>
1218              
1219             C<$bin_stream_ref> is a reference to the encoded Response buffer. The buffer
1220             must be a non-empty binary string.
1221              
1222             =back
1223              
1224             =cut
1225             sub decode_offset_response {
1226 23     23 1 365 my ( $bin_stream_ref, $api_version ) = @_;
1227              
1228 23   66     64 $api_version //= $DEFAULT_APIVERSION;
1229 23         40 my $is_v1 = $api_version == 1;
1230 23 50       54 my $template = $is_v1 ? $_decode_offset_response_template_v1 : $_decode_offset_response_template;
1231              
1232 23         142 my @data = unpack( $template, $$bin_stream_ref );
1233              
1234 23         42 my $i = 0;
1235 23         43 my $Offset_Response = { };
1236              
1237 23         53 $Offset_Response->{CorrelationId} = $data[ $i++ ]; # CorrelationId
1238              
1239 23         41 my $topics_array = $Offset_Response->{topics} = [];
1240 23         32 my $topics_array_size = $data[ $i++ ]; # topics array size
1241 23         54 while ( $topics_array_size-- ) {
1242 23         47 my $topic = {
1243             TopicName => $data[ $i++ ], # TopicName
1244             };
1245              
1246 23         43 my $PartitionOffsets_array = $topic->{PartitionOffsets} = [];
1247 23         31 my $PartitionOffsets_array_size = $data[ $i++ ]; # PartitionOffsets array size
1248 23         35 my ( $PartitionOffset, $Offset_array, $Offset_array_size );
1249 23         46 while ( $PartitionOffsets_array_size-- ) {
1250 23         69 $PartitionOffset = {
1251             Partition => $data[ $i++ ], # Partition
1252             ErrorCode => $data[ $i++ ], # ErrorCode
1253             };
1254 23 50       45 if ($is_v1) {
1255 0         0 $PartitionOffset->{Timestamp} = _unpack64($data[ $i++ ]); # Timestamp (v1 only)
1256 0         0 $PartitionOffset->{Offset} = _unpack64($data[ $i++ ]); # Offset (v1 only)
1257              
1258             } else {
1259 23         43 $Offset_array = $PartitionOffset->{Offset} = [];
1260 23         35 $Offset_array_size = $data[ $i++ ]; # Offset array size
1261 23         44 while ( $Offset_array_size-- ) {
1262 41         78 push( @$Offset_array, _unpack64( $data[ $i++ ] ) ); # Offset
1263             }
1264             }
1265              
1266 23         45 push( @$PartitionOffsets_array, $PartitionOffset );
1267             }
1268              
1269 23         51 push( @$topics_array, $topic );
1270             }
1271              
1272 23         58 return $Offset_Response;
1273             }
1274              
1275             # METADATA Request -------------------------------------------------------------
1276              
1277             =head3 C
1278              
1279             Encodes the argument and returns a reference to the encoded binary string
1280             representing a Request buffer.
1281              
1282             This function takes the following arguments:
1283              
1284             =over 3
1285              
1286             =item C<$Metadata_Request>
1287              
1288             C<$Metadata_Request> is a reference to the hash representing
1289             the structure of the METADATA Request (examples see C).
1290              
1291             =back
1292              
1293             =cut
1294             $IMPLEMENTED_APIVERSIONS->{$APIKEY_METADATA} = 0;
1295             sub encode_metadata_request {
1296 127     127 1 2134 my ( $Metadata_Request ) = @_;
1297              
1298 127         274 my @data;
1299 127         591 my $request = {
1300             # template => '...',
1301             # len => ...,
1302             data => \@data,
1303             };
1304              
1305 127         710 _encode_request_header( $request, $APIKEY_METADATA, $Metadata_Request );
1306             # Size
1307             # ApiKey
1308             # ApiVersion
1309             # CorrelationId
1310             # ClientId
1311              
1312 127         241 my $topics_array = $Metadata_Request->{topics};
1313 127         272 push( @data, scalar( @$topics_array ) ); # topics array size
1314 127         563 $request->{template} .= q{l>};
1315 127         217 $request->{len} += 4;
1316              
1317 127         358 foreach my $topic ( @$topics_array ) {
1318 127         266 $request->{template} .= q{s>}; # string length
1319 127         218 $request->{len} += 2;
1320 127         271 _encode_string( $request, $topic ); # TopicName
1321             }
1322              
1323 127         1640 return pack( $request->{template}, $request->{len}, @data );
1324             }
1325              
1326             # METADATA Response ------------------------------------------------------------
1327              
1328             my $_decode_metadata_response_template = q{x[l]l>l>X[l]l>/(l>s>/al>)l>X[l]l>/(s>s>/al>X[l]l>/(s>l>l>l>X[l]l>/(l>)l>X[l]l>/(l>)))};
1329             # x[l] # Size (skip)
1330             # l> # CorrelationId
1331              
1332             # l> # Broker array size
1333             # X[l]
1334             # l>/( # Broker array
1335             # l> # NodeId
1336             # s>/a # Host
1337             # l> # Port
1338             # )
1339              
1340             # l> # TopicMetadata array size
1341             # X[l]
1342             # l>/( # TopicMetadata array
1343             # s> # ErrorCode
1344             # s>/a # TopicName
1345              
1346             # l> # PartitionMetadata array size
1347             # X[l]
1348             # l>/( # PartitionMetadata array
1349             # s> # ErrorCode
1350             # l> # Partition
1351             # l> # Leader
1352              
1353             # l> # Replicas array size
1354             # X[l]
1355             # l>/( # Replicas array
1356             # l> # ReplicaId
1357             # )
1358              
1359             # l> # Isr array size
1360             # X[l]
1361             # l>/( # Isr array
1362             # l> # ReplicaId
1363             # )
1364             # )
1365             # )
1366              
1367             =head3 C
1368              
1369             Decodes the argument and returns a reference to the hash representing
1370             the structure of the METADATA Response (examples see C).
1371              
1372             This function takes the following arguments:
1373              
1374             =over 3
1375              
1376             =item C<$bin_stream_ref>
1377              
1378             C<$bin_stream_ref> is a reference to the encoded Response buffer. The buffer
1379             must be a non-empty binary string.
1380              
1381             =back
1382              
1383             =cut
1384             sub decode_metadata_response {
1385 120     120 1 650 my ( $bin_stream_ref ) = @_;
1386              
1387 120         1290 my @data = unpack( $_decode_metadata_response_template, $$bin_stream_ref );
1388              
1389 120         318 my $i = 0;
1390 120         251 my $Metadata_Response = {};
1391              
1392 120         364 $Metadata_Response->{CorrelationId} = $data[ $i++ ]; # CorrelationId
1393              
1394 120         349 my $Broker_array = $Metadata_Response->{Broker} = [];
1395 120         227 my $Broker_array_size = $data[ $i++ ]; # Broker array size
1396 120         314 while ( $Broker_array_size-- ) {
1397 360         1351 push( @$Broker_array, {
1398             NodeId => $data[ $i++ ], # NodeId
1399             Host => $data[ $i++ ], # Host
1400             Port => $data[ $i++ ], # Port
1401             }
1402             );
1403             }
1404              
1405 120         281 my $TopicMetadata_array = $Metadata_Response->{TopicMetadata} = [];
1406 120         260 my $TopicMetadata_array_size = $data[ $i++ ]; # TopicMetadata array size
1407 120         306 while ( $TopicMetadata_array_size-- ) {
1408 120         492 my $TopicMetadata = {
1409             ErrorCode => $data[ $i++ ], # ErrorCode
1410             TopicName => $data[ $i++ ], # TopicName
1411             };
1412              
1413 120         261 my $PartitionMetadata_array = $TopicMetadata->{PartitionMetadata} = [];
1414 120         235 my $PartitionMetadata_array_size = $data[ $i++ ]; # PartitionMetadata array size
1415 120         378 while ( $PartitionMetadata_array_size-- ) {
1416 120         449 my $PartitionMetadata = {
1417             ErrorCode => $data[ $i++ ], # ErrorCode
1418             Partition => $data[ $i++ ], # Partition
1419             Leader => $data[ $i++ ], # Leader
1420             };
1421              
1422 120         263 my $Replicas_array = $PartitionMetadata->{Replicas} = [];
1423 120         224 my $Replicas_array_size = $data[ $i++ ]; # Replicas array size
1424 120         276 while ( $Replicas_array_size-- ) {
1425 360         790 push( @$Replicas_array, $data[ $i++ ] ); # ReplicaId
1426             }
1427              
1428 120         264 my $Isr_array = $PartitionMetadata->{Isr} = [];
1429 120         208 my $Isr_array_size = $data[ $i++ ]; # Isr array size
1430 120         263 while ( $Isr_array_size-- ) {
1431 360         679 push( @$Isr_array, $data[ $i++ ] ); # ReplicaId
1432             }
1433              
1434 120         337 push( @$PartitionMetadata_array, $PartitionMetadata );
1435             }
1436              
1437 120         298 push( @$TopicMetadata_array, $TopicMetadata );
1438             }
1439              
1440 120         518 return $Metadata_Response;
1441             }
1442              
1443             # OffsetCommit Request -------------------------------------------------------------
1444              
1445             =head3 C
1446              
1447             Encodes the argument and returns a reference to the encoded binary string
1448             representing a Request buffer.
1449              
1450             This function takes the following arguments:
1451              
1452             =over 3
1453              
1454             =item C<$OffsetCommit_Request>
1455              
1456             C<$OffsetCommit_Request> is a reference to the hash representing
1457             the structure of the OffsetCommit Request (examples see C).
1458              
1459             =back
1460              
1461             =cut
1462             $IMPLEMENTED_APIVERSIONS->{$APIKEY_OFFSETCOMMIT} = 1;
1463             sub encode_offsetcommit_request {
1464 0     0 1 0 my ( $OffsetCommit_Request ) = @_;
1465              
1466 0         0 my @data;
1467 0         0 my $request = {
1468             # template => '...',
1469             # len => ...,
1470             data => \@data,
1471             };
1472              
1473 0         0 my $api_version = _encode_request_header( $request, $APIKEY_OFFSETCOMMIT, $OffsetCommit_Request );
1474             # Size
1475             # ApiKey
1476             # ApiVersion
1477             # CorrelationId
1478             # ClientId
1479              
1480 0         0 my $is_v1 = $api_version == 1;
1481              
1482 0         0 $request->{template} .= q{s>};
1483 0         0 $request->{len} += 2;
1484 0         0 _encode_string( $request, $OffsetCommit_Request->{GroupId} ); # GroupId
1485              
1486 0 0       0 if ($is_v1) {
1487 0         0 $request->{template} .= q{l>}; # GroupGenerationId
1488 0         0 $request->{len} += 4;
1489             ### WARNING TODO: we don't support consumer groups properly, so we set
1490             ### generation id to -1 and member id null (see
1491             ### https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol )
1492 0         0 push( @data, -1 );
1493              
1494 0         0 $request->{template} .= q{s>}; # string length
1495 0         0 $request->{len} += 2;
1496 0         0 _encode_string( $request, '' ); # MemberId
1497             }
1498              
1499 0         0 my $topics_array = $OffsetCommit_Request->{topics};
1500 0         0 push( @data, scalar( @$topics_array ) ); # topics array size
1501 0         0 $request->{template} .= q{l>};
1502 0         0 $request->{len} += 4;
1503              
1504 0         0 foreach my $topic ( @$topics_array ) {
1505 0         0 $request->{template} .= q{s>};
1506 0         0 $request->{len} += 2;
1507 0         0 _encode_string( $request, $topic->{TopicName} ); # TopicName
1508              
1509 0         0 my $partitions_array = $topic->{partitions};
1510 0         0 push( @data, scalar( @$partitions_array ) ); # partitions array size
1511 0         0 $request->{template} .= q{l>};
1512 0         0 $request->{len} += 4;
1513              
1514 0         0 foreach my $partition ( @$partitions_array ){
1515             push( @data,
1516             $partition->{Partition}, # Partition
1517             $partition->{Offset}, # Offset
1518 0         0 );
1519 0         0 $request->{template} .= qq{l>${_int64_template}};
1520 0         0 $request->{len} += 12;
1521              
1522 0 0       0 if ($is_v1) { # timestamp
1523             ### WARNING TODO: currently we hardcode "now"
1524 0         0 push( @data, time() * 1000);
1525 0         0 $request->{template} .= qq{${_int64_template}};
1526 0         0 $request->{len} += 8;
1527             }
1528              
1529 0         0 $request->{template} .= q{s>};
1530 0         0 $request->{len} += 2;
1531 0         0 _encode_string( $request, $partition->{Metadata} ), # Metadata
1532             }
1533             }
1534 0         0 return pack( $request->{template}, $request->{len}, @data );
1535             }
1536              
1537             # OFFSETCOMMIT Response --------------------------------------------------------------
1538              
1539             my $_decode_offsetcommit_response_template = qq{x[l]l>l>X[l]l>/(s>/al>X[l]l>/(l>s>))};
1540             # x[l] # Size (skip)
1541             # l> # CorrelationId
1542             # l> # topics array size
1543             # X[l]
1544             # l>/( # topics array
1545             # s>/a # TopicName
1546             # l> # partitions array size
1547             # X[l]
1548             # l>/( # partitions array
1549             # l> # Partition
1550             # s> # ErrorCode
1551             # )
1552             # )
1553              
1554             =head3 C
1555              
1556             Decodes the argument and returns a reference to the hash representing
1557             the structure of the OFFSETCOMMIT Response (examples see C).
1558              
1559             This function takes the following arguments:
1560              
1561             =over 3
1562              
1563             =item C<$bin_stream_ref>
1564              
1565             C<$bin_stream_ref> is a reference to the encoded Response buffer. The buffer
1566             must be a non-empty binary string.
1567              
1568             =back
1569              
1570             =cut
1571             sub decode_offsetcommit_response {
1572 0     0 1 0 my ( $bin_stream_ref ) = @_;
1573              
1574 0         0 my @data = unpack( $_decode_offsetcommit_response_template, $$bin_stream_ref );
1575              
1576 0         0 my $i = 0;
1577 0         0 my $OffsetCommit_Response = {};
1578              
1579 0         0 $OffsetCommit_Response->{CorrelationId} = $data[ $i++ ]; #CorrelationId
1580              
1581 0         0 my $topics_array = $OffsetCommit_Response->{topics} = [];
1582 0         0 my $topics_array_size = $data[ $i++ ]; # topics array size
1583 0         0 while ( $topics_array_size-- ) {
1584 0         0 my $topic = {
1585             TopicName => $data[ $i++ ], # TopicName
1586             };
1587              
1588 0         0 my $Partition_array = $topic->{partitions} = [];
1589 0         0 my $Partition_array_size = $data[ $i++ ]; # Partitions array size
1590 0         0 while ( $Partition_array_size-- ) {
1591 0         0 my $Partition = {
1592             Partition => $data[ $i++ ], # Partition
1593             ErrorCode => $data[ $i++ ], # ErrorCode
1594             };
1595 0         0 push( @$Partition_array, $Partition);
1596             }
1597 0         0 push( @$topics_array, $topic);
1598             }
1599 0         0 return $OffsetCommit_Response;
1600             }
1601              
1602             # OffsetFetch Request -------------------------------------------------------------
1603              
1604             =head3 C
1605              
1606             Encodes the argument and returns a reference to the encoded binary string
1607             representing a Request buffer.
1608              
1609             This function takes the following arguments:
1610              
1611             =over 3
1612              
1613             =item C<$OffsetFetch_Request>
1614              
1615             C<$OffsetFetch_Request> is a reference to the hash representing
1616             the structure of the OffsetFetch Request (examples see C).
1617              
1618             =back
1619              
1620             =cut
1621             $IMPLEMENTED_APIVERSIONS->{$APIKEY_OFFSETFETCH} = 1;
1622             sub encode_offsetfetch_request {
1623 0     0 1 0 my ( $OffsetFetch_Request ) = @_;
1624              
1625 0         0 my @data;
1626 0         0 my $request = {
1627             # template => '...',
1628             # len => ...,
1629             data => \@data,
1630             };
1631              
1632 0         0 _encode_request_header( $request, $APIKEY_OFFSETFETCH, $OffsetFetch_Request );
1633             # Size
1634             # ApiKey
1635             # ApiVersion
1636             # CorrelationId
1637             # ClientId
1638              
1639 0         0 $request->{template} .= q{s>};
1640 0         0 $request->{len} += 2;
1641 0         0 _encode_string( $request, $OffsetFetch_Request->{GroupId} ); # GroupId
1642              
1643 0         0 my $topics_array = $OffsetFetch_Request->{topics};
1644 0         0 push( @data, scalar( @$topics_array ) ); # topics array size
1645 0         0 $request->{template} .= q{l>};
1646 0         0 $request->{len} += 4;
1647              
1648 0         0 foreach my $topic ( @$topics_array ) {
1649 0         0 $request->{template} .= q{s>};
1650 0         0 $request->{len} += 2;
1651 0         0 _encode_string( $request, $topic->{TopicName} ); # TopicName
1652              
1653 0         0 my $partitions_array = $topic->{partitions};
1654 0         0 push( @data, scalar( @$partitions_array ) ); # partitions array size
1655 0         0 $request->{template} .= q{l>};
1656 0         0 $request->{len} += 4;
1657              
1658 0         0 foreach my $partition ( @$partitions_array ){
1659             push( @data,
1660             $partition->{Partition}, # Partition
1661 0         0 );
1662 0         0 $request->{template} .= qq{l>};
1663 0         0 $request->{len} += 4;
1664             }
1665             }
1666 0         0 return pack( $request->{template}, $request->{len}, @data );
1667             }
1668              
1669             # OFFSETFETCH Response --------------------------------------------------------------
1670              
1671             my $_decode_offsetfetch_response_template = qq{x[l]l>l>X[l]l>/(s>/al>X[l]l>/(l>${_int64_template}s>/as>))};
1672             # x[l] # Size (skip)
1673             # l> # CorrelationId
1674             # l> # topics array size
1675             # X[l]
1676             # l>/( # topics array
1677             # s>/a # TopicName
1678             # l> # partitions array size
1679             # X[l]
1680             # l>/( # partitions array
1681             # l> # Partition
1682             # $_int64_template # Offset
1683             # s>/a # Metadata
1684             # s> # ErrorCode
1685             # )
1686             # )
1687              
1688             =head3 C
1689              
1690             Decodes the argument and returns a reference to the hash representing
1691             the structure of the OFFSETFETCH Response (examples see C).
1692              
1693             This function takes the following arguments:
1694              
1695             =over 3
1696              
1697             =item C<$bin_stream_ref>
1698              
1699             C<$bin_stream_ref> is a reference to the encoded Response buffer. The buffer
1700             must be a non-empty binary string.
1701              
1702             =back
1703              
1704             =cut
1705             sub decode_offsetfetch_response {
1706 0     0 1 0 my ( $bin_stream_ref ) = @_;
1707              
1708 0         0 my @data = unpack( $_decode_offsetfetch_response_template, $$bin_stream_ref );
1709              
1710 0         0 my $i = 0;
1711 0         0 my $OffsetFetch_Response = {};
1712              
1713 0         0 $OffsetFetch_Response->{CorrelationId} = $data[ $i++ ]; #CorrelationId
1714              
1715 0         0 my $topics_array = $OffsetFetch_Response->{topics} = [];
1716 0         0 my $topics_array_size = $data[ $i++ ]; # topics array size
1717 0         0 while ( $topics_array_size-- ) {
1718 0         0 my $topic = {
1719             TopicName => $data[ $i++ ], # TopicName
1720             };
1721              
1722 0         0 my $Partition_array = $topic->{partitions} = [];
1723 0         0 my $Partition_array_size = $data[ $i++ ]; # Partitions array size
1724 0         0 while ( $Partition_array_size-- ) {
1725 0         0 my $Partition = {
1726             Partition => $data[ $i++ ], # Partition
1727             Offset => $data[ $i++ ], # Partition
1728             Metadata => $data[ $i++ ], # Partition
1729             ErrorCode => $data[ $i++ ], # ErrorCode
1730             };
1731 0         0 push( @$Partition_array, $Partition);
1732             }
1733 0         0 push( @$topics_array, $topic);
1734             }
1735 0         0 return $OffsetFetch_Response;
1736             }
1737              
1738             #-- private functions ----------------------------------------------------------
1739              
1740             # Generates a template to encrypt the request header
1741             sub _encode_request_header {
1742 10280     10280   15249 my ( $request, $api_key, $request_ref ) = @_;
1743              
1744             # we need to find out which API version to use for the request (and the
1745             # response). $request_ref->{ApiVersion} can be specified by the end user,
1746             # or providede by by Kafka::Connection ( see
1747             # Kafka::Connection::_get_api_versions() ). If not provided, we
1748             # default to $DEFAULT_APIVERSION
1749 10280   66     24834 my $api_version = $request_ref->{ApiVersion} // $DEFAULT_APIVERSION;
1750 10280         18582 @{ $request->{data} } = (
1751             # Size
1752             $api_key, # ApiKey
1753             $api_version, # ApiVersion
1754             $request_ref->{CorrelationId}, # CorrelationId
1755 10280         11635 );
1756 10280         13312 $request->{template} = $_Request_header_template;
1757 10280         10969 $request->{len} = $_Request_header_length;
1758 10280         17625 _encode_string( $request, $request_ref->{ClientId} ); # ClientId
1759              
1760 10280         11207 return $api_version;
1761             }
1762              
1763             # Generates a template to decrypt the fetch response body
1764             sub _decode_fetch_response_template {
1765 5021     5021   5900 my ( $response, $api_version ) = @_;
1766              
1767 5021         5312 my $is_v1 = $api_version == 1;
1768 5021         4784 my $is_v2 = $api_version == 2;
1769 5021         4646 my $is_v3 = $api_version == 3;
1770              
1771 5021 50 33     16709 if ($is_v1 || $is_v2 || $is_v3) {
      33        
1772 0         0 $response->{template} = $_FetchResponse_header_template_v1;
1773 0         0 $response->{stream_offset} = $_FetchResponse_header_length_v1; # bytes before topics array size
1774             # [l] Size
1775             # [l] CorrelationId
1776             # [l] throttle_time_ms
1777             } else {
1778 5021         6734 $response->{template} = $_FetchResponse_header_template;
1779 5021         5212 $response->{stream_offset} = $_FetchResponse_header_length; # bytes before topics array size
1780             # [l] Size
1781             # [l] CorrelationId
1782             }
1783             my $topics_array_size = unpack(
1784             q{x[}.$response->{stream_offset} . q{]}
1785             .q{l>}, # topics array size
1786 5021         8588 ${ $response->{bin_stream} }
  5021         10252  
1787             );
1788 5021         6725 $response->{stream_offset} += 4; # bytes before TopicName length
1789             # [l] topics array size
1790              
1791 5021         5536 my ( $TopicName_length, $partitions_array_size );
1792 5021         7938 while ( $topics_array_size-- ) {
1793             $TopicName_length = unpack(
1794             q{x[}.$response->{stream_offset}
1795             .q{]s>}, # TopicName length
1796 5021         7042 ${ $response->{bin_stream} }
  5021         7579  
1797             );
1798             $response->{stream_offset} += # bytes before partitions array size
1799 5021         5783 2 # [s] TopicName length
1800             + $TopicName_length # TopicName
1801             ;
1802             $partitions_array_size = unpack(
1803             q{x[}.$response->{stream_offset}
1804             .q{]l>}, # partitions array size
1805 5021         5830 ${ $response->{bin_stream} }
  5021         7204  
1806             );
1807 5021         5400 $response->{stream_offset} += 4; # bytes before Partition
1808             # [l] partitions array size
1809              
1810 5021         6921 $response->{template} .= $_FetchResponse_topic_body_template;
1811 5021         4879 $response->{stream_offset} += $_FetchResponse_topic_body_length; # (without TopicName and partitions array size)
1812             # bytes before MessageSetSize
1813             # TopicName
1814             # [l] # partitions array size
1815             # [l] Partition
1816             # [s] ErrorCode
1817             # [q] HighwaterMarkOffset
1818              
1819 5021         7249 _decode_MessageSet_template( $response );
1820             }
1821              
1822 5021         5492 return;
1823             }
1824              
1825             # kafka uses a snappy-java compressor, with it's own headers and frame formats
1826             my $XERIAL_SNAPPY_MAGIC_HEADER = "\x82SNAPPY\x00";
1827             my $XERIAL_SNAPPY_BLOCK_SIZE = 0x8000; # 32Kb
1828             my $XERIAL_SNAPPY_FILE_VERSION = 1;
1829              
1830             # https://github.com/xerial/snappy-java
1831             # https://github.com/kubo/snzip/blob/master/snappy-java-format.c
1832             sub _snappy_xerial_decompress {
1833 4     4   7 my ( $data ) = @_;
1834 4         4 my $uncompressed;
1835 4         8 my $raw_format_suspected = 1;
1836 4 50       8 if ( length($data) > 16 ) {
1837 4         14 my ( $header, $x_version, $x_compatversion) = unpack( q{a[8]L>L>}, $data );
1838 4 100       11 if ( $header eq $XERIAL_SNAPPY_MAGIC_HEADER ) {
1839 3         5 $raw_format_suspected = 0;
1840 3 50       8 _error( $ERROR_COMPRESSION, 'snappy: bad compatversion in snappy xerial header' ) unless $x_compatversion == $XERIAL_SNAPPY_FILE_VERSION;
1841 3 50       6 _error( $ERROR_COMPRESSION, 'snappy: bad version in snappy xerial header' ) unless $x_version == $XERIAL_SNAPPY_FILE_VERSION;
1842 3         11 $data = substr( $data, 16 );
1843 3         8 while ( length($data) > 0 ) {
1844 4   100     13 $uncompressed //= '';
1845 4 50       8 _error( $ERROR_COMPRESSION, 'snappy: bad frame length header' ) if length($data) < 4;
1846 4         8 my $compressed_frame_length = unpack( q{L>}, $data );
1847 4         8 $data = substr( $data, 4 );
1848 4 50       8 _error( $ERROR_COMPRESSION, 'snappy: partial frame ' ) if length($data) < $compressed_frame_length;
1849 4         11 my $compressed_frame = substr( $data, 0, $compressed_frame_length, '');
1850 4         58 my $uncompressed_frame = Compress::Snappy::decompress( $compressed_frame );
1851 4 50       8 _error( $ERROR_COMPRESSION, 'snappy: can\'t uncompress frame ' ) if not defined $uncompressed_frame;
1852 4         61 $uncompressed .= $uncompressed_frame;
1853             }
1854             }
1855             }
1856 4 100       9 if ( $raw_format_suspected ) {
1857 1         6 $uncompressed = Compress::Snappy::decompress( $data );
1858             }
1859 4         10 return $uncompressed;
1860             }
1861              
1862             sub _snappy_xerial_compress {
1863 3     3   426 my ( $data ) = @_;
1864 3         4 my $compressed_data;
1865 3         7 while ( length($data) ) {
1866 4         40 my $block = substr $data, 0, $XERIAL_SNAPPY_BLOCK_SIZE, '';
1867 4         152 my $compressed_block = Compress::Snappy::compress( $block );
1868 4 50       14 _error( $ERROR_COMPRESSION, 'snappy: can\'t compress frame!' ) if not defined $compressed_block;
1869 4   66     22 $compressed_data //= pack( q{a[8]L>L>}, $XERIAL_SNAPPY_MAGIC_HEADER, $XERIAL_SNAPPY_FILE_VERSION, $XERIAL_SNAPPY_FILE_VERSION );
1870 4         17 $compressed_data .= pack( q{L>}, length($compressed_block) ) . $compressed_block;
1871             }
1872 3         12 return $compressed_data;
1873             }
1874              
1875             sub _decompress_data {
1876 7     7   12 my ( $data, $compression_codec ) = @_;
1877 7 50 50     26 say STDERR format_message( '[%s] decompress_data request, compression_codec: %s, data: %s',
1878             scalar( localtime ),
1879             $compression_codec,
1880             unpack( 'H*', $data ),
1881             ) if Kafka::Protocol->debug_level // 0 >= 3;
1882              
1883 7         7 my $decompressed;
1884 7 100       29 if ( $compression_codec == $COMPRESSION_GZIP ) {
    100          
    50          
1885             try {
1886 3     3   275 $decompressed = gunzip( $data );
1887             } catch {
1888 1     1   14 _error( $ERROR_COMPRESSION, format_message( 'gunzip failed: %s', $_ ) );
1889 3         19 };
1890             } elsif ( $compression_codec == $COMPRESSION_SNAPPY ) {
1891 3   33     9 $decompressed = _snappy_xerial_decompress( $data )
1892             // _error( $ERROR_COMPRESSION, 'Unable to decompress snappy compressed data' );
1893             } elsif ( $compression_codec == $COMPRESSION_LZ4 ) {
1894             # https://cwiki.apache.org/confluence/display/KAFKA/KIP-57+-+Interoperable+LZ4+Framing
1895             # New 0.10 clients (proposed behavior) - produce and consume v1 messages w/ correct LZ4F checksum
1896 1 50       8 if ( Compress::LZ4Frame::looks_like_lz4frame( $data ) ) {
1897 0   0     0 $decompressed = Compress::LZ4Frame::decompress( $data ) // _error( $ERROR_COMPRESSION, 'Unable to decompress LZ4 compressed data' );
1898             } else {
1899 1         3 _error( $ERROR_COMPRESSION, 'Unable to decompress LZ4 compressed data. Frame is not valid' );
1900             }
1901              
1902             } else {
1903 0         0 _error( $ERROR_COMPRESSION, "Unknown compression codec $compression_codec" );
1904             }
1905 5 50       43 _error( $ERROR_COMPRESSION, 'Decompression produced empty result' ) unless defined $decompressed;
1906 5         9 return $decompressed;
1907             }
1908              
1909             sub _compress_data {
1910 4     4   8 my ( $data, $compression_codec ) = @_;
1911 4         6 my $compressed;
1912              
1913             # Compression
1914 4 100       11 if ( $compression_codec == $COMPRESSION_GZIP ) {
    50          
    0          
1915 2   33     222 $compressed = gzip( $data )
1916             // _error( $ERROR_COMPRESSION, 'Unable to compress gzip data' );
1917             } elsif ( $compression_codec == $COMPRESSION_SNAPPY ) {
1918 2   33     6 $compressed = _snappy_xerial_compress( $data )
1919             // _error( $ERROR_COMPRESSION, 'Unable to compress snappy data' );
1920             } elsif ( $compression_codec == $COMPRESSION_LZ4 ) {
1921             # https://cwiki.apache.org/confluence/display/KAFKA/KIP-57+-+Interoperable+LZ4+Framing
1922             # New 0.10 clients (proposed behavior) - produce and consume v1 messages w/ correct LZ4F checksum
1923 0   0     0 $compressed = Compress::LZ4Frame::compress_checksum( $data )
1924             // _error( $ERROR_COMPRESSION, 'Unable to compress LZ4 data' );
1925             } else {
1926 0         0 _error( $ERROR_COMPRESSION, "Unknown compression codec $compression_codec" );
1927             }
1928 4         22 return $compressed;
1929             }
1930              
1931              
1932             # Decrypts MessageSet
1933             sub _decode_MessageSet_array {
1934 10056     10056   132864 my ( $response, $MessageSetSize, $i_ref, $MessageSet_array_ref, $_override_offset) = @_;
1935             # $_override_offset should not be set manually, it's used when recursing,
1936             # to decode internal compressed message, which have offsets starting at 0,
1937             # 1, etc, and instead we need to set the external wrapping message offset.
1938              
1939 10056         10930 my $data = $response->{data};
1940 10056         9462 my $data_array_size = scalar @{ $data };
  10056         11795  
1941              
1942             # NOTE: not all messages can be returned
1943 10056         12091 my ( $Message, $MessageSize, $Crc, $Key_length, $Value_length );
1944 10056   100     29343 while ( $MessageSetSize && $$i_ref < $data_array_size ) {
1945              
1946 25084         33317 my $message_offset = _unpack64( $data->[ $$i_ref++ ] );
1947 25084 100       33487 if (defined $_override_offset) {
1948 12         12 $message_offset = $_override_offset;
1949             }
1950             $Message = {
1951 25084         33108 Offset => $message_offset, # Offset
1952             };
1953              
1954 25084         27161 $MessageSize = $data->[ $$i_ref++ ]; # MessageSize
1955             # NOTE: The CRC is the CRC32 of the remainder of the message bytes.
1956             # This is used to check the integrity of the message on the broker and consumer:
1957             # MagicByte + Attributes + Key length + Key + Value length + Value
1958 25084         24728 $Crc = $data->[ $$i_ref++ ]; # Crc
1959             # WARNING: The current version of the module does not support the following:
1960             # A message set is also the unit of compression in Kafka,
1961             # and we allow messages to recursively contain compressed message sets to allow batch compression.
1962 25084         26538 $Message->{MagicByte} = $data->[ $$i_ref++ ]; # MagicByte
1963 25084         26651 my $is_v1_msg_format = $Message->{MagicByte} == 1;
1964 25084         26587 $Message->{Attributes} = $data->[ $$i_ref++ ]; # Attributes
1965 25084 50       32612 if ($is_v1_msg_format) {
1966 0         0 $Message->{Timestamp} = $data->[ $$i_ref++ ]; # Timestamp
1967             }
1968              
1969 25084         24689 $Key_length = $data->[ $$i_ref++ ]; # Key length
1970 25084 50       36645 $Message->{Key} = $Key_length == $NULL_BYTES_LENGTH ? q{} : $data->[ $$i_ref++ ]; # Key
1971 25084         24674 $Value_length = $data->[ $$i_ref++ ]; # Value length
1972 25084 50       38114 $Message->{Value} = $Value_length == $NULL_BYTES_LENGTH ? q{} : $data->[ $$i_ref++ ]; # Value
1973              
1974 25084 100       34611 if ( my $compression_codec = $Message->{Attributes} & $COMPRESSION_CODEC_MASK ) {
1975 7         12 my $decompressed = _decompress_data($Message->{Value}, $compression_codec);
1976 5         5 my @data;
1977 5         7 my $Value_length = length $decompressed;
1978 5         13 my $resp = {
1979             data => \@data,
1980             bin_stream => \$decompressed,
1981             stream_offset => 0,
1982             };
1983 5         11 _decode_MessageSet_sized_template( $Value_length, $resp );
1984 5         8 @data = unpack( $resp->{template}, ${ $resp->{bin_stream} } );
  5         22  
1985 5         9 my $i = 0; # i_ref
1986 5         6 my $size = length( $decompressed );
1987 5         15 _decode_MessageSet_array(
1988             $resp,
1989             $size, # message set size
1990             \$i, # i_ref
1991             $MessageSet_array_ref,
1992             $message_offset
1993             );
1994             } else {
1995 25077         29401 push( @$MessageSet_array_ref, $Message );
1996             }
1997              
1998 25082         51847 $MessageSetSize -= 12
1999             # [q] Offset
2000             # [l] MessageSize
2001             + $MessageSize # Message
2002             ;
2003             }
2004              
2005 10054         13502 return;
2006             }
2007              
2008             # Generates a template to encode single message within MessageSet
2009             sub _encode_Message {
2010 25164     25164   53721 my ($request, $Key, $Value, $Attributes, $Offset, $v1_format, $Timestamp) = @_;
2011              
2012             # v1 format - supported since 0.10.0, introduces Timestamps
2013 25164 50       33359 my $MagicByte = $v1_format ? 1 : 0;
2014 25164   50     56939 $Timestamp //= -1;
2015 25164   33     29577 $Attributes //= $COMPRESSION_NONE; # According to Apache Kafka documentation:
2016             # The lowest 2 bits contain the compression codec used for the message.
2017             # The other bits should be set to 0.
2018              
2019              
2020 25164         22321 my $key_length = length( $Key );
2021 25164         21387 my $value_length = length( $Value );
2022              
2023 25164 50       109196 my $message_body = pack(
    50          
    50          
    50          
    50          
    50          
2024             q{cc} # MagicByte
2025             # Attributes
2026             .( $v1_format ? q{a[8]} : q{} ) # Timestamp - v1 only (when MagicByte > 0) (#$_int64_template)
2027             .q{l>} # Key length
2028             .( $key_length ? qq{a[$key_length]} : q{} ) # Key
2029             .q{l>} # Value length
2030             .( $value_length ? qq{a[$value_length]} : q{} ) # Value
2031             ,
2032             $MagicByte,
2033             $Attributes,
2034             $v1_format ? ( _pack64( $Timestamp ) ) : (),
2035             $key_length ? ( $key_length, $Key ) : ( $NULL_BYTES_LENGTH ),
2036             $value_length ? ( $value_length, $Value ) : ( $NULL_BYTES_LENGTH ),
2037             );
2038              
2039 25164         26594 my $MessageBodySize = length($message_body);
2040 25164         24472 my $MessageSize = $MessageBodySize + 4;
2041              
2042 25164         25255 my $data = $request->{data};
2043 25164         35498 $request->{template} .= $_MessageSet_template . qq{l>a[$MessageBodySize]}; # crc + message_body
2044 25164         24255 $request->{len} += $_MessageSet_length + $MessageSize;
2045              
2046 25164         32806 push @$data, (_pack64( $Offset ), $MessageSize, crc32( $message_body ), $message_body );
2047 25164         52472 return;
2048             }
2049              
2050             # Generates a template to encode MessageSet
2051             sub _encode_MessageSet_array {
2052 10127     10127   31482 my ( $request, $MessageSet_array_ref, $compression_codec ) = @_;
2053              
2054             # not sure if it would be good to mix different formats in different messages in the same set
2055             # so detect if we should use v1 format
2056 10127         10134 my $use_v1_format;
2057 10127         11385 foreach my $MessageSet ( @$MessageSet_array_ref ) {
2058 25160 50       38257 if ( $MessageSet->{Timestamp} ) {
2059 0         0 $use_v1_format = 1;
2060 0         0 last;
2061             }
2062             }
2063              
2064 10127 100       13591 if ( $compression_codec ) {
2065 4         11 my $subrequest = {
2066             data => [],
2067             template => '',
2068             len => 0
2069             };
2070 4         7 my ( $Key, $Value );
2071              
2072 4         6 foreach my $MessageSet ( @$MessageSet_array_ref ) {
2073             _encode_Message( $subrequest,
2074             $Key = $MessageSet->{Key},
2075             $MessageSet->{Value},
2076             $COMPRESSION_NONE,
2077             $PRODUCER_ANY_OFFSET,
2078             $use_v1_format,
2079             $MessageSet->{Timestamp}
2080 10         26 );
2081             }
2082              
2083 4         6 $Value = pack($subrequest->{template}, @{$subrequest->{data}});
  4         14  
2084              
2085 4         14 $MessageSet_array_ref = [
2086             {
2087             Offset => $PRODUCER_ANY_OFFSET,
2088             Key => $Key,
2089             Value => _compress_data($Value, $compression_codec),
2090             }
2091             ];
2092              
2093             }
2094              
2095 10127         21040 my $subrequest = {
2096             data => [],
2097             template => '',
2098             len => 0
2099             };
2100              
2101 10127         12868 foreach my $MessageSet ( @$MessageSet_array_ref ) {
2102             _encode_Message( $subrequest,
2103             $MessageSet->{Key},
2104             $MessageSet->{Value},
2105             $compression_codec // $COMPRESSION_NONE,
2106             $MessageSet->{Offset},
2107             $use_v1_format,
2108             $MessageSet->{Timestamp}
2109 25154   66     82395 );
2110             }
2111              
2112 10127         11508 my $data = $request->{data};
2113 10127         15443 $request->{template} .= q{l>} . $subrequest->{template};
2114 10127         11188 $request->{len} += 4 + $subrequest->{len};
2115 10127         10612 push @$data, ( $subrequest->{len}, @{ $subrequest->{data} } );
  10127         19670  
2116              
2117 10127         25775 return;
2118             }
2119              
2120             # Generates a template to decrypt MessageSet
2121             sub _decode_MessageSet_template {
2122 10051     10051   192830 my ( $response ) = @_;
2123              
2124             my $MessageSetSize = unpack(
2125             q{x[}.$response->{stream_offset}
2126             .q{]l>}, # MessageSetSize
2127 10051         12253 ${ $response->{bin_stream} }
  10051         14089  
2128             );
2129 10051         13575 $response->{template} .= q{l>}; # MessageSetSize
2130 10051         9910 $response->{stream_offset} += 4; # bytes before Offset
2131              
2132 10051         12872 return _decode_MessageSet_sized_template($MessageSetSize, $response);
2133             }
2134              
2135             sub _decode_MessageSet_sized_template {
2136 10056     10056   11868 my ( $MessageSetSize, $response ) = @_;
2137              
2138 10056         9676 my $bin_stream_length = length ${ $response->{bin_stream} };
  10056         11052  
2139              
2140 10056         11089 my ( $local_template, $MessageSize, $MagicByte, $Key_length, $Value_length );
2141             CREATE_TEMPLATE:
2142 10056         14119 while ( $MessageSetSize ) {
2143             # Not the full MessageSet
2144             # 22 is the minimal size of a v0 message format until (and including) the Key
2145             # Length. If the message version is v1, then it's 22 + 8. We'll check later
2146 25090 100       30908 last CREATE_TEMPLATE if $MessageSetSize < 22;
2147              
2148             # [q] Offset
2149             # [l] MessageSize
2150             # [l] Crc
2151             # [c] MagicByte
2152             # [c] Attributes
2153             # ( [q] Timestamp ) message format v1 only
2154             # [l] Key length
2155             # [l] Value length
2156              
2157 25088         23440 $local_template = q{};
2158             MESSAGE_SET:
2159             {
2160 25088         20491 $response->{stream_offset} += 8; # the size of the offset data value
  25088         22983  
2161             ($MessageSize, $MagicByte) = unpack(
2162             q{x[}.$response->{stream_offset}.q{]}
2163             .q{l>} # MessageSize
2164             .q{x[l]} # Crc
2165             .q{c}, # MagicByte
2166              
2167 25088         31723 ${ $response->{bin_stream} }
  25088         43388  
2168             );
2169              
2170 25088         28817 my $is_v1_msg_format = $MagicByte == 1;
2171 25088 50 33     35927 if ($is_v1_msg_format && $MessageSetSize < (22+8)) {
2172             # Not the full MessageSet
2173 0         0 $local_template = q{};
2174 0         0 last MESSAGE_SET;
2175             }
2176 25088 50       30965 $local_template .= ( $is_v1_msg_format ? $_Message_template_with_timestamp : $_Message_template );
2177              
2178 25088 50       30352 $response->{stream_offset} += ($is_v1_msg_format ? 18: 10);
2179              
2180             $Key_length = unpack(
2181             q{x[}.$response->{stream_offset}
2182             .q{]l>}, # Key length
2183 25088         26952 ${ $response->{bin_stream} }
  25088         31488  
2184             );
2185              
2186 25088         25096 $response->{stream_offset} += 4; # bytes before Key or Value length
2187             # [l] Key length
2188 25088 50       31688 $response->{stream_offset} += $Key_length # bytes before Key
2189             if $Key_length != $NULL_BYTES_LENGTH; # Key
2190 25088 100       32294 if ( $bin_stream_length >= $response->{stream_offset} + 4 ) { # + [l] Value length
2191 25086 50       31664 $local_template .= $_Key_or_Value_template
2192             if $Key_length != $NULL_BYTES_LENGTH;
2193             } else {
2194             # Not the full MessageSet
2195 2         2 $local_template = q{};
2196 2         4 last MESSAGE_SET;
2197             }
2198              
2199 25086         22308 $local_template .= q{l>}; # Value length
2200             $Value_length = unpack(
2201             q{x[}.$response->{stream_offset}
2202             .q{]l>}, # Value length
2203 25086         27290 ${ $response->{bin_stream} }
  25086         33004  
2204             );
2205             $response->{stream_offset} += # bytes before Value or next Message
2206 25086         24896 4 # [l] Value length
2207             ;
2208 25086 50       34306 $response->{stream_offset} += $Value_length # bytes before next Message
2209             if $Value_length != $NULL_BYTES_LENGTH; # Value
2210 25086 100       29325 if ( $bin_stream_length >= $response->{stream_offset} ) {
2211 25084 50       37067 $local_template .= $_Key_or_Value_template
2212             if $Value_length != $NULL_BYTES_LENGTH;
2213             } else {
2214             # Not the full MessageSet
2215 2         3 $local_template = q{};
2216 2         3 last MESSAGE_SET;
2217             }
2218             }
2219              
2220 25088 100       26910 if ( $local_template ) {
2221 25084         26926 $response->{template} .= $local_template;
2222 25084         37247 $MessageSetSize -= 12
2223             # [q] Offset
2224             # [l] MessageSize
2225             + $MessageSize # Message
2226             ;
2227             } else {
2228 4         7 last CREATE_TEMPLATE;
2229             }
2230             }
2231              
2232 10056         17319 return;
2233             }
2234              
2235             # Generates a template to encrypt string
2236             sub _encode_string {
2237 30789     30789   878146 my ( $request, $string ) = @_;
2238              
2239 30789 100       39890 if ( $string eq q{} ) {
2240 129         220 push( @{ $request->{data} }, 0 );
  129         388  
2241             } else {
2242 30660         30715 my $string_length = length $string;
2243 30660         28687 push( @{ $request->{data} }, $string_length, $string );
  30660         43076  
2244 30660         42123 $request->{template} .= q{a*}; # string;
2245 30660         32554 $request->{len} += $string_length;
2246             }
2247              
2248 30789         34024 return;
2249             }
2250              
2251             # Handler for errors
2252             sub _error {
2253 2     2   7 Kafka::Exception::Protocol->throw( throw_args( @_ ) );
2254              
2255 0           return;
2256             }
2257              
2258             1;
2259              
2260             __END__