File Coverage

lib/Kafka/Protocol.pm
Criterion Covered Total %
statement 385 542 71.0
branch 56 104 53.8
condition 21 52 40.3
subroutine 34 42 80.9
pod 16 16 100.0
total 512 756 67.7


line stmt bran cond sub pod time code
1             package Kafka::Protocol;
2              
3             =head1 NAME
4              
5             Kafka::Protocol - Functions to process messages in the Apache Kafka protocol.
6              
7             =head1 VERSION
8              
9             This documentation refers to C version 1.06 .
10              
11             =cut
12              
13 12     12   18697 use 5.010;
  12         42  
14 12     12   69 use strict;
  12         27  
  12         281  
15 12     12   62 use warnings;
  12         25  
  12         578  
16              
17             our $VERSION = '1.06';
18              
19 12         1058 use Exporter qw(
20             import
21 12     12   72 );
  12         29  
22             our @EXPORT_OK = qw(
23             decode_fetch_response
24             decode_metadata_response
25             decode_offset_response
26             decode_produce_response
27             decode_offsetcommit_response
28             decode_offsetfetch_response
29             encode_fetch_request
30             encode_metadata_request
31             encode_offset_request
32             encode_produce_request
33             encode_api_versions_request
34             decode_api_versions_response
35             encode_find_coordinator_request
36             decode_find_coordinator_response
37             encode_offsetcommit_request
38             encode_offsetfetch_request
39             _decode_MessageSet_template
40             _decode_MessageSet_array
41             _encode_MessageSet_array
42             _encode_string
43             _pack64
44             _unpack64
45             _verify_string
46             $DEFAULT_APIVERSION
47             $IMPLEMENTED_APIVERSIONS
48             $BAD_OFFSET
49             $COMPRESSION_CODEC_MASK
50             $CONSUMERS_REPLICAID
51             $NULL_BYTES_LENGTH
52             $_int64_template
53             );
54              
55 12     12   5659 use Compress::Snappy;
  12         6083  
  12         633  
56 12     12   79 use Const::Fast;
  12         26  
  12         91  
57 12     12   5926 use Gzip::Faster qw( gzip gunzip );
  12         13891  
  12         833  
58 12         711 use Params::Util qw(
59             _ARRAY
60             _HASH
61             _SCALAR
62             _STRING
63 12     12   89 );
  12         23  
64 12         515 use Scalar::Util qw(
65             dualvar
66 12     12   78 );
  12         24  
67 12     12   5286 use String::CRC32;
  12         6156  
  12         576  
68 12     12   82 use Try::Tiny;
  12         23  
  12         703  
69              
70 12         2071 use Kafka qw(
71             $BITS64
72             $BLOCK_UNTIL_IS_COMMITTED
73             $COMPRESSION_GZIP
74             $COMPRESSION_NONE
75             $COMPRESSION_SNAPPY
76             $DEFAULT_MAX_WAIT_TIME
77             %ERROR
78             $ERROR_COMPRESSION
79             $ERROR_MISMATCH_ARGUMENT
80             $ERROR_NOT_BINARY_STRING
81             $ERROR_REQUEST_OR_RESPONSE
82             $NOT_SEND_ANY_RESPONSE
83             $WAIT_WRITTEN_TO_LOCAL_LOG
84 12     12   111 );
  12         42  
85 12     12   594 use Kafka::Exceptions;
  12         33  
  12         647  
86 12         76768 use Kafka::Internals qw(
87             $APIKEY_FETCH
88             $APIKEY_METADATA
89             $APIKEY_OFFSET
90             $APIKEY_PRODUCE
91             $APIKEY_APIVERSIONS
92             $APIKEY_FINDCOORDINATOR
93             $APIKEY_OFFSETCOMMIT
94             $APIKEY_OFFSETFETCH
95             $PRODUCER_ANY_OFFSET
96             format_message
97 12     12   74 );
  12         25  
98              
99              
100             =head1 SYNOPSIS
101              
102             use 5.010;
103             use strict;
104             use warnings;
105              
106             use Data::Compare;
107             use Kafka qw(
108             $COMPRESSION_NONE
109             $ERROR_NO_ERROR
110             $REQUEST_TIMEOUT
111             $WAIT_WRITTEN_TO_LOCAL_LOG
112             );
113             use Kafka::Internals qw(
114             $PRODUCER_ANY_OFFSET
115             );
116             use Kafka::Protocol qw(
117             decode_produce_response
118             encode_produce_request
119             );
120              
121             # a encoded produce request hex stream
122             my $encoded = pack( q{H*}, '00000049000000000000000400000001000005dc0000000100076d79746f7069630000000100000000000000200000000000000000000000148dc795a20000ffffffff0000000648656c6c6f21' );
123              
124             # a decoded produce request
125             my $decoded = {
126             CorrelationId => 4,
127             ClientId => q{},
128             RequiredAcks => $WAIT_WRITTEN_TO_LOCAL_LOG,
129             Timeout => $REQUEST_TIMEOUT * 100, # ms
130             topics => [
131             {
132             TopicName => 'mytopic',
133             partitions => [
134             {
135             Partition => 0,
136             MessageSet => [
137             {
138             Offset => $PRODUCER_ANY_OFFSET,
139             MagicByte => 0,
140             Attributes => $COMPRESSION_NONE,
141             Key => q{},
142             Value => 'Hello!',
143             },
144             ],
145             },
146             ],
147             },
148             ],
149             };
150              
151             my $encoded_request = encode_produce_request( $decoded );
152             say 'encoded correctly' if $encoded_request eq $encoded;
153              
154             # a encoded produce response hex stream
155             $encoded = pack( q{H*}, '00000023000000040000000100076d79746f706963000000010000000000000000000000000000' );
156              
157             # a decoded produce response
158             $decoded = {
159             CorrelationId => 4,
160             topics => [
161             {
162             TopicName => 'mytopic',
163             partitions => [
164             {
165             Partition => 0,
166             ErrorCode => $ERROR_NO_ERROR,
167             Offset => 0,
168             },
169             ],
170             },
171             ],
172             };
173              
174             my $decoded_response = decode_produce_response( \$encoded );
175             say 'decoded correctly' if Compare( $decoded_response, $decoded );
176              
177             # more examples, see t/*_decode_encode.t
178              
179             =head1 DESCRIPTION
180              
181             This module is not a user module.
182              
183             In order to achieve better performance,
184             functions of this module do not perform arguments validation.
185              
186             The main features of the C module are:
187              
188             =over 3
189              
190             =item *
191              
192             Supports parsing the Apache Kafka protocol.
193              
194             =item *
195              
196             Supports Apache Kafka Requests and Responses (PRODUCE and FETCH).
197             Within this package we currently support
198             access to PRODUCE, FETCH, OFFSET, METADATA Requests and Responses.
199              
200             =item *
201              
202             Support for working with 64 bit elements of the Kafka protocol on 32 bit systems.
203              
204             =back
205              
206             =cut
207              
208             # A Guide To The Kafka Protocol 0.8:
209             # https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
210             #
211             # -- Protocol Primitive Types
212             # int8, int16, int32, int64
213             # Signed integers
214             # stored in big endian order.
215             # bytes, string
216             # consist of a signed integer
217             # giving a length N
218             # followed by N bytes of content.
219             # A length of -1 indicates null.
220             # string uses an int16 for its size,
221             # and bytes uses an int32.
222             # Arrays
223             # These will always be encoded as an int32 size containing the length N
224             # followed by N repetitions of the structure
225             # which can itself be made up of other primitive types.
226             #
227             # -- N.B.
228             # - The response will always match the paired request
229             # - One structure common to both the produce and fetch requests is the message set format.
230             # - MessageSets are not preceded by an int32 like other array elements in the protocol.
231             # - A message set is also the unit of compression in Kafka,
232             # and we allow messages to recursively contain compressed message sets.
233             #
234             # -- Protocol Fields
235             # ApiKey => int16 That identifies the API being invoked
236             # ApiVersion => int16 This is a numeric version number for this api.
237             # Currently the supported version for all APIs is 0.
238             # Attributes => int8 Metadata attributes about the message.
239             # The lowest 2 bits contain the compression codec used for the message.
240             # ClientId => string This is a user supplied identifier for the client application.
241             # CorrelationId => int32 This is a user-supplied integer.
242             # It will be passed back in the response by the server, unmodified.
243             # It is useful for matching request and response between the client and server.
244             # Crc => int32 The CRC32 of the remainder of the message bytes.
245             # ErrorCode => int16 The error from this partition, if any.
246             # Errors are given on a per-partition basis
247             # because a given partition may be unavailable or maintained on a different host,
248             # while others may have successfully accepted the produce request.
249             # FetchOffset => int64 The offset to begin this fetch from.
250             # HighwaterMarkOffset => int64 The offset at the end of the log for this partition.
251             # This can be used by the client to determine how many messages behind the end of the log they are.
252             # - 0.8 documents: Replication design
253             # The high watermark is the offset of the last committed message.
254             # Each log is periodically synced to disks.
255             # Data before the flushed offset is guaranteed to be persisted on disks.
256             # As we will see, the flush offset can be before or after high watermark.
257             # - 0.7 documents: Wire protocol
258             # If the last segment file for the partition is not empty and was modified earlier than TIME,
259             # it will return both the first offset for that segment and the high water mark.
260             # The high water mark is not the offset of the last message,
261             # but rather the offset that the next message sent to the partition will be written to.
262             # Host => string The brokers hostname
263             # Isr => [ReplicaId] The set subset of the replicas that are "caught up" to the leader - a set of in-sync replicas (ISR)
264             # Key => bytes An optional message key
265             # The key can be null.
266             # Leader => int32 The node id for the kafka broker currently acting as leader for this partition.
267             # If no leader exists because we are in the middle of a leader election this id will be -1.
268             # MagicByte => int8 A version id used to allow backwards compatible evolution of the message binary format.
269             # MaxBytes => int32 The maximum bytes to include in the message set for this partition.
270             # MaxNumberOfOffsets => int32 Kafka here is return up to 'MaxNumberOfOffsets' of offsets
271             # MaxWaitTime => int32 The maximum amount of time (ms)
272             # to block waiting
273             # if insufficient data is available at the time the request is issued.
274             # MessageSetSize => int32 The size in bytes of the message set for this partition
275             # MessageSize => int32 The size of the subsequent request or response message in bytes
276             # MinBytes => int32 The minimum number of bytes of messages that must be available to give a response.
277             # If the client sets this to 0 the server will always respond immediately.
278             # If this is set to 1,
279             # the server will respond as soon
280             # as at least one partition
281             # has at least 1 byte of data
282             # or the specified timeout occurs.
283             # By setting higher values
284             # in combination with the timeout
285             # for reading only large chunks of data
286             # NodeId => int32 The id of the broker.
287             # This must be set to a unique integer for each broker.
288             # Offset => int64 The offset used in kafka as the log sequence number.
289             # When the producer is sending messages it doesn't actually know the offset
290             # and can fill in any value here it likes.
291             # Partition => int32 The id of the partition the fetch is for
292             # or the partition that data is being published to
293             # or the partition this response entry corresponds to.
294             # Port => int32 The brokers port
295             # ReplicaId => int32 Indicates the node id of the replica initiating this request.
296             # Normal client consumers should always specify this as -1 as they have no node id.
297             # Replicas => [ReplicaId] The set of alive nodes that currently acts as slaves for the leader for this partition.
298             # RequiredAcks => int16 Indicates how many acknowledgements the servers should receive
299             # before responding to the request.
300             # If it is 0 the server does not send any response.
301             # If it is 1, the server will wait the data is written to the local log before sending a response.
302             # If it is -1 the server will block until the message is committed by all in sync replicas before sending a response.
303             # Size => int32 The size of the subsequent request or response message in bytes
304             # Time => int64 Used to ask for all messages before a certain time (ms).
305             # There are two special values.
306             # Specify -1 to receive the latest offset (this will only ever return one offset).
307             # Specify -2 to receive the earliest available offsets.
308             # Timeout => int32 This provides a maximum time (ms) the server can await the receipt
309             # of the number of acknowledgements in RequiredAcks.
310             # The timeout is not an exact limit on the request time for a few reasons:
311             # (1) it does not include network latency,
312             # (2) the timer begins at the beginning of the processing of this request
313             # so if many requests are queued due to server overload
314             # that wait time will not be included,
315             # (3) we will not terminate a local write
316             # so if the local write time exceeds this timeout it will not be respected.
317             # To get a hard timeout of this type the client should use the socket timeout.
318             # TopicName => string The name of the topic.
319             # Value => bytes The actual message contents
320             # Kafka supports recursive messages in which case this may itself contain a message set.
321             # The message can be null.
322              
323             our $_int64_template; # Used to unpack a 64 bit value
324             if ( $BITS64 ) {
325             $_int64_template = q{q>};
326             # unpack a big-endian signed quad (64-bit) value on 64 bit systems.
327 35281     35281   279913 *_unpack64 = sub { $_[0] };
328             # pack a big-endian signed quad (64-bit) value on 64 bit systems.
329 40318     40318   317879 *_pack64 = sub { pack( q{q>}, $_[0] ) };
330             } else {
331             eval q{ require Kafka::Int64; } ## no critic
332             or die "Cannot load Kafka::Int64 : $@";
333              
334             $_int64_template = q{a[8]};
335             # unpack a big-endian signed quad (64-bit) value on 32 bit systems.
336             *_unpack64 = \&Kafka::Int64::unpackq;
337             # pack a big-endian signed quad (64-bit) value on 32 bit systems.
338             *_pack64 = \&Kafka::Int64::packq;
339             }
340              
341             =head2 EXPORT
342              
343             The following constants are available for export
344              
345             =cut
346              
347             =head3 C<$DEFAULT_APIVERSION>
348              
349             The default API version that will be used as fallback, if it's not possible to
350             detect what the Kafka server supports. Only Kafka servers > 0.10.0.0 can be
351             queried to get which API version they implements. On Kafka servers 0.8.x and
352             0.9.x, the protocol will default to use $DEFAULT_APIVERSION. Currently its
353             value is '0'
354              
355             =cut
356              
357             const our $DEFAULT_APIVERSION => 0;
358              
359             # HashRef, keys are APIKEYs, values are the api version that this protocol
360             # implements. If not populated, the connection will use the $DEFAULT_APIVERSION
361             our $IMPLEMENTED_APIVERSIONS = {};
362              
363             # Attributes
364              
365             # According to Apache Kafka documentation:
366             # Attributes - Metadata attributes about the message.
367             # The lowest 2 bits contain the compression codec used for the message.
368             const our $COMPRESSION_CODEC_MASK => 0b11;
369              
370             =head3 C<$CONSUMERS_REPLICAID>
371              
372             According to Apache Kafka documentation: 'ReplicaId - Normal client consumers should always specify this as -1 as they have no node id.'
373              
374             =cut
375             const our $CONSUMERS_REPLICAID => -1;
376              
377             =head3 C<$NULL_BYTES_LENGTH>
378              
379             According to Apache Kafka documentation: 'Protocol Primitive Types: ... bytes, string - A length of -1 indicates null.'
380              
381             =cut
382             const our $NULL_BYTES_LENGTH => -1;
383              
384             =head3 C<$BAD_OFFSET>
385              
386             According to Apache Kafka documentation: 'Offset - When the producer is sending messages it doesn't actually know the offset
387             and can fill in any value here it likes.'
388              
389             =cut
390             const our $BAD_OFFSET => -1;
391              
392             my ( $_Request_header_template, $_Request_header_length ) = (
393             q{l>s>s>l>s>}, # Size
394             # 2 ApiKey
395             # 2 ApiVersion
396             # 4 CorrelationId
397             # 2 ClientId length
398             10 # 'Size' is not included in the calculation of length
399             );
400             my ( $_ProduceRequest_header_template, $_ProduceRequest_header_length ) = (
401             q{s>l>l>}, # 2 RequiredAcks
402             # 4 Timeout
403             # 4 topics array size
404             10
405             );
406             my ( $_MessageSet_template, $_MessageSet_length ) = (
407             # qq{${_int64_template}l>},
408             q{a[8]l>},
409             # 8 Offset
410             # 4 MessageSize
411             12
412             );
413             my ( $_FetchRequest_header_template, $_FetchRequest_header_length ) = (
414             q{l>l>l>l>},
415             # 4 ReplicaId
416             # 4 MaxWaitTime
417             # 4 MinBytes
418             # 4 topics array size
419             16
420             );
421             my ( $_FetchRequest_header_template_v3, $_FetchRequest_header_length_v3 ) = (
422             q{l>l>l>l>l>},
423             # 4 ReplicaId
424             # 4 MaxWaitTime
425             # 4 MinBytes
426             # 4 MaxBytes
427             # 4 topics array size
428             20
429             );
430             my ( $_FetchRequest_body_template, $_FetchRequest_body_length ) = (
431             # qq{l>${_int64_template}l>},
432             q{l>a[8]l>},
433             # 4 Partition
434             # 8 FetchOffset
435             # 4 MaxBytes
436             16
437             );
438             my ( $_OffsetRequest_header_template, $_OffsetRequest_header_length ) = (
439             q{l>l>}, # 4 ReplicaId
440             # 4 topics array size
441             8
442             );
443             my ( $_OffsetRequest_body_template, $_OffsetRequest_body_length ) = (
444             # qq{l>${_int64_template}l>},
445             q{l>a[8]l>},
446             # 4 Partition
447             # 8 Time
448             # 4 MaxNumberOfOffsets
449             16
450             );
451             my ( $_OffsetRequest_body_template_v1, $_OffsetRequest_body_length_v1 ) = (
452             # qq{l>${_int64_template}},
453             q{l>a[8]},
454             # 4 Partition
455             # 8 Time
456             12
457             );
458             my ( $_FetchResponse_header_template, $_FetchResponse_header_length ) = (
459             q{x[l]l>l>}, # Size (skip)
460             # 4 CorrelationId
461             # 4 topics array size
462             8
463             );
464             my ( $_FetchResponse_header_template_v1, $_FetchResponse_header_length_v1 ) = (
465             q{x[l]l>l>l>}, # Size (skip)
466             # 4 CorrelationId
467             # 4 throttle_time_ms
468             # 4 topics array size
469             12
470             );
471             my ( $_Message_template, $_Message_length ) = (
472             qq(${_int64_template}l>l>ccl>),
473             # 8 Offset
474             # MessageSize
475             # Crc
476             # MagicByte
477             # Attributes
478             # Key length
479             8 # Only Offset length
480             );
481             my ( $_Message_template_with_timestamp, $_Message_length_with_timestamp ) = (
482             qq(${_int64_template}l>l>cc${_int64_template}l>),
483             # 8 Offset
484             # MessageSize
485             # Crc
486             # MagicByte
487             # Attributes
488             # Timestamp
489             # Key length
490             8 # Only Offset length
491             );
492              
493             my ( $_FetchResponse_topic_body_template, $_FetchResponse_topic_body_length )= (
494             qq(s>/al>l>s>${_int64_template}),
495             # TopicName
496             # partitions array size
497             # 4 Partition
498             # 2 ErrorCode
499             # 8 HighwaterMarkOffset
500             14 # without TopicName and partitions array size
501             );
502             my $_Key_or_Value_template = q{X[l]l>/a}; # Key or Value
503              
504             #-- public functions -----------------------------------------------------------
505              
506             =head2 FUNCTIONS
507              
508             The following functions are available for C module.
509              
510             =cut
511              
512             =head3 C
513              
514             Encodes the argument and returns a reference to the encoded binary string
515             representing a Request buffer.
516              
517             This function takes the following arguments:
518              
519             =over 3
520              
521             =item C<$ApiVersions_Request>
522              
523             C<$ApiVersions_Request> is a reference to the hash representing the structure
524             of the APIVERSIONS Request. it contains CorrelationId, ClientId (can be empty
525             string), and ApiVersion (must be 0)
526              
527             =back
528              
529             =cut
530              
531             $IMPLEMENTED_APIVERSIONS->{$APIKEY_APIVERSIONS} = 0;
532             sub encode_api_versions_request {
533 0     0 1 0 my ( $ApiVersions_Request ) = @_;
534              
535 0         0 my @data;
536 0         0 my $request = {
537             # template => '...',
538             # len => ...,
539             data => \@data,
540             };
541              
542 0         0 _encode_request_header( $request, $APIKEY_APIVERSIONS, $ApiVersions_Request );
543             # Size
544             # ApiKey
545             # ApiVersion
546             # CorrelationId
547             # ClientId
548              
549 0         0 return pack( $request->{template}, $request->{len}, @data );
550             }
551              
552             my $_decode_api_version_response_template = q{x[l]l>s>l>X[l]l>/(s>s>s>)};
553             # x[l] # Size (skip)
554             # l> # CorrelationId
555             # s> # ErrorCode
556             # l> # ApiVersions array size
557             # X[l]
558             # l>/( # ApiVersions array
559             # s> # ApiKey
560             # s> # MinVersion
561             # s> # MaxVersion
562             # )
563              
564             =head3 C
565              
566             Decodes the argument and returns a reference to the hash representing
567             the structure of the APIVERSIONS Response.
568              
569             This function takes the following arguments:
570              
571             =over 3
572              
573             =item C<$bin_stream_ref>
574              
575             C<$bin_stream_ref> is a reference to the encoded Response buffer. The buffer
576             must be a non-empty binary string.
577              
578             =back
579              
580             =cut
581              
582             sub decode_api_versions_response {
583 0     0 1 0 my ( $bin_stream_ref ) = @_;
584              
585 0         0 my @data = unpack( $_decode_api_version_response_template, $$bin_stream_ref );
586              
587 0         0 my $i = 0;
588 0         0 my $ApiVersions_Response = {};
589              
590 0         0 $ApiVersions_Response->{CorrelationId} = $data[ $i++ ]; # CorrelationId
591 0         0 $ApiVersions_Response->{ErrorCode} = $data[ $i++ ]; # ErrorCode
592              
593 0         0 my $ApiVersions_array = $ApiVersions_Response->{ApiVersions} = [];
594 0         0 my $ApiVersions_array_size = $data[ $i++ ]; # ApiVersions array size
595 0         0 while ( $ApiVersions_array_size-- ) {
596 0         0 push( @$ApiVersions_array, {
597             ApiKey => $data[ $i++ ], # ApiKey
598             MinVersion => $data[ $i++ ], # MinVersion
599             MaxVersion => $data[ $i++ ], # MaxVersion
600             }
601             );
602             }
603              
604 0         0 return $ApiVersions_Response;
605             }
606              
607             =head3 C
608              
609             Encodes the argument and returns a reference to the encoded binary string
610             representing a Request buffer.
611              
612             This function takes the following arguments:
613              
614             =over 3
615              
616             =item C<$FindCoordinator_Request>
617              
618             C<$FindCoordinator_Request> is a reference to the hash representing the structure
619             of the FINDCOORDINATOR Request. it contains CorrelationId, ClientId (can be empty
620             string), CoordinatorKey and CoordinatorType (for version 1 of protocol)
621              
622             =back
623              
624             =cut
625              
626             $IMPLEMENTED_APIVERSIONS->{$APIKEY_FINDCOORDINATOR} = 1;
627             sub encode_find_coordinator_request {
628 0     0 1 0 my ( $FindCoordinator_Request ) = @_;
629              
630 0         0 my @data;
631 0         0 my $request = {
632             # template => '...',
633             # len => ...,
634             data => \@data,
635             };
636              
637 0         0 my $api_version =
638             _encode_request_header( $request, $APIKEY_FINDCOORDINATOR, $FindCoordinator_Request );
639             # Size
640             # ApiKey
641             # ApiVersion
642             # CorrelationId
643             # ClientId
644              
645 0         0 my $coordinator_key = $FindCoordinator_Request->{CoordinatorKey};
646 0         0 my $coordinator_type = $FindCoordinator_Request->{CoordinatorType};
647              
648 0         0 $request->{template} .= q{s>}; # string length
649 0         0 $request->{len} += 2;
650 0         0 _encode_string( $request, $coordinator_key ); # CoordinatorKey (GroupId for version 0)
651 0 0       0 if ($api_version >= 1) {
652             # CoordinatorType (0 for groups)
653 0         0 $request->{template} .= q{c>};
654 0         0 $request->{len} += 1;
655 0         0 push( @{ $request->{data} }, $coordinator_type );
  0         0  
656             }
657              
658 0         0 return pack( $request->{template}, $request->{len}, @data );
659             }
660              
661             my $_decode_find_protocol_response_template = q{x[l]l>s>l>s>/al>};
662             # x[l] # Size (skip)
663             # l> # CorrelationId
664             # s> # ErrorCode
665             # l> # NodeId
666             # s>/a # Host
667             # l> # Port
668              
669             my $_decode_find_protocol_response_template_v1 = q{x[l]l>l>s>l>s>/al>};
670             # x[l] # Size (skip)
671             # l> # CorrelationId
672             # l> # throttle_time_ms
673             # s> # ErrorCode
674             # s>/a # ErrorMessage
675             # l> # NodeId
676             # s>/a # Host
677             # l> # Port
678              
679             =head3 C
680              
681             Decodes the argument and returns a reference to the hash representing
682             the structure of the FINDCOORDINATOR Response.
683              
684             This function takes the following arguments:
685              
686             =over 3
687              
688             =item C<$bin_stream_ref>
689              
690             C<$bin_stream_ref> is a reference to the encoded Response buffer. The buffer
691             must be a non-empty binary string.
692              
693             =back
694              
695             =cut
696              
697             sub decode_find_coordinator_response {
698 0     0 1 0 my ( $bin_stream_ref, $api_version ) = @_;
699              
700 0   0     0 $api_version //= $DEFAULT_APIVERSION;
701 0         0 my $is_v1 = $api_version == 1;
702              
703 0 0       0 my @data = unpack( $is_v1 ? $_decode_find_protocol_response_template_v1
704             : $_decode_find_protocol_response_template, $$bin_stream_ref );
705              
706 0         0 my $i = 0;
707 0         0 my $FindCoordinator_Response = {};
708              
709 0         0 $FindCoordinator_Response->{CorrelationId} = $data[ $i++ ];
710 0 0       0 if ($is_v1) {
711 0         0 $FindCoordinator_Response->{ThrottleTimeMs} = $data[ $i++ ]; # only v1
712             }
713 0         0 $FindCoordinator_Response->{ErrorCode} = $data[ $i++ ];
714 0 0       0 if ($is_v1) {
715 0         0 $FindCoordinator_Response->{ErrorMessage} = $data[ $i++ ]; # only v1
716             }
717 0         0 $FindCoordinator_Response->{NodeId} = $data[ $i++ ];
718 0         0 $FindCoordinator_Response->{Host} = $data[ $i++ ];
719 0         0 $FindCoordinator_Response->{Port} = $data[ $i++ ];
720              
721 0         0 return $FindCoordinator_Response;
722             }
723              
724             # PRODUCE Request --------------------------------------------------------------
725              
726             =head3 C
727              
728             Encodes the argument and returns a reference to the encoded binary string
729             representing a Request buffer.
730              
731             This function takes the following arguments:
732              
733             =over 3
734              
735             =item C<$Produce_Request>
736              
737             C<$Produce_Request> is a reference to the hash representing
738             the structure of the PRODUCE Request (examples see C).
739              
740             =item C<$compression_codec>
741              
742             Optional.
743              
744             C<$compression_codec> sets the required type of C<$messages> compression,
745             if the compression is desirable.
746              
747             Supported codecs:
748             L<$COMPRESSION_NONE|Kafka/$COMPRESSION_NONE>,
749             L<$COMPRESSION_GZIP|Kafka/$COMPRESSION_GZIP>,
750             L<$COMPRESSION_SNAPPY|Kafka/$COMPRESSION_SNAPPY>.
751              
752             =back
753              
754             =cut
755              
756             $IMPLEMENTED_APIVERSIONS->{$APIKEY_PRODUCE} = 2;
757             sub encode_produce_request {
758 5111     5111 1 18018 my ( $Produce_Request, $compression_codec ) = @_;
759              
760 5111         6493 my @data;
761 5111         11351 my $request = {
762             # template => '...',
763             # len => ...,
764             data => \@data,
765             };
766              
767 5111         10958 _encode_request_header( $request, $APIKEY_PRODUCE, $Produce_Request );
768             # Size
769             # ApiKey
770             # ApiVersion
771             # CorrelationId
772             # ClientId
773              
774 5111         6683 my $topics_array = $Produce_Request->{topics};
775             push( @data,
776             $Produce_Request->{RequiredAcks}, # RequiredAcks
777             $Produce_Request->{Timeout}, # Timeout
778 5111         8305 scalar( @$topics_array ), # topics array size
779             );
780 5111         7080 $request->{template} .= $_ProduceRequest_header_template;
781 5111         6256 $request->{len} += $_ProduceRequest_header_length;
782              
783 5111         7582 foreach my $topic ( @$topics_array ) {
784 5111         6310 $request->{template} .= q{s>}; # string length
785 5111         5737 $request->{len} += 2;
786 5111         8916 _encode_string( $request, $topic->{TopicName} ); # TopicName
787              
788 5111         6052 my $partitions_array = $topic->{partitions};
789 5111         6346 push( @data, scalar( @$partitions_array ) );
790 5111         6238 $request->{template} .= q{l>}; # partitions array size
791 5111         6804 $request->{len} += 4;
792 5111         7251 foreach my $partition ( @$partitions_array ) {
793 5111         8144 push( @data, $partition->{Partition} );
794 5111         6032 $request->{template} .= q{l>}; # Partition
795 5111         5663 $request->{len} += 4;
796              
797 5111         8526 _encode_MessageSet_array( $request, $partition->{MessageSet}, $compression_codec );
798             }
799             }
800              
801 5111         43874 return pack( $request->{template}, $request->{len}, @data );
802             }
803              
804             # PRODUCE Response -------------------------------------------------------------
805              
806             my $_decode_produce_response_template = qq{x[l]l>l>X[l]l>/(s>/al>X[l]l>/(l>s>${_int64_template}))};
807             # x[l] # Size (skip)
808             # l> # CorrelationId
809              
810             # l> # topics array size
811             # X[l]
812             # l>/( # topics array
813             # s>/a # TopicName
814              
815             # l> # partitions array size
816             # X[l]
817             # l>/( # partitions array
818             # l> # Partition
819             # s> # ErrorCode
820             # $_int64_template # Offset
821             # )
822             # )
823              
824             my $_decode_produce_response_template_v1 = qq{x[l]l>l>X[l]l>/(s>/al>X[l]l>/(l>s>${_int64_template}))l>};
825             # x[l] # Size (skip)
826             # l> # CorrelationId
827              
828             # l> # topics array size
829             # X[l]
830             # l>/( # topics array
831             # s>/a # TopicName
832              
833             # l> # partitions array size
834             # X[l]
835             # l>/( # partitions array
836             # l> # Partition
837             # s> # ErrorCode
838             # $_int64_template # Offset
839             # )
840             # )
841             # l> # Throttle_Time_Ms
842              
843             my $_decode_produce_response_template_v2 = qq{x[l]l>l>X[l]l>/(s>/al>X[l]l>/(l>s>${_int64_template}${_int64_template}))l>};
844             # x[l] # Size (skip)
845             # l> # CorrelationId
846              
847             # l> # topics array size
848             # X[l]
849             # l>/( # topics array
850             # s>/a # TopicName
851              
852             # l> # partitions array size
853             # X[l]
854             # l>/( # partitions array
855             # l> # Partition
856             # s> # ErrorCode
857             # $_int64_template # Offset
858             # $_int64_template # Log_Append_Time
859             # )
860             # )
861             # l> # Throttle_Time_Ms
862              
863             =head3 C
864              
865             Decodes the argument and returns a reference to the hash representing
866             the structure of the PRODUCE Response (examples see C).
867              
868             This function takes the following arguments:
869              
870             =over 3
871              
872             =item C<$bin_stream_ref>
873              
874             C<$bin_stream_ref> is a reference to the encoded Response buffer. The buffer
875             must be a non-empty binary string.
876              
877             =back
878              
879             =cut
880             sub decode_produce_response {
881 98     98 1 675 my ( $bin_stream_ref, $api_version ) = @_;
882              
883 98   33     659 $api_version //= $DEFAULT_APIVERSION;
884 98         270 my $is_v1 = $api_version == 1;
885 98         243 my $is_v2 = $api_version == 2;
886              
887 98 50       1101 my @data = unpack( $is_v1 ? $_decode_produce_response_template_v1
    50          
888             : $is_v2 ? $_decode_produce_response_template_v2
889             : $_decode_produce_response_template, $$bin_stream_ref );
890              
891 98         304 my $i = 0;
892 98         297 my $Produce_Response = {};
893              
894 98         425 $Produce_Response->{CorrelationId} = $data[ $i++ ]; # CorrelationId
895              
896 98         337 my $topics_array = $Produce_Response->{topics} = [];
897 98         281 my $topics_array_size = $data[ $i++ ]; # topics array size
898 98         395 while ( $topics_array_size-- ) {
899 98         382 my $topic = {
900             TopicName => $data[ $i++ ],
901             };
902              
903 98         320 my $partitions_array = $topic->{partitions} = [];
904 98         241 my $partitions_array_size = $data[ $i++ ]; # partitions array size
905 98         345 while ( $partitions_array_size-- ) {
906 98 50       597 my $partition = {
907             Partition => $data[ $i++ ], # Partition
908             ErrorCode => $data[ $i++ ], # ErrorCode
909             Offset => _unpack64( $data[ $i++ ] ), # Offset
910             ( $is_v2 ? (Log_Append_Time => _unpack64( $data[ $i++ ] )) : () ), # Log_Append_Time
911             };
912              
913 98         479 push( @$partitions_array, $partition );
914             }
915              
916 98         374 push( @$topics_array, $topic );
917             }
918             defined $data[ $i ]
919 98 50       492 and $Produce_Response->{Throttle_Time_Ms} = $data[ $i++ ];
920              
921 98         440 return $Produce_Response;
922             }
923              
924             # FETCH Request ----------------------------------------------------------------
925              
926             =head3 C
927              
928             Encodes the argument and returns a reference to the encoded binary string
929             representing a Request buffer.
930              
931             This function takes the following arguments:
932              
933             =over 3
934              
935             =item C<$Fetch_Request>
936              
937             C<$Fetch_Request> is a reference to the hash representing
938             the structure of the FETCH Request (examples see C).
939              
940             =back
941              
942             =cut
943              
944             # version 0, 1, 2 have the same request protocol.
945             $IMPLEMENTED_APIVERSIONS->{$APIKEY_FETCH} = 3;
946             sub encode_fetch_request {
947 5015     5015 1 12966 my ( $Fetch_Request ) = @_;
948              
949 5015         5908 my @data;
950 5015         10581 my $request = {
951             # template => '...',
952             # len => ...,
953             data => \@data,
954             };
955 5015         10457 my $api_version = _encode_request_header( $request, $APIKEY_FETCH, $Fetch_Request );
956             # Size
957             # ApiKey
958             # ApiVersion
959             # CorrelationId
960             # ClientId
961 5015         7302 my $is_v3 = $api_version == 3;
962              
963 5015         6651 push( @data, $CONSUMERS_REPLICAID ); # ReplicaId
964 5015         6338 my $topics_array = $Fetch_Request->{topics};
965             push( @data,
966             $Fetch_Request->{MaxWaitTime}, # MaxWaitTime
967             $Fetch_Request->{MinBytes}, # MinBytes
968 5015 50       10910 ( $is_v3 ? $Fetch_Request->{MaxBytes} : () ), # MaxBytes (version 3 only)
969             scalar( @$topics_array ), # topics array size
970             );
971 5015 50       7632 if ($is_v3) {
972 0         0 $request->{template} .= $_FetchRequest_header_template_v3;
973 0         0 $request->{len} += $_FetchRequest_header_length_v3;
974             } else {
975 5015         6921 $request->{template} .= $_FetchRequest_header_template;
976 5015         7445 $request->{len} += $_FetchRequest_header_length;
977             }
978              
979 5015         8422 foreach my $topic ( @$topics_array ) {
980 5015         9330 $request->{template} .= q{s>}; # string length
981 5015         7090 $request->{len} += 2;
982 5015         9472 _encode_string( $request, $topic->{TopicName} ); # TopicName
983              
984 5015         5975 my $partitions_array = $topic->{partitions};
985 5015         6265 push( @data, scalar( @$partitions_array ) );
986 5015         6369 $request->{template} .= q{l>}; # partitions array size
987 5015         5970 $request->{len} += 4;
988 5015         6795 foreach my $partition ( @$partitions_array ) {
989             push( @data,
990             $partition->{Partition}, # Partition
991             _pack64( $partition->{FetchOffset} ), # FetchOffset
992             $partition->{MaxBytes}, # MaxBytes
993 5015         9217 );
994 5015         8337 $request->{template} .= $_FetchRequest_body_template;
995 5015         9906 $request->{len} += $_FetchRequest_body_length;
996             }
997             }
998              
999 5015         32199 return pack( $request->{template}, $request->{len}, @data );
1000             }
1001              
1002             # FETCH Response ---------------------------------------------------------------
1003              
1004             =head3 C
1005              
1006             Decodes the argument and returns a reference to the hash representing
1007             the structure of the FETCH Response (examples see C).
1008              
1009             This function takes the following arguments:
1010              
1011             =over 3
1012              
1013             =item C<$bin_stream_ref>
1014              
1015             C<$bin_stream_ref> is a reference to the encoded Response buffer. The buffer
1016             must be a non-empty binary string.
1017              
1018             =back
1019              
1020             =cut
1021             sub decode_fetch_response {
1022 5021     5021 1 16324 my ( $bin_stream_ref, $api_version ) = @_;
1023              
1024 5021   33     18609 $api_version //= $DEFAULT_APIVERSION;
1025 5021         6875 my $is_v1 = $api_version == 1;
1026 5021         6060 my $is_v2 = $api_version == 2;
1027 5021         6039 my $is_v3 = $api_version == 3;
1028              
1029              
1030             # According to Apache Kafka documentation:
1031             # As an optimization the server is allowed to return a partial message at the end of the message set.
1032             # Clients should handle this case.
1033             # NOTE: look inside _decode_MessageSet_template and _decode_MessageSet_array
1034              
1035 5021         6271 my @data;
1036 5021         12602 my $response = {
1037             # template => '...',
1038             # stream_offset => ...,
1039             data => \@data,
1040             bin_stream => $bin_stream_ref,
1041             };
1042              
1043 5021         11702 _decode_fetch_response_template( $response, $api_version );
1044 5021         46732 @data = unpack( $response->{template}, $$bin_stream_ref );
1045              
1046 5021         9494 my $i = 0;
1047 5021         7756 my $Fetch_Response = {};
1048              
1049 5021         10080 $Fetch_Response->{CorrelationId} = $data[ $i++ ]; # CorrelationId
1050 5021 50 33     24877 if ($is_v1 || $is_v2 || $is_v3) {
      33        
1051 0         0 $Fetch_Response->{ThrottleTimeMs} = $data[ $i++ ]; # ( ThrottleTimeMs ) only v1 and above
1052             }
1053              
1054 5021         9549 my $topics_array = $Fetch_Response->{topics} = [];
1055 5021         7606 my $topics_array_size = $data[ $i++ ]; # topics array size
1056 5021         10630 while ( $topics_array_size-- ) {
1057 5021         10710 my $topic = {
1058             TopicName => $ data[ $i++ ], # TopicName
1059             };
1060              
1061 5021         8127 my $partitions_array = $topic->{partitions} = [];
1062 5021         6779 my $partitions_array_size = $data[ $i++ ]; # partitions array size
1063 5021         7231 my ( $MessageSetSize, $MessageSet_array );
1064 5021         9376 while ( $partitions_array_size-- ) {
1065 5021         10811 my $partition = {
1066             Partition => $data[ $i++ ], # Partition
1067             ErrorCode => $data[ $i++ ], # ErrorCode
1068             HighwaterMarkOffset => _unpack64( $data[ $i++ ] ), # HighwaterMarkOffset
1069             };
1070              
1071 5021         7331 $MessageSetSize = $data[ $i++ ]; # MessageSetSize
1072 5021         8661 $MessageSet_array = $partition->{MessageSet} = [];
1073              
1074 5021         11670 _decode_MessageSet_array( $response, $MessageSetSize, \$i, $MessageSet_array );
1075              
1076 5021         11588 push( @$partitions_array, $partition );
1077             }
1078              
1079 5021         11008 push( @$topics_array, $topic );
1080             }
1081              
1082 5021         22484 return $Fetch_Response;
1083             }
1084              
1085             # OFFSET Request ---------------------------------------------------------------
1086              
1087             =head3 C
1088              
1089             Encodes the argument and returns a reference to the encoded binary string
1090             representing a Request buffer.
1091              
1092             This function takes the following arguments:
1093              
1094             =over 3
1095              
1096             =item C<$Offset_Request>
1097              
1098             C<$Offset_Request> is a reference to the hash representing
1099             the structure of the OFFSET Request (examples see C).
1100              
1101             =back
1102              
1103             =cut
1104             $IMPLEMENTED_APIVERSIONS->{$APIKEY_OFFSET} = 1;
1105             sub encode_offset_request {
1106 27     27 1 1994 my ( $Offset_Request ) = @_;
1107              
1108 27         44 my @data;
1109 27         86 my $request = {
1110             # template => '...',
1111             # len => ...,
1112             data => \@data,
1113             };
1114 27         128 my $api_version = _encode_request_header( $request, $APIKEY_OFFSET, $Offset_Request );
1115             # Size
1116             # ApiKey
1117             # ApiVersion
1118             # CorrelationId
1119             # ClientId
1120              
1121 27         52 my $is_v1 = $api_version == 1;
1122              
1123 27         51 my $topics_array = $Offset_Request->{topics};
1124 27         62 push( @data,
1125             $CONSUMERS_REPLICAID, # ReplicaId
1126             scalar( @$topics_array ), # topics array size
1127             );
1128 27         58 $request->{template} .= $_OffsetRequest_header_template;
1129 27         46 $request->{len} += $_OffsetRequest_header_length;
1130              
1131 27 50       76 my $body_template = $is_v1 ? $_OffsetRequest_body_template_v1 : $_OffsetRequest_body_template;
1132 27 50       61 my $body_length = $is_v1 ? $_OffsetRequest_body_length_v1 : $_OffsetRequest_body_length;
1133 27         66 foreach my $topic ( @$topics_array ) {
1134 27         50 $request->{template} .= q{s>}; # string length
1135 27         46 $request->{len} += 2;
1136 27         62 _encode_string( $request, $topic->{TopicName} ); # TopicName
1137              
1138 27         40 my $partitions_array = $topic->{partitions};
1139 27         49 push( @data, scalar( @$partitions_array ) );
1140 27         62 $request->{template} .= q{l>}; # partitions array size
1141 27         41 $request->{len} += 4; # [l] partitions array size
1142 27         50 foreach my $partition ( @$partitions_array ) {
1143             push( @data,
1144             $partition->{Partition}, # Partition
1145             _pack64( $partition->{Time} ), # Time
1146             $is_v1 ? () : $partition->{MaxNumberOfOffsets}, # MaxNumberOfOffsets
1147 27 50       74 );
1148 27         53 $request->{template} .= $body_template;
1149 27         89 $request->{len} += $body_length;
1150             }
1151             }
1152              
1153             # say STDERR $request->{template};
1154             # say STDERR HexDump($_) foreach @data; use Data::HexDump;
1155 27         346 return pack( $request->{template}, $request->{len}, @data );
1156             }
1157              
1158             # OFFSET Response --------------------------------------------------------------
1159              
1160             my $_decode_offset_response_template = qq{x[l]l>l>X[l]l>/(s>/al>X[l]l>/(l>s>l>X[l]l>/(${_int64_template})))};
1161             # x[l] # Size (skip)
1162             # l> # CorrelationId
1163              
1164             # l> # topics array size
1165             # X[l]
1166             # l>/( # topics array
1167             # s>/a # TopicName
1168              
1169             # l> # PartitionOffsets array size
1170             # X[l]
1171             # l>/( # PartitionOffsets array
1172             # l> # Partition
1173             # s> # ErrorCode
1174              
1175             # l> # Offset array size
1176             # X[l]
1177             # l>/( # Offset array
1178             # $_int64_template # Offset
1179             # )
1180             # )
1181             # )
1182              
1183             my $_decode_offset_response_template_v1 = qq{x[l]l>l>X[l]l>/(s>/al>X[l]l>/(l>s>${_int64_template}${_int64_template}))};
1184             # x[l] # Size (skip)
1185             # l> # CorrelationId
1186              
1187             # l> # topics array size
1188             # X[l]
1189             # l>/( # topics array
1190             # s>/a # TopicName
1191              
1192             # l> # PartitionOffsets array size
1193             # X[l]
1194             # l>/( # PartitionOffsets array
1195             # l> # Partition
1196             # s> # ErrorCode
1197             # $_int64_template # Timestamp
1198             # $_int64_template # Offset
1199             # )
1200             # )
1201              
1202             =head3 C
1203              
1204             Decodes the argument and returns a reference to the hash representing
1205             the structure of the OFFSET Response (examples see C).
1206              
1207             This function takes the following arguments:
1208              
1209             =over 3
1210              
1211             =item C<$bin_stream_ref>
1212              
1213             C<$bin_stream_ref> is a reference to the encoded Response buffer. The buffer
1214             must be a non-empty binary string.
1215              
1216             =back
1217              
1218             =cut
1219             sub decode_offset_response {
1220 23     23 1 370 my ( $bin_stream_ref, $api_version ) = @_;
1221              
1222 23   66     65 $api_version //= $DEFAULT_APIVERSION;
1223 23         38 my $is_v1 = $api_version == 1;
1224 23 50       49 my $template = $is_v1 ? $_decode_offset_response_template_v1 : $_decode_offset_response_template;
1225              
1226 23         141 my @data = unpack( $template, $$bin_stream_ref );
1227              
1228 23         48 my $i = 0;
1229 23         39 my $Offset_Response = { };
1230              
1231 23         56 $Offset_Response->{CorrelationId} = $data[ $i++ ]; # CorrelationId
1232              
1233 23         52 my $topics_array = $Offset_Response->{topics} = [];
1234 23         36 my $topics_array_size = $data[ $i++ ]; # topics array size
1235 23         54 while ( $topics_array_size-- ) {
1236 23         51 my $topic = {
1237             TopicName => $data[ $i++ ], # TopicName
1238             };
1239              
1240 23         44 my $PartitionOffsets_array = $topic->{PartitionOffsets} = [];
1241 23         38 my $PartitionOffsets_array_size = $data[ $i++ ]; # PartitionOffsets array size
1242 23         41 my ( $PartitionOffset, $Offset_array, $Offset_array_size );
1243 23         51 while ( $PartitionOffsets_array_size-- ) {
1244 23         58 $PartitionOffset = {
1245             Partition => $data[ $i++ ], # Partition
1246             ErrorCode => $data[ $i++ ], # ErrorCode
1247             };
1248 23 50       49 if ($is_v1) {
1249 0         0 $PartitionOffset->{Timestamp} = _unpack64($data[ $i++ ]); # Timestamp (v1 only)
1250 0         0 $PartitionOffset->{Offset} = _unpack64($data[ $i++ ]); # Offset (v1 only)
1251              
1252             } else {
1253 23         43 $Offset_array = $PartitionOffset->{Offset} = [];
1254 23         36 $Offset_array_size = $data[ $i++ ]; # Offset array size
1255 23         47 while ( $Offset_array_size-- ) {
1256 41         83 push( @$Offset_array, _unpack64( $data[ $i++ ] ) ); # Offset
1257             }
1258             }
1259              
1260 23         52 push( @$PartitionOffsets_array, $PartitionOffset );
1261             }
1262              
1263 23         62 push( @$topics_array, $topic );
1264             }
1265              
1266 23         67 return $Offset_Response;
1267             }
1268              
1269             # METADATA Request -------------------------------------------------------------
1270              
1271             =head3 C
1272              
1273             Encodes the argument and returns a reference to the encoded binary string
1274             representing a Request buffer.
1275              
1276             This function takes the following arguments:
1277              
1278             =over 3
1279              
1280             =item C<$Metadata_Request>
1281              
1282             C<$Metadata_Request> is a reference to the hash representing
1283             the structure of the METADATA Request (examples see C).
1284              
1285             =back
1286              
1287             =cut
1288             $IMPLEMENTED_APIVERSIONS->{$APIKEY_METADATA} = 0;
1289             sub encode_metadata_request {
1290 127     127 1 3522 my ( $Metadata_Request ) = @_;
1291              
1292 127         296 my @data;
1293 127         592 my $request = {
1294             # template => '...',
1295             # len => ...,
1296             data => \@data,
1297             };
1298              
1299 127         945 _encode_request_header( $request, $APIKEY_METADATA, $Metadata_Request );
1300             # Size
1301             # ApiKey
1302             # ApiVersion
1303             # CorrelationId
1304             # ClientId
1305              
1306 127         399 my $topics_array = $Metadata_Request->{topics};
1307 127         415 push( @data, scalar( @$topics_array ) ); # topics array size
1308 127         579 $request->{template} .= q{l>};
1309 127         346 $request->{len} += 4;
1310              
1311 127         642 foreach my $topic ( @$topics_array ) {
1312 127         408 $request->{template} .= q{s>}; # string length
1313 127         303 $request->{len} += 2;
1314 127         473 _encode_string( $request, $topic ); # TopicName
1315             }
1316              
1317 127         2067 return pack( $request->{template}, $request->{len}, @data );
1318             }
1319              
1320             # METADATA Response ------------------------------------------------------------
1321              
1322             my $_decode_metadata_response_template = q{x[l]l>l>X[l]l>/(l>s>/al>)l>X[l]l>/(s>s>/al>X[l]l>/(s>l>l>l>X[l]l>/(l>)l>X[l]l>/(l>)))};
1323             # x[l] # Size (skip)
1324             # l> # CorrelationId
1325              
1326             # l> # Broker array size
1327             # X[l]
1328             # l>/( # Broker array
1329             # l> # NodeId
1330             # s>/a # Host
1331             # l> # Port
1332             # )
1333              
1334             # l> # TopicMetadata array size
1335             # X[l]
1336             # l>/( # TopicMetadata array
1337             # s> # ErrorCode
1338             # s>/a # TopicName
1339              
1340             # l> # PartitionMetadata array size
1341             # X[l]
1342             # l>/( # PartitionMetadata array
1343             # s> # ErrorCode
1344             # l> # Partition
1345             # l> # Leader
1346              
1347             # l> # Replicas array size
1348             # X[l]
1349             # l>/( # Replicas array
1350             # l> # ReplicaId
1351             # )
1352              
1353             # l> # Isr array size
1354             # X[l]
1355             # l>/( # Isr array
1356             # l> # ReplicaId
1357             # )
1358             # )
1359             # )
1360              
1361             =head3 C
1362              
1363             Decodes the argument and returns a reference to the hash representing
1364             the structure of the METADATA Response (examples see C).
1365              
1366             This function takes the following arguments:
1367              
1368             =over 3
1369              
1370             =item C<$bin_stream_ref>
1371              
1372             C<$bin_stream_ref> is a reference to the encoded Response buffer. The buffer
1373             must be a non-empty binary string.
1374              
1375             =back
1376              
1377             =cut
1378             sub decode_metadata_response {
1379 120     120 1 904 my ( $bin_stream_ref ) = @_;
1380              
1381 120         1841 my @data = unpack( $_decode_metadata_response_template, $$bin_stream_ref );
1382              
1383 120         451 my $i = 0;
1384 120         426 my $Metadata_Response = {};
1385              
1386 120         671 $Metadata_Response->{CorrelationId} = $data[ $i++ ]; # CorrelationId
1387              
1388 120         579 my $Broker_array = $Metadata_Response->{Broker} = [];
1389 120         406 my $Broker_array_size = $data[ $i++ ]; # Broker array size
1390 120         464 while ( $Broker_array_size-- ) {
1391 360         1948 push( @$Broker_array, {
1392             NodeId => $data[ $i++ ], # NodeId
1393             Host => $data[ $i++ ], # Host
1394             Port => $data[ $i++ ], # Port
1395             }
1396             );
1397             }
1398              
1399 120         451 my $TopicMetadata_array = $Metadata_Response->{TopicMetadata} = [];
1400 120         357 my $TopicMetadata_array_size = $data[ $i++ ]; # TopicMetadata array size
1401 120         376 while ( $TopicMetadata_array_size-- ) {
1402 120         669 my $TopicMetadata = {
1403             ErrorCode => $data[ $i++ ], # ErrorCode
1404             TopicName => $data[ $i++ ], # TopicName
1405             };
1406              
1407 120         452 my $PartitionMetadata_array = $TopicMetadata->{PartitionMetadata} = [];
1408 120         325 my $PartitionMetadata_array_size = $data[ $i++ ]; # PartitionMetadata array size
1409 120         691 while ( $PartitionMetadata_array_size-- ) {
1410 120         676 my $PartitionMetadata = {
1411             ErrorCode => $data[ $i++ ], # ErrorCode
1412             Partition => $data[ $i++ ], # Partition
1413             Leader => $data[ $i++ ], # Leader
1414             };
1415              
1416 120         414 my $Replicas_array = $PartitionMetadata->{Replicas} = [];
1417 120         422 my $Replicas_array_size = $data[ $i++ ]; # Replicas array size
1418 120         643 while ( $Replicas_array_size-- ) {
1419 360         1536 push( @$Replicas_array, $data[ $i++ ] ); # ReplicaId
1420             }
1421              
1422 120         419 my $Isr_array = $PartitionMetadata->{Isr} = [];
1423 120         305 my $Isr_array_size = $data[ $i++ ]; # Isr array size
1424 120         532 while ( $Isr_array_size-- ) {
1425 360         961 push( @$Isr_array, $data[ $i++ ] ); # ReplicaId
1426             }
1427              
1428 120         468 push( @$PartitionMetadata_array, $PartitionMetadata );
1429             }
1430              
1431 120         413 push( @$TopicMetadata_array, $TopicMetadata );
1432             }
1433              
1434 120         577 return $Metadata_Response;
1435             }
1436              
1437             # OffsetCommit Request -------------------------------------------------------------
1438              
1439             =head3 C
1440              
1441             Encodes the argument and returns a reference to the encoded binary string
1442             representing a Request buffer.
1443              
1444             This function takes the following arguments:
1445              
1446             =over 3
1447              
1448             =item C<$OffsetCommit_Request>
1449              
1450             C<$OffsetCommit_Request> is a reference to the hash representing
1451             the structure of the OffsetCommit Request (examples see C).
1452              
1453             =back
1454              
1455             =cut
1456             $IMPLEMENTED_APIVERSIONS->{$APIKEY_OFFSETCOMMIT} = 1;
1457             sub encode_offsetcommit_request {
1458 0     0 1 0 my ( $OffsetCommit_Request ) = @_;
1459              
1460 0         0 my @data;
1461 0         0 my $request = {
1462             # template => '...',
1463             # len => ...,
1464             data => \@data,
1465             };
1466              
1467 0         0 my $api_version = _encode_request_header( $request, $APIKEY_OFFSETCOMMIT, $OffsetCommit_Request );
1468             # Size
1469             # ApiKey
1470             # ApiVersion
1471             # CorrelationId
1472             # ClientId
1473              
1474 0         0 my $is_v1 = $api_version == 1;
1475              
1476 0         0 $request->{template} .= q{s>};
1477 0         0 $request->{len} += 2;
1478 0         0 _encode_string( $request, $OffsetCommit_Request->{GroupId} ); # GroupId
1479              
1480 0 0       0 if ($is_v1) {
1481 0         0 $request->{template} .= q{l>}; # GroupGenerationId
1482 0         0 $request->{len} += 4;
1483             ### WARNING TODO: we don't support consumer groups properly, so we set
1484             ### generation id to -1 and member id null (see
1485             ### https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol )
1486 0         0 push( @data, -1 );
1487              
1488 0         0 $request->{template} .= q{s>}; # string length
1489 0         0 $request->{len} += 2;
1490 0         0 _encode_string( $request, '' ); # MemberId
1491             }
1492              
1493 0         0 my $topics_array = $OffsetCommit_Request->{topics};
1494 0         0 push( @data, scalar( @$topics_array ) ); # topics array size
1495 0         0 $request->{template} .= q{l>};
1496 0         0 $request->{len} += 4;
1497              
1498 0         0 foreach my $topic ( @$topics_array ) {
1499 0         0 $request->{template} .= q{s>};
1500 0         0 $request->{len} += 2;
1501 0         0 _encode_string( $request, $topic->{TopicName} ); # TopicName
1502              
1503 0         0 my $partitions_array = $topic->{partitions};
1504 0         0 push( @data, scalar( @$partitions_array ) ); # partitions array size
1505 0         0 $request->{template} .= q{l>};
1506 0         0 $request->{len} += 4;
1507              
1508 0         0 foreach my $partition ( @$partitions_array ){
1509             push( @data,
1510             $partition->{Partition}, # Partition
1511             $partition->{Offset}, # Offset
1512 0         0 );
1513 0         0 $request->{template} .= qq{l>${_int64_template}};
1514 0         0 $request->{len} += 12;
1515              
1516 0 0       0 if ($is_v1) { # timestamp
1517             ### WARNING TODO: currently we hardcode "now"
1518 0         0 push( @data, time() * 1000);
1519 0         0 $request->{template} .= qq{${_int64_template}};
1520 0         0 $request->{len} += 8;
1521             }
1522              
1523 0         0 $request->{template} .= q{s>};
1524 0         0 $request->{len} += 2;
1525 0         0 _encode_string( $request, $partition->{Metadata} ), # Metadata
1526             }
1527             }
1528 0         0 return pack( $request->{template}, $request->{len}, @data );
1529             }
1530              
1531             # OFFSETCOMMIT Response --------------------------------------------------------------
1532              
1533             my $_decode_offsetcommit_response_template = qq{x[l]l>l>X[l]l>/(s>/al>X[l]l>/(l>s>))};
1534             # x[l] # Size (skip)
1535             # l> # CorrelationId
1536             # l> # topics array size
1537             # X[l]
1538             # l>/( # topics array
1539             # s>/a # TopicName
1540             # l> # partitions array size
1541             # X[l]
1542             # l>/( # partitions array
1543             # l> # Partition
1544             # s> # ErrorCode
1545             # )
1546             # )
1547              
1548             =head3 C
1549              
1550             Decodes the argument and returns a reference to the hash representing
1551             the structure of the OFFSETCOMMIT Response (examples see C).
1552              
1553             This function takes the following arguments:
1554              
1555             =over 3
1556              
1557             =item C<$bin_stream_ref>
1558              
1559             C<$bin_stream_ref> is a reference to the encoded Response buffer. The buffer
1560             must be a non-empty binary string.
1561              
1562             =back
1563              
1564             =cut
1565             sub decode_offsetcommit_response {
1566 0     0 1 0 my ( $bin_stream_ref ) = @_;
1567              
1568 0         0 my @data = unpack( $_decode_offsetcommit_response_template, $$bin_stream_ref );
1569              
1570 0         0 my $i = 0;
1571 0         0 my $OffsetCommit_Response = {};
1572              
1573 0         0 $OffsetCommit_Response->{CorrelationId} = $data[ $i++ ]; #CorrelationId
1574              
1575 0         0 my $topics_array = $OffsetCommit_Response->{topics} = [];
1576 0         0 my $topics_array_size = $data[ $i++ ]; # topics array size
1577 0         0 while ( $topics_array_size-- ) {
1578 0         0 my $topic = {
1579             TopicName => $data[ $i++ ], # TopicName
1580             };
1581              
1582 0         0 my $Partition_array = $topic->{partitions} = [];
1583 0         0 my $Partition_array_size = $data[ $i++ ]; # Partitions array size
1584 0         0 while ( $Partition_array_size-- ) {
1585 0         0 my $Partition = {
1586             Partition => $data[ $i++ ], # Partition
1587             ErrorCode => $data[ $i++ ], # ErrorCode
1588             };
1589 0         0 push( @$Partition_array, $Partition);
1590             }
1591 0         0 push( @$topics_array, $topic);
1592             }
1593 0         0 return $OffsetCommit_Response;
1594             }
1595              
1596             # OffsetFetch Request -------------------------------------------------------------
1597              
1598             =head3 C
1599              
1600             Encodes the argument and returns a reference to the encoded binary string
1601             representing a Request buffer.
1602              
1603             This function takes the following arguments:
1604              
1605             =over 3
1606              
1607             =item C<$OffsetFetch_Request>
1608              
1609             C<$OffsetFetch_Request> is a reference to the hash representing
1610             the structure of the OffsetFetch Request (examples see C).
1611              
1612             =back
1613              
1614             =cut
1615             $IMPLEMENTED_APIVERSIONS->{$APIKEY_OFFSETFETCH} = 1;
1616             sub encode_offsetfetch_request {
1617 0     0 1 0 my ( $OffsetFetch_Request ) = @_;
1618              
1619 0         0 my @data;
1620 0         0 my $request = {
1621             # template => '...',
1622             # len => ...,
1623             data => \@data,
1624             };
1625              
1626 0         0 _encode_request_header( $request, $APIKEY_OFFSETFETCH, $OffsetFetch_Request );
1627             # Size
1628             # ApiKey
1629             # ApiVersion
1630             # CorrelationId
1631             # ClientId
1632              
1633 0         0 $request->{template} .= q{s>};
1634 0         0 $request->{len} += 2;
1635 0         0 _encode_string( $request, $OffsetFetch_Request->{GroupId} ); # GroupId
1636              
1637 0         0 my $topics_array = $OffsetFetch_Request->{topics};
1638 0         0 push( @data, scalar( @$topics_array ) ); # topics array size
1639 0         0 $request->{template} .= q{l>};
1640 0         0 $request->{len} += 4;
1641              
1642 0         0 foreach my $topic ( @$topics_array ) {
1643 0         0 $request->{template} .= q{s>};
1644 0         0 $request->{len} += 2;
1645 0         0 _encode_string( $request, $topic->{TopicName} ); # TopicName
1646              
1647 0         0 my $partitions_array = $topic->{partitions};
1648 0         0 push( @data, scalar( @$partitions_array ) ); # partitions array size
1649 0         0 $request->{template} .= q{l>};
1650 0         0 $request->{len} += 4;
1651              
1652 0         0 foreach my $partition ( @$partitions_array ){
1653             push( @data,
1654             $partition->{Partition}, # Partition
1655 0         0 );
1656 0         0 $request->{template} .= qq{l>};
1657 0         0 $request->{len} += 4;
1658             }
1659             }
1660 0         0 return pack( $request->{template}, $request->{len}, @data );
1661             }
1662              
1663             # OFFSETFETCH Response --------------------------------------------------------------
1664              
1665             my $_decode_offsetfetch_response_template = qq{x[l]l>l>X[l]l>/(s>/al>X[l]l>/(l>${_int64_template}s>/as>))};
1666             # x[l] # Size (skip)
1667             # l> # CorrelationId
1668             # l> # topics array size
1669             # X[l]
1670             # l>/( # topics array
1671             # s>/a # TopicName
1672             # l> # partitions array size
1673             # X[l]
1674             # l>/( # partitions array
1675             # l> # Partition
1676             # $_int64_template # Offset
1677             # s>/a # Metadata
1678             # s> # ErrorCode
1679             # )
1680             # )
1681              
1682             =head3 C
1683              
1684             Decodes the argument and returns a reference to the hash representing
1685             the structure of the OFFSETFETCH Response (examples see C).
1686              
1687             This function takes the following arguments:
1688              
1689             =over 3
1690              
1691             =item C<$bin_stream_ref>
1692              
1693             C<$bin_stream_ref> is a reference to the encoded Response buffer. The buffer
1694             must be a non-empty binary string.
1695              
1696             =back
1697              
1698             =cut
1699             sub decode_offsetfetch_response {
1700 0     0 1 0 my ( $bin_stream_ref ) = @_;
1701              
1702 0         0 my @data = unpack( $_decode_offsetfetch_response_template, $$bin_stream_ref );
1703              
1704 0         0 my $i = 0;
1705 0         0 my $OffsetFetch_Response = {};
1706              
1707 0         0 $OffsetFetch_Response->{CorrelationId} = $data[ $i++ ]; #CorrelationId
1708              
1709 0         0 my $topics_array = $OffsetFetch_Response->{topics} = [];
1710 0         0 my $topics_array_size = $data[ $i++ ]; # topics array size
1711 0         0 while ( $topics_array_size-- ) {
1712 0         0 my $topic = {
1713             TopicName => $data[ $i++ ], # TopicName
1714             };
1715              
1716 0         0 my $Partition_array = $topic->{partitions} = [];
1717 0         0 my $Partition_array_size = $data[ $i++ ]; # Partitions array size
1718 0         0 while ( $Partition_array_size-- ) {
1719 0         0 my $Partition = {
1720             Partition => $data[ $i++ ], # Partition
1721             Offset => $data[ $i++ ], # Partition
1722             Metadata => $data[ $i++ ], # Partition
1723             ErrorCode => $data[ $i++ ], # ErrorCode
1724             };
1725 0         0 push( @$Partition_array, $Partition);
1726             }
1727 0         0 push( @$topics_array, $topic);
1728             }
1729 0         0 return $OffsetFetch_Response;
1730             }
1731              
1732             #-- private functions ----------------------------------------------------------
1733              
1734             # Generates a template to encrypt the request header
1735             sub _encode_request_header {
1736 10280     10280   18412 my ( $request, $api_key, $request_ref ) = @_;
1737              
1738             # we need to find out which API version to use for the request (and the
1739             # response). $request_ref->{ApiVersion} can be specified by the end user,
1740             # or providede by by Kafka::Connection ( see
1741             # Kafka::Connection::_get_api_versions() ). If not provided, we
1742             # default to $DEFAULT_APIVERSION
1743 10280   66     31039 my $api_version = $request_ref->{ApiVersion} // $DEFAULT_APIVERSION;
1744 10280         24381 @{ $request->{data} } = (
1745             # Size
1746             $api_key, # ApiKey
1747             $api_version, # ApiVersion
1748             $request_ref->{CorrelationId}, # CorrelationId
1749 10280         15553 );
1750 10280         18759 $request->{template} = $_Request_header_template;
1751 10280         13477 $request->{len} = $_Request_header_length;
1752 10280         21650 _encode_string( $request, $request_ref->{ClientId} ); # ClientId
1753              
1754 10280         14353 return $api_version;
1755             }
1756              
1757             # Generates a template to decrypt the fetch response body
1758             sub _decode_fetch_response_template {
1759 5021     5021   8393 my ( $response, $api_version ) = @_;
1760              
1761 5021         6960 my $is_v1 = $api_version == 1;
1762 5021         6436 my $is_v2 = $api_version == 2;
1763 5021         6149 my $is_v3 = $api_version == 3;
1764              
1765 5021 50 33     21693 if ($is_v1 || $is_v2 || $is_v3) {
      33        
1766 0         0 $response->{template} = $_FetchResponse_header_template_v1;
1767 0         0 $response->{stream_offset} = $_FetchResponse_header_length_v1; # bytes before topics array size
1768             # [l] Size
1769             # [l] CorrelationId
1770             # [l] throttle_time_ms
1771             } else {
1772 5021         9429 $response->{template} = $_FetchResponse_header_template;
1773 5021         8043 $response->{stream_offset} = $_FetchResponse_header_length; # bytes before topics array size
1774             # [l] Size
1775             # [l] CorrelationId
1776             }
1777             my $topics_array_size = unpack(
1778             q{x[}.$response->{stream_offset} . q{]}
1779             .q{l>}, # topics array size
1780 5021         12106 ${ $response->{bin_stream} }
  5021         13799  
1781             );
1782 5021         9285 $response->{stream_offset} += 4; # bytes before TopicName length
1783             # [l] topics array size
1784              
1785 5021         7191 my ( $TopicName_length, $partitions_array_size );
1786 5021         10510 while ( $topics_array_size-- ) {
1787             $TopicName_length = unpack(
1788             q{x[}.$response->{stream_offset}
1789             .q{]s>}, # TopicName length
1790 5021         8424 ${ $response->{bin_stream} }
  5021         10539  
1791             );
1792             $response->{stream_offset} += # bytes before partitions array size
1793 5021         8308 2 # [s] TopicName length
1794             + $TopicName_length # TopicName
1795             ;
1796             $partitions_array_size = unpack(
1797             q{x[}.$response->{stream_offset}
1798             .q{]l>}, # partitions array size
1799 5021         7600 ${ $response->{bin_stream} }
  5021         8918  
1800             );
1801 5021         7155 $response->{stream_offset} += 4; # bytes before Partition
1802             # [l] partitions array size
1803              
1804 5021         9449 $response->{template} .= $_FetchResponse_topic_body_template;
1805 5021         6431 $response->{stream_offset} += $_FetchResponse_topic_body_length; # (without TopicName and partitions array size)
1806             # bytes before MessageSetSize
1807             # TopicName
1808             # [l] # partitions array size
1809             # [l] Partition
1810             # [s] ErrorCode
1811             # [q] HighwaterMarkOffset
1812              
1813 5021         8732 _decode_MessageSet_template( $response );
1814             }
1815              
1816 5021         7177 return;
1817             }
1818              
1819             # Decrypts MessageSet
1820             sub _decode_MessageSet_array {
1821 10054     10054   177441 my ( $response, $MessageSetSize, $i_ref, $MessageSet_array_ref, $_override_offset) = @_;
1822             # $_override_offset should not be set manually, it's used when recursing,
1823             # to decode internal compressed message, which have offsets starting at 0,
1824             # 1, etc, and instead we need to set the external wrapping message offset.
1825              
1826 10054         15067 my $data = $response->{data};
1827 10054         11985 my $data_array_size = scalar @{ $data };
  10054         15507  
1828              
1829             # NOTE: not all messages can be returned
1830 10054         15388 my ( $Message, $MessageSize, $Crc, $Key_length, $Value_length );
1831 10054   100     35515 while ( $MessageSetSize && $$i_ref < $data_array_size ) {
1832              
1833 25081         43930 my $message_offset = _unpack64( $data->[ $$i_ref++ ] );
1834 25081 100       43162 if (defined $_override_offset) {
1835 10         12 $message_offset = $_override_offset;
1836             }
1837             $Message = {
1838 25081         45244 Offset => $message_offset, # Offset
1839             };
1840              
1841 25081         35340 $MessageSize = $data->[ $$i_ref++ ]; # MessageSize
1842             # NOTE: The CRC is the CRC32 of the remainder of the message bytes.
1843             # This is used to check the integrity of the message on the broker and consumer:
1844             # MagicByte + Attributes + Key length + Key + Value length + Value
1845 25081         31172 $Crc = $data->[ $$i_ref++ ]; # Crc
1846             # WARNING: The current version of the module does not support the following:
1847             # A message set is also the unit of compression in Kafka,
1848             # and we allow messages to recursively contain compressed message sets to allow batch compression.
1849 25081         35167 $Message->{MagicByte} = $data->[ $$i_ref++ ]; # MagicByte
1850 25081         34567 my $is_v1_msg_format = $Message->{MagicByte} == 1;
1851 25081         36379 $Message->{Attributes} = $data->[ $$i_ref++ ]; # Attributes
1852 25081 50       39404 if ($is_v1_msg_format) {
1853 0         0 $Message->{Timestamp} = $data->[ $$i_ref++ ]; # Timestamp
1854             }
1855              
1856 25081         34464 $Key_length = $data->[ $$i_ref++ ]; # Key length
1857 25081 50       51381 $Message->{Key} = $Key_length == $NULL_BYTES_LENGTH ? q{} : $data->[ $$i_ref++ ]; # Key
1858 25081         31981 $Value_length = $data->[ $$i_ref++ ]; # Value length
1859 25081 50       49335 $Message->{Value} = $Value_length == $NULL_BYTES_LENGTH ? q{} : $data->[ $$i_ref++ ]; # Value
1860              
1861 25081 100       46409 if ( my $compression_codec = $Message->{Attributes} & $COMPRESSION_CODEC_MASK ) {
1862 6         46 my $decompressed;
1863 6 100       24 if ( $compression_codec == $COMPRESSION_GZIP ) {
    100          
1864             try {
1865 3     3   446 $decompressed = gunzip( $Message->{Value} );
1866             } catch {
1867 1     1   28 _error( $ERROR_COMPRESSION, format_message( 'gunzip failed: %s', $_ ) );
1868 3         40 };
1869             } elsif ( $compression_codec == $COMPRESSION_SNAPPY ) {
1870 2         11 my ( $header, $x_version, $x_compatversion, undef ) = unpack( q{a[8]L>L>L>}, $Message->{Value} ); # undef - $x_length
1871              
1872             # Special thanks to Colin Blower
1873 2 50       9 if ( $header eq "\x82SNAPPY\x00" ) {
1874             # Found a xerial header.... nonstandard snappy compression header, remove the header
1875 0 0 0     0 if ( $x_compatversion == 1 && $x_version == 1 ) {
1876 0         0 $Message->{Value} = substr( $Message->{Value}, 20 ); # 20 = q{a[8]L>L>L>}
1877             } else {
1878             #warn("V $x_version and comp $x_compatversion");
1879 0         0 _error( $ERROR_COMPRESSION, format_message( 'Snappy compression with incompatible xerial header version found (x_version = %s, x_compatversion = %s)', $x_version, $x_compatversion ) );
1880             }
1881             }
1882              
1883             $decompressed = Compress::Snappy::decompress( $Message->{Value} )
1884 2   33     19 // _error( $ERROR_COMPRESSION, 'Unable to decompress snappy compressed data' );
1885             } else {
1886 1         8 _error( $ERROR_COMPRESSION, "Unknown compression codec $compression_codec" );
1887             }
1888 4 50       58 _error( $ERROR_COMPRESSION, 'Decompression produced empty result' ) unless defined $decompressed;
1889 4         7 my @data;
1890 4         8 my $Value_length = length $decompressed;
1891 4         16 my $resp = {
1892             data => \@data,
1893             bin_stream => \$decompressed,
1894             stream_offset => 0,
1895             };
1896 4         15 _decode_MessageSet_sized_template( $Value_length, $resp );
1897 4         7 @data = unpack( $resp->{template}, ${ $resp->{bin_stream} } );
  4         32  
1898 4         9 my $i = 0; # i_ref
1899 4         8 my $size = length( $decompressed );
1900 4         23 _decode_MessageSet_array(
1901             $resp,
1902             $size, # message set size
1903             \$i, # i_ref
1904             $MessageSet_array_ref,
1905             $message_offset
1906             );
1907             } else {
1908 25075         39209 push( @$MessageSet_array_ref, $Message );
1909             }
1910              
1911 25079         71027 $MessageSetSize -= 12
1912             # [q] Offset
1913             # [l] MessageSize
1914             + $MessageSize # Message
1915             ;
1916             }
1917              
1918 10052         17855 return;
1919             }
1920              
1921             # Generates a template to encrypt MessageSet
1922             sub _encode_MessageSet_array {
1923 10127     10127   39209 my ( $request, $MessageSet_array_ref, $compression_codec ) = @_;
1924              
1925 10127         14308 my ( $MessageSize, $Key, $Value, $key_length, $value_length, $message_body, $message_set );
1926              
1927 10127 100       19061 if ( $compression_codec ) {
1928 4         10 foreach my $MessageSet ( @$MessageSet_array_ref ) {
1929 10         17 $key_length = length( $Key = $MessageSet->{Key} );
1930 10         18 $value_length = length( $Value = $MessageSet->{Value} );
1931              
1932 10 50       96 $message_body = pack(
    50          
    50          
    50          
1933             q{ccl>} # MagicByte
1934             # Attributes
1935             # Key length
1936             .( $key_length ? qq{a[$key_length]} : q{} ) # Key
1937             .q{l>} # Value length
1938             .( $value_length ? qq{a[$value_length]} : q{} ) # Value
1939             ,
1940             0,
1941             $COMPRESSION_NONE, # According to Apache Kafka documentation:
1942             # The lowest 2 bits contain the compression codec used for the message.
1943             # The other bits should be set to 0.
1944             $key_length ? ( $key_length, $Key ) : ( $NULL_BYTES_LENGTH ),
1945             $value_length ? ( $value_length, $Value ) : ( $NULL_BYTES_LENGTH ),
1946             );
1947              
1948 10         60 $message_set .= pack( qq(x[8]l>l>), # 8 Offset ($PRODUCER_ANY_OFFSET)
1949             length( $message_body ) + 4, # [l] MessageSize ( $message_body + Crc )
1950             crc32( $message_body ) # [l] Crc
1951             ).$message_body;
1952             }
1953              
1954             $MessageSet_array_ref = [
1955             {
1956 4         19 Offset => $PRODUCER_ANY_OFFSET,
1957             Key => $Key,
1958             }
1959             ];
1960              
1961             # Compression
1962 4 100       16 if ( $compression_codec == $COMPRESSION_GZIP ) {
    50          
1963 2   33     291 $MessageSet_array_ref->[0]->{Value} = gzip( $message_set )
1964             // _error( $ERROR_COMPRESSION, 'Unable to compress gzip data' );
1965             } elsif ( $compression_codec == $COMPRESSION_SNAPPY ) {
1966 2   33     64 $MessageSet_array_ref->[0]->{Value} = Compress::Snappy::compress( $message_set )
1967             // _error( $ERROR_COMPRESSION, 'Unable to compress snappy data' );
1968             } else {
1969 0         0 _error( $ERROR_COMPRESSION, "Unknown compression codec $compression_codec" );
1970             }
1971             }
1972              
1973 10127         12960 my $data = $request->{data};
1974 10127         13332 my $MessageSetSize = 0;
1975 10127         11415 my %sizes;
1976 10127         15162 foreach my $MessageSet ( @$MessageSet_array_ref ) {
1977             $MessageSetSize +=
1978             12 # [q] Offset
1979             # [l] MessageSize
1980             + ( $sizes{ $MessageSet } = # MessageSize
1981             10 # [l] Crc
1982             # [c] MagicByte
1983             # [c] Attributes
1984             # [l] Key length
1985             + length( $MessageSet->{Key} //= q{} ) # Key
1986             + 4 # [l] Value length
1987 25154   50     110054 + length( $MessageSet->{Value} //= q{} ) # Value
      50        
1988             ) # MessageSize
1989             ;
1990             }
1991 10127         15585 push( @$data, $MessageSetSize );
1992 10127         15650 $request->{template} .= q{l>}; # MessageSetSize
1993 10127         13261 $request->{len} += 4;
1994              
1995 10127         14050 foreach my $MessageSet ( @$MessageSet_array_ref ) {
1996             push( @$data,
1997             _pack64( $MessageSet->{Offset} ), # Offset (It may be $PRODUCER_ANY_OFFSET)
1998 25154         45005 ( $MessageSize = $sizes{ $MessageSet } ), # MessageSize
1999             );
2000 25154         39721 $request->{template} .= $_MessageSet_template;
2001 25154         29487 $request->{len} += $_MessageSet_length;
2002              
2003 25154         33134 $key_length = length( $Key = $MessageSet->{Key} );
2004 25154         32155 $value_length = length( $Value = $MessageSet->{Value} );
2005              
2006 25154 50 66     146875 $message_body = pack(
    50          
    50          
    50          
2007             q{ccl>} # MagicByte
2008             # Attributes
2009             # Key length
2010             .( $key_length ? qq{a[$key_length]} : q{} ) # Key
2011             .q{l>} # Value length
2012             .( $value_length ? qq{a[$value_length]} : q{} ) # Value
2013             ,
2014             0,
2015             $compression_codec // $COMPRESSION_NONE, # According to Apache Kafka documentation:
2016             # The lowest 2 bits contain the compression codec used for the message.
2017             # The other bits should be set to 0.
2018             $key_length ? ( $key_length, $Key ) : ( $NULL_BYTES_LENGTH ),
2019             $value_length ? ( $value_length, $Value ) : ( $NULL_BYTES_LENGTH ),
2020             );
2021              
2022 25154         83383 push( @$data, crc32( $message_body ), $message_body );
2023             # Message
2024 25154         49053 $request->{template} .= q{l>a[} # Crc
2025             .( $MessageSize - 4 ) # 4 Crc
2026             .qq{]};
2027             # Message body:
2028             # MagicByte
2029             # Attributes
2030             # Key length
2031             # Key
2032             # Value length
2033             # Value
2034 25154         42743 $request->{len} += $MessageSize; # Message
2035             }
2036              
2037 10127         84426 return;
2038             }
2039              
2040             # Generates a template to decrypt MessageSet
2041             sub _decode_MessageSet_template {
2042 10050     10050   242468 my ( $response ) = @_;
2043              
2044             my $MessageSetSize = unpack(
2045             q{x[}.$response->{stream_offset}
2046             .q{]l>}, # MessageSetSize
2047 10050         17781 ${ $response->{bin_stream} }
  10050         18544  
2048             );
2049 10050         17515 $response->{template} .= q{l>}; # MessageSetSize
2050 10050         13316 $response->{stream_offset} += 4; # bytes before Offset
2051              
2052 10050         18348 return _decode_MessageSet_sized_template($MessageSetSize, $response);
2053             }
2054              
2055             sub _decode_MessageSet_sized_template {
2056 10054     10054   16595 my ( $MessageSetSize, $response ) = @_;
2057              
2058 10054         12176 my $bin_stream_length = length ${ $response->{bin_stream} };
  10054         16270  
2059              
2060 10054         14788 my ( $local_template, $MessageSize, $MagicByte, $Key_length, $Value_length );
2061             CREATE_TEMPLATE:
2062 10054         17875 while ( $MessageSetSize ) {
2063             # Not the full MessageSet
2064             # 22 is the minimal size of a v0 message format until (and including) the Key
2065             # Length. If the message version is v1, then it's 22 + 8. We'll check later
2066 25087 100       40244 last CREATE_TEMPLATE if $MessageSetSize < 22;
2067              
2068             # [q] Offset
2069             # [l] MessageSize
2070             # [l] Crc
2071             # [c] MagicByte
2072             # [c] Attributes
2073             # ( [q] Timestamp ) message format v1 only
2074             # [l] Key length
2075             # [l] Value length
2076              
2077 25085         28616 $local_template = q{};
2078             MESSAGE_SET:
2079             {
2080 25085         24954 $response->{stream_offset} += 8; # the size of the offset data value
  25085         29121  
2081             ($MessageSize, $MagicByte) = unpack(
2082             q{x[}.$response->{stream_offset}.q{]}
2083             .q{l>} # MessageSize
2084             .q{x[l]} # Crc
2085             .q{c}, # MagicByte
2086              
2087 25085         38674 ${ $response->{bin_stream} }
  25085         60761  
2088             );
2089              
2090 25085         39762 my $is_v1_msg_format = $MagicByte == 1;
2091 25085 50 33     41522 if ($is_v1_msg_format && $MessageSetSize < (22+8)) {
2092             # Not the full MessageSet
2093 0         0 $local_template = q{};
2094 0         0 last MESSAGE_SET;
2095             }
2096 25085 50       37372 $local_template .= ( $is_v1_msg_format ? $_Message_template_with_timestamp : $_Message_template );
2097              
2098 25085 50       35799 $response->{stream_offset} += ($is_v1_msg_format ? 18: 10);
2099              
2100             $Key_length = unpack(
2101             q{x[}.$response->{stream_offset}
2102             .q{]l>}, # Key length
2103 25085         33960 ${ $response->{bin_stream} }
  25085         42603  
2104             );
2105              
2106 25085         32140 $response->{stream_offset} += 4; # bytes before Key or Value length
2107             # [l] Key length
2108 25085 50       37414 $response->{stream_offset} += $Key_length # bytes before Key
2109             if $Key_length != $NULL_BYTES_LENGTH; # Key
2110 25085 100       38261 if ( $bin_stream_length >= $response->{stream_offset} + 4 ) { # + [l] Value length
2111 25083 50       36420 $local_template .= $_Key_or_Value_template
2112             if $Key_length != $NULL_BYTES_LENGTH;
2113             } else {
2114             # Not the full MessageSet
2115 2         4 $local_template = q{};
2116 2         5 last MESSAGE_SET;
2117             }
2118              
2119 25083         27053 $local_template .= q{l>}; # Value length
2120             $Value_length = unpack(
2121             q{x[}.$response->{stream_offset}
2122             .q{]l>}, # Value length
2123 25083         31751 ${ $response->{bin_stream} }
  25083         39155  
2124             );
2125             $response->{stream_offset} += # bytes before Value or next Message
2126 25083         31277 4 # [l] Value length
2127             ;
2128 25083 50       40007 $response->{stream_offset} += $Value_length # bytes before next Message
2129             if $Value_length != $NULL_BYTES_LENGTH; # Value
2130 25083 100       34225 if ( $bin_stream_length >= $response->{stream_offset} ) {
2131 25081 50       46133 $local_template .= $_Key_or_Value_template
2132             if $Value_length != $NULL_BYTES_LENGTH;
2133             } else {
2134             # Not the full MessageSet
2135 2         5 $local_template = q{};
2136 2         5 last MESSAGE_SET;
2137             }
2138             }
2139              
2140 25085 100       33244 if ( $local_template ) {
2141 25081         35479 $response->{template} .= $local_template;
2142 25081         43900 $MessageSetSize -= 12
2143             # [q] Offset
2144             # [l] MessageSize
2145             + $MessageSize # Message
2146             ;
2147             } else {
2148 4         7 last CREATE_TEMPLATE;
2149             }
2150             }
2151              
2152 10054         23721 return;
2153             }
2154              
2155             # Generates a template to encrypt string
2156             sub _encode_string {
2157 30789     30789   1132333 my ( $request, $string ) = @_;
2158              
2159 30789 100       51455 if ( $string eq q{} ) {
2160 129         296 push( @{ $request->{data} }, 0 );
  129         489  
2161             } else {
2162 30660         37325 my $string_length = length $string;
2163 30660         33537 push( @{ $request->{data} }, $string_length, $string );
  30660         50213  
2164 30660         56792 $request->{template} .= q{a*}; # string;
2165 30660         42531 $request->{len} += $string_length;
2166             }
2167              
2168 30789         42727 return;
2169             }
2170              
2171             # Handler for errors
2172             sub _error {
2173 2     2   10 Kafka::Exception::Protocol->throw( throw_args( @_ ) );
2174              
2175 0           return;
2176             }
2177              
2178             1;
2179              
2180             __END__