File Coverage

lib/Kafka/Protocol.pm
Criterion Covered Total %
statement 441 611 72.1
branch 69 126 54.7
condition 26 63 41.2
subroutine 41 50 82.0
pod 18 18 100.0
total 595 868 68.5


line stmt bran cond sub pod time code
1             package Kafka::Protocol;
2              
3             =head1 NAME
4              
5             Kafka::Protocol - Functions to process messages in the Apache Kafka protocol.
6              
7             =head1 VERSION
8              
9             This documentation refers to C<Kafka::Protocol> version 1.08 .
10              
11             =cut
12              
13 13     13   19723 use 5.010;
  13         43  
14 13     13   78 use strict;
  13         25  
  13         357  
15 13     13   115 use warnings;
  13         27  
  13         768  
16              
17             our $VERSION = 'v1.08';
18              
19 13         1108 use Exporter qw(
20             import
21 13     13   88 );
  13         22  
22             our @EXPORT_OK = qw(
23             decode_fetch_response
24             decode_metadata_response
25             decode_offset_response
26             decode_produce_response
27             decode_api_versions_response
28             decode_find_coordinator_response
29             decode_offsetcommit_response
30             decode_offsetfetch_response
31             decode_saslhandshake_response
32             encode_fetch_request
33             encode_metadata_request
34             encode_offset_request
35             encode_produce_request
36             encode_api_versions_request
37             encode_find_coordinator_request
38             encode_offsetcommit_request
39             encode_offsetfetch_request
40             encode_saslhandshake_request
41             _decode_MessageSet_template
42             _decode_MessageSet_array
43             _encode_Message
44             _encode_MessageSet_array
45             _encode_string
46             _pack64
47             _unpack64
48             _verify_string
49             $DEFAULT_APIVERSION
50             $IMPLEMENTED_APIVERSIONS
51             $BAD_OFFSET
52             $COMPRESSION_CODEC_MASK
53             $CONSUMERS_REPLICAID
54             $NULL_BYTES_LENGTH
55             $_int64_template
56             );
57              
58 13     13   6247 use Compress::Snappy ();
  13         7001  
  13         329  
59 13     13   6309 use Compress::LZ4Frame ();
  13         8108  
  13         346  
60 13     13   97 use Const::Fast;
  13         21  
  13         126  
61 13     13   6697 use Gzip::Faster qw( gzip gunzip );
  13         16276  
  13         898  
62 13         709 use Params::Util qw(
63             _ARRAY
64             _HASH
65             _SCALAR
66             _STRING
67 13     13   100 );
  13         22  
68 13         534 use Scalar::Util qw(
69             dualvar
70 13     13   82 );
  13         29  
71 13     13   5942 use String::CRC32;
  13         7240  
  13         594  
72 13     13   93 use Try::Tiny;
  13         27  
  13         814  
73              
74 13         2305 use Kafka qw(
75             $BITS64
76             $BLOCK_UNTIL_IS_COMMITTED
77             $COMPRESSION_GZIP
78             $COMPRESSION_NONE
79             $COMPRESSION_SNAPPY
80             $COMPRESSION_LZ4
81             $DEFAULT_MAX_WAIT_TIME
82             %ERROR
83             $ERROR_COMPRESSION
84             $ERROR_MISMATCH_ARGUMENT
85             $ERROR_REQUEST_OR_RESPONSE
86             $NOT_SEND_ANY_RESPONSE
87             $WAIT_WRITTEN_TO_LOCAL_LOG
88 13     13   107 );
  13         21  
89 13     13   848 use Kafka::Exceptions;
  13         34  
  13         837  
90 13         101453 use Kafka::Internals qw(
91             $APIKEY_FETCH
92             $APIKEY_METADATA
93             $APIKEY_OFFSET
94             $APIKEY_PRODUCE
95             $APIKEY_APIVERSIONS
96             $APIKEY_FINDCOORDINATOR
97             $APIKEY_OFFSETCOMMIT
98             $APIKEY_OFFSETFETCH
99             $APIKEY_SASLHANDSHAKE
100             $PRODUCER_ANY_OFFSET
101             format_message
102             debug_level
103 13     13   129 );
  13         35  
104              
105             =head1 SYNOPSIS
106              
107             use 5.010;
108             use strict;
109             use warnings;
110              
111             use Data::Compare;
112             use Kafka qw(
113             $COMPRESSION_NONE
114             $ERROR_NO_ERROR
115             $REQUEST_TIMEOUT
116             $WAIT_WRITTEN_TO_LOCAL_LOG
117             );
118             use Kafka::Internals qw(
119             $PRODUCER_ANY_OFFSET
120             );
121             use Kafka::Protocol qw(
122             decode_produce_response
123             encode_produce_request
124             );
125              
126             # an encoded produce request hex stream
127             my $encoded = pack( q{H*}, '00000049000000000000000400000001000005dc0000000100076d79746f7069630000000100000000000000200000000000000000000000148dc795a20000ffffffff0000000648656c6c6f21' );
128              
129             # a decoded produce request
130             my $decoded = {
131             CorrelationId => 4,
132             ClientId => q{},
133             RequiredAcks => $WAIT_WRITTEN_TO_LOCAL_LOG,
134             Timeout => $REQUEST_TIMEOUT * 100, # ms
135             topics => [
136             {
137             TopicName => 'mytopic',
138             partitions => [
139             {
140             Partition => 0,
141             MessageSet => [
142             {
143             Offset => $PRODUCER_ANY_OFFSET,
144             MagicByte => 0,
145             Attributes => $COMPRESSION_NONE,
146             Key => q{},
147             Value => 'Hello!',
148             },
149             ],
150             },
151             ],
152             },
153             ],
154             };
155              
156             my $encoded_request = encode_produce_request( $decoded );
157             say 'encoded correctly' if $encoded_request eq $encoded;
158              
159             # an encoded produce response hex stream
160             $encoded = pack( q{H*}, '00000023000000040000000100076d79746f706963000000010000000000000000000000000000' );
161              
162             # a decoded produce response
163             $decoded = {
164             CorrelationId => 4,
165             topics => [
166             {
167             TopicName => 'mytopic',
168             partitions => [
169             {
170             Partition => 0,
171             ErrorCode => $ERROR_NO_ERROR,
172             Offset => 0,
173             },
174             ],
175             },
176             ],
177             };
178              
179             my $decoded_response = decode_produce_response( \$encoded );
180             say 'decoded correctly' if Compare( $decoded_response, $decoded );
181              
182             # more examples, see t/*_decode_encode.t
183              
184             =head1 DESCRIPTION
185              
186             This module is not a user module.
187              
188             In order to achieve better performance,
189             functions of this module do not perform arguments validation.
190              
191             The main features of the C<Kafka::Protocol> module are:
192              
193             =over 3
194              
195             =item *
196              
197             Supports parsing the Apache Kafka protocol.
198              
199             =item *
200              
201             Supports Apache Kafka Requests and Responses.
202             Within this package we currently support access to PRODUCE, FETCH, OFFSET, METADATA,
203             APIVERSIONS, FINDCOORDINATOR, OFFSETCOMMIT, OFFSETFETCH and SASLHANDSHAKE Requests and Responses.
204              
205             =item *
206              
207             Support for working with 64 bit elements of the Kafka protocol on 32 bit systems.
208              
209             =back
210              
211             =cut
212              
213             # A Guide To The Kafka Protocol 0.8:
214             # https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
215             #
216             # -- Protocol Primitive Types
217             # int8, int16, int32, int64
218             # Signed integers
219             # stored in big endian order.
220             # bytes, string
221             # consist of a signed integer
222             # giving a length N
223             # followed by N bytes of content.
224             # A length of -1 indicates null.
225             # string uses an int16 for its size,
226             # and bytes uses an int32.
227             # Arrays
228             # These will always be encoded as an int32 size containing the length N
229             # followed by N repetitions of the structure
230             # which can itself be made up of other primitive types.
231             #
232             # -- N.B.
233             # - The response will always match the paired request
234             # - One structure common to both the produce and fetch requests is the message set format.
235             # - MessageSets are not preceded by an int32 like other array elements in the protocol.
236             # - A message set is also the unit of compression in Kafka,
237             # and we allow messages to recursively contain compressed message sets.
238             #
239             # -- Protocol Fields
240             # ApiKey => int16 That identifies the API being invoked
241             # ApiVersion => int16 This is a numeric version number for this api.
242             # Currently the supported version for all APIs is 0.
243             # Attributes => int8 Metadata attributes about the message.
244             # The lowest 2 bits contain the compression codec used for the message.
245             # ClientId => string This is a user supplied identifier for the client application.
246             # CorrelationId => int32 This is a user-supplied integer.
247             # It will be passed back in the response by the server, unmodified.
248             # It is useful for matching request and response between the client and server.
249             # Crc => int32 The CRC32 of the remainder of the message bytes.
250             # ErrorCode => int16 The error from this partition, if any.
251             # Errors are given on a per-partition basis
252             # because a given partition may be unavailable or maintained on a different host,
253             # while others may have successfully accepted the produce request.
254             # FetchOffset => int64 The offset to begin this fetch from.
255             # HighwaterMarkOffset => int64 The offset at the end of the log for this partition.
256             # This can be used by the client to determine how many messages behind the end of the log they are.
257             # - 0.8 documents: Replication design
258             # The high watermark is the offset of the last committed message.
259             # Each log is periodically synced to disks.
260             # Data before the flushed offset is guaranteed to be persisted on disks.
261             # As we will see, the flush offset can be before or after high watermark.
262             # - 0.7 documents: Wire protocol
263             # If the last segment file for the partition is not empty and was modified earlier than TIME,
264             # it will return both the first offset for that segment and the high water mark.
265             # The high water mark is not the offset of the last message,
266             # but rather the offset that the next message sent to the partition will be written to.
267             # Host => string The brokers hostname
268             # Isr => [ReplicaId] The set subset of the replicas that are "caught up" to the leader - a set of in-sync replicas (ISR)
269             # Key => bytes An optional message key
270             # The key can be null.
271             # Leader => int32 The node id for the kafka broker currently acting as leader for this partition.
272             # If no leader exists because we are in the middle of a leader election this id will be -1.
273             # MagicByte => int8 A version id used to allow backwards compatible evolution of the message binary format.
274             # MaxBytes => int32 The maximum bytes to include in the message set for this partition.
275             # MaxNumberOfOffsets => int32 Kafka will return up to 'MaxNumberOfOffsets' offsets
276             # MaxWaitTime => int32 The maximum amount of time (ms)
277             # to block waiting
278             # if insufficient data is available at the time the request is issued.
279             # MessageSetSize => int32 The size in bytes of the message set for this partition
280             # MessageSize => int32 The size of the subsequent request or response message in bytes
281             # MinBytes => int32 The minimum number of bytes of messages that must be available to give a response.
282             # If the client sets this to 0 the server will always respond immediately.
283             # If this is set to 1,
284             # the server will respond as soon
285             # as at least one partition
286             # has at least 1 byte of data
287             # or the specified timeout occurs.
288             # By setting higher values
289             # in combination with the timeout
290             # for reading only large chunks of data
291             # NodeId => int32 The id of the broker.
292             # This must be set to a unique integer for each broker.
293             # Offset => int64 The offset used in kafka as the log sequence number.
294             # When the producer is sending messages it doesn't actually know the offset
295             # and can fill in any value here it likes.
296             # Partition => int32 The id of the partition the fetch is for
297             # or the partition that data is being published to
298             # or the partition this response entry corresponds to.
299             # Port => int32 The brokers port
300             # ReplicaId => int32 Indicates the node id of the replica initiating this request.
301             # Normal client consumers should always specify this as -1 as they have no node id.
302             # Replicas => [ReplicaId] The set of alive nodes that currently acts as slaves for the leader for this partition.
303             # RequiredAcks => int16 Indicates how many acknowledgements the servers should receive
304             # before responding to the request.
305             # If it is 0 the server does not send any response.
306             # If it is 1, the server will wait until the data is written to the local log before sending a response.
307             # If it is -1 the server will block until the message is committed by all in sync replicas before sending a response.
308             # Size => int32 The size of the subsequent request or response message in bytes
309             # Time => int64 Used to ask for all messages before a certain time (ms).
310             # There are two special values.
311             # Specify -1 to receive the latest offset (this will only ever return one offset).
312             # Specify -2 to receive the earliest available offsets.
313             # Timeout => int32 This provides a maximum time (ms) the server can await the receipt
314             # of the number of acknowledgements in RequiredAcks.
315             # The timeout is not an exact limit on the request time for a few reasons:
316             # (1) it does not include network latency,
317             # (2) the timer begins at the beginning of the processing of this request
318             # so if many requests are queued due to server overload
319             # that wait time will not be included,
320             # (3) we will not terminate a local write
321             # so if the local write time exceeds this timeout it will not be respected.
322             # To get a hard timeout of this type the client should use the socket timeout.
323             # TopicName => string The name of the topic.
324             # Value => bytes The actual message contents
325             # Kafka supports recursive messages in which case this may itself contain a message set.
326             # The message can be null.
327              
328             our $_int64_template; # pack/unpack template fragment for one int64 field; chosen per platform below
329             if ( $BITS64 ) {
330             $_int64_template = q{q>}; # native big-endian signed 64-bit integer
331             # 64 bit systems: _unpack64 returns its argument unchanged (presumably the value was already decoded with 'q>' - confirm against callers).
332 35284     35284   227871 *_unpack64 = sub { $_[0] };
333             # pack a big-endian signed quad (64-bit) value on 64 bit systems.
334 40328     40328   274230 *_pack64 = sub { pack( q{q>}, $_[0] ) };
335             } else {
336             eval q{ require Kafka::Int64; } ## no critic
337             or die "Cannot load Kafka::Int64 : $@";
338              
339             $_int64_template = q{a[8]}; # the int64 field is handled as 8 raw bytes
340             # 32 bit systems: conversion from the raw bytes is delegated to Kafka::Int64::unpackq.
341             *_unpack64 = \&Kafka::Int64::unpackq;
342             # 32 bit systems: conversion to the raw bytes is delegated to Kafka::Int64::packq.
343             *_pack64 = \&Kafka::Int64::packq;
344             }
345              
346             =head2 EXPORT
347              
348             The following constants are available for export
349              
350             =cut
351              
352             =head3 C<$DEFAULT_APIVERSION>
353              
354             The default API version that will be used as a fallback if it is not possible to
355             detect what the Kafka server supports. Only Kafka servers > 0.10.0.0 can be
356             queried to find out which API versions they implement. On Kafka servers 0.8.x and
357             0.9.x, the protocol will default to using C<$DEFAULT_APIVERSION>. Currently its
358             value is C<0>.
359              
360             =cut
361              
362             const our $DEFAULT_APIVERSION => 0;
363              
364             # HashRef: keys are APIKEYs, values are the API version that this module implements for that key
365             # (entries are assigned further down, next to the encode_* functions). If not populated, the connection will use the $DEFAULT_APIVERSION
366             our $IMPLEMENTED_APIVERSIONS = {};
367              
368             # Attributes
369              
370             # According to Apache Kafka documentation:
371             # Attributes - Metadata attributes about the message.
372             # The lowest 2 bits contain the compression codec used for the message.
373             const our $COMPRESSION_CODEC_MASK => 0b11; # bit mask selecting the lowest 2 bits
374              
375             =head3 C<$CONSUMERS_REPLICAID>
376              
377             According to Apache Kafka documentation: 'ReplicaId - Normal client consumers should always specify this as -1 as they have no node id.'
378              
379             =cut
380             const our $CONSUMERS_REPLICAID => -1;
381              
382             =head3 C<$NULL_BYTES_LENGTH>
383              
384             According to Apache Kafka documentation: 'Protocol Primitive Types: ... bytes, string - A length of -1 indicates null.'
385              
386             =cut
387             const our $NULL_BYTES_LENGTH => -1;
388              
389             =head3 C<$BAD_OFFSET>
390              
391             According to Apache Kafka documentation: 'Offset - When the producer is sending messages it doesn't actually know the offset
392             and can fill in any value here it likes.'
393              
394             =cut
395             const our $BAD_OFFSET => -1;
396              
397             my ( $_Request_header_template, $_Request_header_length ) = (
398             q{l>s>s>l>s>}, # 4 Size (not counted in the length below)
399             # 2 ApiKey
400             # 2 ApiVersion
401             # 4 CorrelationId
402             # 2 ClientId length
403             10 # 'Size' is not included in the calculation of length
404             );
405             my ( $_ProduceRequest_header_template, $_ProduceRequest_header_length ) = (
406             q{s>l>l>}, # 2 RequiredAcks
407             # 4 Timeout
408             # 4 topics array size
409             10
410             );
411             my ( $_MessageSet_template, $_MessageSet_length ) = (
412             # qq{${_int64_template}l>},
413             q{a[8]l>},
414             # 8 Offset (taken as 8 raw bytes; presumably already packed with _pack64 - confirm against callers)
415             # 4 MessageSize
416             12
417             );
418             my ( $_FetchRequest_header_template, $_FetchRequest_header_length ) = (
419             q{l>l>l>l>},
420             # 4 ReplicaId
421             # 4 MaxWaitTime
422             # 4 MinBytes
423             # 4 topics array size
424             16
425             );
426             my ( $_FetchRequest_header_template_v3, $_FetchRequest_header_length_v3 ) = (
427             q{l>l>l>l>l>},
428             # 4 ReplicaId
429             # 4 MaxWaitTime
430             # 4 MinBytes
431             # 4 MaxBytes
432             # 4 topics array size
433             20
434             );
435             my ( $_FetchRequest_body_template, $_FetchRequest_body_length ) = (
436             # qq{l>${_int64_template}l>},
437             q{l>a[8]l>},
438             # 4 Partition
439             # 8 FetchOffset (8 raw bytes)
440             # 4 MaxBytes
441             16
442             );
443             my ( $_OffsetRequest_header_template, $_OffsetRequest_header_length ) = (
444             q{l>l>}, # 4 ReplicaId
445             # 4 topics array size
446             8
447             );
448             my ( $_OffsetRequest_body_template, $_OffsetRequest_body_length ) = (
449             # qq{l>${_int64_template}l>},
450             q{l>a[8]l>},
451             # 4 Partition
452             # 8 Time (8 raw bytes)
453             # 4 MaxNumberOfOffsets
454             16
455             );
456             my ( $_OffsetRequest_body_template_v1, $_OffsetRequest_body_length_v1 ) = (
457             # qq{l>${_int64_template}},
458             q{l>a[8]},
459             # 4 Partition
460             # 8 Time (8 raw bytes)
461             12
462             );
463             my ( $_FetchResponse_header_template, $_FetchResponse_header_length ) = (
464             q{x[l]l>l>}, # 4 Size (skipped by x[l], not counted in the length below)
465             # 4 CorrelationId
466             # 4 topics array size
467             8
468             );
469             my ( $_FetchResponse_header_template_v1, $_FetchResponse_header_length_v1 ) = (
470             q{x[l]l>l>l>}, # 4 Size (skipped by x[l], not counted in the length below)
471             # 4 CorrelationId
472             # 4 throttle_time_ms
473             # 4 topics array size
474             12
475             );
476             my ( $_Message_template, $_Message_length ) = (
477             qq(${_int64_template}l>l>ccl>),
478             # 8 Offset
479             # MessageSize (int32, not counted in the length below)
480             # Crc (int32, not counted)
481             # MagicByte (int8, not counted)
482             # Attributes (int8, not counted)
483             # Key length (int32, not counted)
484             8 # Only Offset length
485             );
486             my ( $_Message_template_with_timestamp, $_Message_length_with_timestamp ) = (
487             qq(${_int64_template}l>l>cc${_int64_template}l>),
488             # 8 Offset
489             # MessageSize (int32, not counted in the length below)
490             # Crc (int32, not counted)
491             # MagicByte (int8, not counted)
492             # Attributes (int8, not counted)
493             # Timestamp (int64, not counted)
494             # Key length (int32, not counted)
495             8 # Only Offset length
496             );
497              
498             my ( $_FetchResponse_topic_body_template, $_FetchResponse_topic_body_length )= (
499             qq(s>/al>l>s>${_int64_template}),
500             # TopicName (int16 length followed by that many bytes)
501             # partitions array size (int32)
502             # 4 Partition
503             # 2 ErrorCode
504             # 8 HighwaterMarkOffset
505             14 # without TopicName and partitions array size
506             );
507             my $_Key_or_Value_template = q{X[l]l>/a}; # Key or Value: X[l] steps back 4 bytes, then l>/a reads an int32 length and that many bytes
508              
509             #-- public functions -----------------------------------------------------------
510              
511             =head2 FUNCTIONS
512              
513             The following functions are available for export from the C<Kafka::Protocol> module.
514              
515             =cut
516              
517             =head3 C<encode_api_versions_request( $ApiVersions_Request )>
518              
519             Encodes the argument and returns the encoded binary string
520             representing a Request buffer.
521              
522             This function takes the following arguments:
523              
524             =over 3
525              
526             =item C<$ApiVersions_Request>
527              
528             C<$ApiVersions_Request> is a reference to the hash representing the structure
529             of the APIVERSIONS Request. It contains CorrelationId, ClientId (can be an empty
530             string), and ApiVersion (must be 0).
531              
532             =back
533              
534             =cut
535              
536             $IMPLEMENTED_APIVERSIONS->{$APIKEY_APIVERSIONS} = 0; # register version 0 as the implemented APIVERSIONS version
537             sub encode_api_versions_request {
538 0     0 1 0 my ( $ApiVersions_Request ) = @_; # hash reference describing the request
539              
540 0         0 my @data;
541 0         0 my $request = {
542             # template => '...', (not set here; read back from $request in the final pack below)
543             # len => ..., (not set here; read back from $request in the final pack below)
544             data => \@data,
545             };
546              
547 0         0 _encode_request_header( $request, $APIKEY_APIVERSIONS, $ApiVersions_Request ); # header fields, per the list below
548             # Size
549             # ApiKey
550             # ApiVersion
551             # CorrelationId
552             # ClientId
553              
554 0         0 return pack( $request->{template}, $request->{len}, @data ); # returns the packed string itself, not a reference
555             }
556              
557             my $_decode_api_version_response_template = q{x[l]l>s>l>X[l]l>/(s>s>s>)}; # unpack template for the APIVERSIONS response
558             # x[l] # Size (skip)
559             # l> # CorrelationId
560             # s> # ErrorCode
561             # l> # ApiVersions array size
562             # X[l] # step back 4 bytes so the array size is read again as the repeat count
563             # l>/( # ApiVersions array
564             # s> # ApiKey
565             # s> # MinVersion
566             # s> # MaxVersion
567             # )
568              
569             =head3 C<decode_api_versions_response( $bin_stream_ref )>
570              
571             Decodes the argument and returns a reference to the hash representing
572             the structure of the APIVERSIONS Response.
573              
574             This function takes the following arguments:
575              
576             =over 3
577              
578             =item C<$bin_stream_ref>
579              
580             C<$bin_stream_ref> is a reference to the encoded Response buffer. The buffer
581             must be a non-empty binary string.
582              
583             =back
584              
585             =cut
586              
587             sub decode_api_versions_response {
588 0     0 1 0 my ( $bin_stream_ref ) = @_; # reference to the encoded response buffer
589              
590 0         0 my @data = unpack( $_decode_api_version_response_template, $$bin_stream_ref ); # flat list of every decoded field
591              
592 0         0 my $i = 0; # cursor into @data
593 0         0 my $ApiVersions_Response = {};
594              
595 0         0 $ApiVersions_Response->{CorrelationId} = $data[ $i++ ]; # CorrelationId
596 0         0 $ApiVersions_Response->{ErrorCode} = $data[ $i++ ]; # ErrorCode
597              
598 0         0 my $ApiVersions_array = $ApiVersions_Response->{ApiVersions} = [];
599 0         0 my $ApiVersions_array_size = $data[ $i++ ]; # ApiVersions array size
600 0         0 while ( $ApiVersions_array_size-- ) { # NOTE(review): counts down to 0; a negative size would never reach 0 - confirm the server cannot send one
601 0         0 push( @$ApiVersions_array, {
602             ApiKey => $data[ $i++ ], # ApiKey
603             MinVersion => $data[ $i++ ], # MinVersion
604             MaxVersion => $data[ $i++ ], # MaxVersion
605             }
606             );
607             }
608              
609 0         0 return $ApiVersions_Response; # hash reference: CorrelationId, ErrorCode, ApiVersions
610             }
611              
612             =head3 C<encode_find_coordinator_request( $FindCoordinator_Request )>
613              
614             Encodes the argument and returns the encoded binary string
615             representing a Request buffer.
616              
617             This function takes the following arguments:
618              
619             =over 3
620              
621             =item C<$FindCoordinator_Request>
622              
623             C<$FindCoordinator_Request> is a reference to the hash representing the structure
624             of the FINDCOORDINATOR Request. It contains CorrelationId, ClientId (can be an empty
625             string), CoordinatorKey and CoordinatorType (for version 1 of the protocol).
626              
627             =back
628              
629             =cut
630              
631             $IMPLEMENTED_APIVERSIONS->{$APIKEY_FINDCOORDINATOR} = 1; # register version 1 as the implemented FINDCOORDINATOR version
632             sub encode_find_coordinator_request {
633 0     0 1 0 my ( $FindCoordinator_Request ) = @_; # hash reference describing the request
634              
635 0         0 my @data;
636 0         0 my $request = {
637             # template => '...', (not set here; appended to below and read back in the final pack)
638             # len => ..., (not set here; incremented below and read back in the final pack)
639             data => \@data,
640             };
641              
642 0         0 my $api_version =
643             _encode_request_header( $request, $APIKEY_FINDCOORDINATOR, $FindCoordinator_Request );
644             # Size
645             # ApiKey
646             # ApiVersion
647             # CorrelationId
648             # ClientId
649              
650 0         0 my $coordinator_key = $FindCoordinator_Request->{CoordinatorKey};
651 0         0 my $coordinator_type = $FindCoordinator_Request->{CoordinatorType};
652              
653 0         0 $request->{template} .= q{s>}; # string length
654 0         0 $request->{len} += 2;
655 0         0 _encode_string( $request, $coordinator_key ); # CoordinatorKey (GroupId for version 0)
656 0 0       0 if ($api_version >= 1) {
657             # CoordinatorType (0 for groups)
658 0         0 $request->{template} .= q{c}; # int8: a single byte has no byte order, and pack() dies on the '>' modifier after 'c'
659 0         0 $request->{len} += 1;
660 0         0 push( @{ $request->{data} }, $coordinator_type );
  0         0  
661             }
662              
663 0         0 return pack( $request->{template}, $request->{len}, @data ); # returns the packed string itself, not a reference
664             }
665              
666             my $_decode_find_protocol_response_template = q{x[l]l>s>l>s>/al>}; # version 0 layout: yields 5 values
667             # x[l] # Size (skip)
668             # l> # CorrelationId
669             # s> # ErrorCode
670             # l> # NodeId
671             # s>/a # Host
672             # l> # Port
673              
674             my $_decode_find_protocol_response_template_v1 = q{x[l]l>l>s>s>/al>s>/al>}; # version 1 layout: yields 7 values, including ErrorMessage
675             # x[l] # Size (skip)
676             # l> # CorrelationId
677             # l> # throttle_time_ms
678             # s> # ErrorCode
679             # s>/a # ErrorMessage - NOTE(review): a null string is sent with length -1; confirm s>/a decodes that as intended
680             # l> # NodeId
681             # s>/a # Host
682             # l> # Port
683              
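684             =head3 C<decode_find_coordinator_response( $bin_stream_ref, $api_version )>
685              
686             Decodes the argument and returns a reference to the hash representing
687             the structure of the FINDCOORDINATOR Response. The optional C<$api_version> (default C<$DEFAULT_APIVERSION>) selects the version 0 or version 1 response layout.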
688              
689             This function takes the following arguments:
690              
691             =over 3
692              
693             =item C<$bin_stream_ref>
694              
695             C<$bin_stream_ref> is a reference to the encoded Response buffer. The buffer
696             must be a non-empty binary string.
697              
698             =back
699              
700             =cut
701              
702             sub decode_find_coordinator_response {
703 0     0 1 0 my ( $bin_stream_ref, $api_version ) = @_; # buffer reference, optional API version
704              
705 0   0     0 $api_version //= $DEFAULT_APIVERSION; # fall back to the default when no version is passed
706 0         0 my $is_v1 = $api_version == 1; # any other version is decoded with the non-v1 layout
707              
708 0 0       0 my @data = unpack( $is_v1 ? $_decode_find_protocol_response_template_v1
709             : $_decode_find_protocol_response_template, $$bin_stream_ref ); # flat list of decoded fields
710              
711 0         0 my $i = 0; # cursor into @data; fields are consumed strictly in order
712 0         0 my $FindCoordinator_Response = {};
713              
714 0         0 $FindCoordinator_Response->{CorrelationId} = $data[ $i++ ];
715 0 0       0 if ($is_v1) {
716 0         0 $FindCoordinator_Response->{ThrottleTimeMs} = $data[ $i++ ]; # only v1
717             }
718 0         0 $FindCoordinator_Response->{ErrorCode} = $data[ $i++ ];
719 0 0       0 if ($is_v1) {
720 0         0 $FindCoordinator_Response->{ErrorMessage} = $data[ $i++ ]; # only v1
721             }
722 0         0 $FindCoordinator_Response->{NodeId} = $data[ $i++ ];
723 0         0 $FindCoordinator_Response->{Host} = $data[ $i++ ];
724 0         0 $FindCoordinator_Response->{Port} = $data[ $i++ ];
725              
726 0         0 return $FindCoordinator_Response; # hash reference; v1 adds ThrottleTimeMs and ErrorMessage (7 values read instead of 5)
727             }
728              
729             # PRODUCE Request --------------------------------------------------------------
730              
=head3 C<encode_produce_request( $Produce_Request, $compression_codec )>

Encodes the argument and returns the encoded binary string
representing a Request buffer.

This function takes the following arguments:

=over 3

=item C<$Produce_Request>

C<$Produce_Request> is a reference to the hash representing
the structure of the PRODUCE Request (examples see C<t/*_decode_encode.t>).

=item C<$compression_codec>

Optional.

C<$compression_codec> sets the required type of C<$messages> compression,
if the compression is desirable.

Supported codecs:
L<$COMPRESSION_NONE|Kafka/$COMPRESSION_NONE>,
L<$COMPRESSION_GZIP|Kafka/$COMPRESSION_GZIP>,
L<$COMPRESSION_SNAPPY|Kafka/$COMPRESSION_SNAPPY>,
L<$COMPRESSION_LZ4|Kafka/$COMPRESSION_LZ4>.

NOTE: $COMPRESSION_LZ4 requires Kafka 0.10 or higher, as the initial
implementation of LZ4 in Kafka did not follow the standard LZ4 framing
specification.

=back

=cut

$IMPLEMENTED_APIVERSIONS->{$APIKEY_PRODUCE} = 2;
sub encode_produce_request {
    my ( $Produce_Request, $compression_codec ) = @_;

    # 'template' and 'len' are filled in by the helpers below;
    # 'data' collects the values that are packed at the very end.
    my @data;
    my $request = { data => \@data };

    # Size, ApiKey, ApiVersion, CorrelationId, ClientId
    _encode_request_header( $request, $APIKEY_PRODUCE, $Produce_Request );

    # Extends the pack template, the running length and the value list in one step.
    my $append = sub {
        my ( $template, $length, @values ) = @_;
        $request->{template} .= $template;
        $request->{len}      += $length;
        push @data, @values;
        return;
    };

    my $topics = $Produce_Request->{topics};
    $append->(
        $_ProduceRequest_header_template,
        $_ProduceRequest_header_length,
        $Produce_Request->{RequiredAcks},    # RequiredAcks
        $Produce_Request->{Timeout},         # Timeout
        scalar @$topics,                     # topics array size
    );

    for my $topic_entry ( @$topics ) {
        $append->( q{s>}, 2 );               # string length
        _encode_string( $request, $topic_entry->{TopicName} );    # TopicName

        my $partitions = $topic_entry->{partitions};
        $append->( q{l>}, 4, scalar @$partitions );               # partitions array size

        for my $partition_entry ( @$partitions ) {
            $append->( q{l>}, 4, $partition_entry->{Partition} ); # Partition
            _encode_MessageSet_array( $request, $partition_entry->{MessageSet}, $compression_codec );
        }
    }

    return pack( $request->{template}, $request->{len}, @data );
}
811              
812             # PRODUCE Response -------------------------------------------------------------
813              
my $_decode_produce_response_template = join( q{},
    q{x[l]},                    # Size (skip)
    q{l>},                      # CorrelationId
    q{l>X[l]l>/(},              # topics array size, then the topics array
        q{s>/a},                #   TopicName
        q{l>X[l]l>/(},          #   partitions array size, then the partitions array
            q{l>},              #     Partition
            q{s>},              #     ErrorCode
            $_int64_template,   #     Offset
        q{)},
    q{)},
);

my $_decode_produce_response_template_v1 = join( q{},
    q{x[l]},                    # Size (skip)
    q{l>},                      # CorrelationId
    q{l>X[l]l>/(},              # topics array size, then the topics array
        q{s>/a},                #   TopicName
        q{l>X[l]l>/(},          #   partitions array size, then the partitions array
            q{l>},              #     Partition
            q{s>},              #     ErrorCode
            $_int64_template,   #     Offset
        q{)},
    q{)},
    q{l>},                      # Throttle_Time_Ms
);

my $_decode_produce_response_template_v2 = join( q{},
    q{x[l]},                    # Size (skip)
    q{l>},                      # CorrelationId
    q{l>X[l]l>/(},              # topics array size, then the topics array
        q{s>/a},                #   TopicName
        q{l>X[l]l>/(},          #   partitions array size, then the partitions array
            q{l>},              #     Partition
            q{s>},              #     ErrorCode
            $_int64_template,   #     Offset
            $_int64_template,   #     Log_Append_Time
        q{)},
    q{)},
    q{l>},                      # Throttle_Time_Ms
);

=head3 C<decode_produce_response( $bin_stream_ref, $api_version )>

Decodes the argument and returns a reference to the hash representing
the structure of the PRODUCE Response (examples see C<t/*_decode_encode.t>).

This function takes the following arguments:

=over 3

=item C<$bin_stream_ref>

C<$bin_stream_ref> is a reference to the encoded Response buffer. The buffer
must be a non-empty binary string.

=item C<$api_version>

Optional.

C<$api_version> selects the Response layout: version 1 adds
C<Throttle_Time_Ms>, version 2 additionally adds C<Log_Append_Time>
to every partition. If it is not defined, the module default version is used.

=back

=cut
sub decode_produce_response {
    my ( $bin_stream_ref, $api_version ) = @_;

    $api_version //= $DEFAULT_APIVERSION;
    my $is_v1 = $api_version == 1;
    my $is_v2 = $api_version == 2;

    my $template;
    if ( $is_v1 ) {
        $template = $_decode_produce_response_template_v1;
    }
    elsif ( $is_v2 ) {
        $template = $_decode_produce_response_template_v2;
    }
    else {
        $template = $_decode_produce_response_template;
    }

    # The decoded values are consumed from the front, in wire order.
    my @fields = unpack( $template, $$bin_stream_ref );

    my @topics;
    my %response = (
        CorrelationId => shift( @fields ),              # CorrelationId
        topics        => \@topics,
    );

    my $topics_left = shift( @fields );                 # topics array size
    while ( $topics_left-- ) {
        my @partitions;
        my %topic = (
            TopicName  => shift( @fields ),             # TopicName
            partitions => \@partitions,
        );

        my $partitions_left = shift( @fields );         # partitions array size
        while ( $partitions_left-- ) {
            push @partitions, {
                Partition => shift( @fields ),              # Partition
                ErrorCode => shift( @fields ),              # ErrorCode
                Offset    => _unpack64( shift( @fields ) ), # Offset
                # Log_Append_Time is present in version 2 only
                ( $is_v2 ? ( Log_Append_Time => _unpack64( shift( @fields ) ) ) : () ),
            };
        }

        push @topics, \%topic;
    }

    # A trailing value, when present, is the throttle time.
    if ( defined $fields[0] ) {
        $response{Throttle_Time_Ms} = shift( @fields );
    }

    return \%response;
}
931              
932             # FETCH Request ----------------------------------------------------------------
933              
=head3 C<encode_fetch_request( $Fetch_Request )>

Encodes the argument and returns the encoded binary string
representing a Request buffer.

This function takes the following arguments:

=over 3

=item C<$Fetch_Request>

C<$Fetch_Request> is a reference to the hash representing
the structure of the FETCH Request (examples see C<t/*_decode_encode.t>).

=back

=cut

# Versions 0, 1 and 2 share one request layout; version 3 adds a top-level MaxBytes.
$IMPLEMENTED_APIVERSIONS->{$APIKEY_FETCH} = 3;
sub encode_fetch_request {
    my ( $Fetch_Request ) = @_;

    # 'template' and 'len' are filled in by the helpers below;
    # 'data' collects the values that are packed at the very end.
    my @data;
    my $request = { data => \@data };

    # Size, ApiKey, ApiVersion, CorrelationId, ClientId
    my $api_version = _encode_request_header( $request, $APIKEY_FETCH, $Fetch_Request );
    my $is_v3 = $api_version == 3;

    # Extends the pack template, the running length and the value list in one step.
    my $append = sub {
        my ( $template, $length, @values ) = @_;
        $request->{template} .= $template;
        $request->{len}      += $length;
        push @data, @values;
        return;
    };

    my $topics = $Fetch_Request->{topics};

    my @header_values = (
        $CONSUMERS_REPLICAID,               # ReplicaId
        $Fetch_Request->{MaxWaitTime},      # MaxWaitTime
        $Fetch_Request->{MinBytes},         # MinBytes
    );
    push @header_values, $Fetch_Request->{MaxBytes} if $is_v3;    # MaxBytes (version 3 only)
    push @header_values, scalar @$topics;                         # topics array size

    if ( $is_v3 ) {
        $append->( $_FetchRequest_header_template_v3, $_FetchRequest_header_length_v3, @header_values );
    }
    else {
        $append->( $_FetchRequest_header_template, $_FetchRequest_header_length, @header_values );
    }

    for my $topic_entry ( @$topics ) {
        $append->( q{s>}, 2 );                                    # string length
        _encode_string( $request, $topic_entry->{TopicName} );    # TopicName

        my $partitions = $topic_entry->{partitions};
        $append->( q{l>}, 4, scalar @$partitions );               # partitions array size

        for my $partition_entry ( @$partitions ) {
            $append->(
                $_FetchRequest_body_template,
                $_FetchRequest_body_length,
                $partition_entry->{Partition},                    # Partition
                _pack64( $partition_entry->{FetchOffset} ),       # FetchOffset
                $partition_entry->{MaxBytes},                     # MaxBytes
            );
        }
    }

    return pack( $request->{template}, $request->{len}, @data );
}
1009              
1010             # FETCH Response ---------------------------------------------------------------
1011              
=head3 C<decode_fetch_response( $bin_stream_ref, $api_version )>

Decodes the argument and returns a reference to the hash representing
the structure of the FETCH Response (examples see C<t/*_decode_encode.t>).

This function takes the following arguments:

=over 3

=item C<$bin_stream_ref>

C<$bin_stream_ref> is a reference to the encoded Response buffer. The buffer
must be a non-empty binary string.

=item C<$api_version>

Optional.

C<$api_version> is the API version the Response was encoded with;
for versions 1, 2 and 3 the result also contains C<ThrottleTimeMs>.
If it is not defined, the module default version is used.

=back

=cut
sub decode_fetch_response {
    my ( $bin_stream_ref, $api_version ) = @_;

    $api_version //= $DEFAULT_APIVERSION;

    # ThrottleTimeMs is part of the response for versions 1, 2 and 3.
    my $has_throttle_time = grep { $api_version == $_ } 1, 2, 3;

    # According to Apache Kafka documentation:
    # As an optimization the server is allowed to return a partial message at the end of the message set.
    # Clients should handle this case.
    # NOTE: look inside _decode_MessageSet_template and _decode_MessageSet_array

    # 'template' (and 'stream_offset') are filled in by the template builder;
    # the message set decoder reads from 'data' through the shared cursor.
    my @data;
    my $response = {
        data       => \@data,
        bin_stream => $bin_stream_ref,
    };

    _decode_fetch_response_template( $response, $api_version );
    @data = unpack( $response->{template}, $$bin_stream_ref );

    my $i = 0;      # cursor into @data, also advanced by _decode_MessageSet_array
    my @topics;
    my %fetch_response = (
        CorrelationId => $data[ $i++ ],                 # CorrelationId
    );
    if ( $has_throttle_time ) {
        $fetch_response{ThrottleTimeMs} = $data[ $i++ ];    # ThrottleTimeMs, v1 and above
    }
    $fetch_response{topics} = \@topics;

    my $topics_left = $data[ $i++ ];                    # topics array size
    while ( $topics_left-- ) {
        my @partitions;
        my %topic = (
            TopicName => $data[ $i++ ],                 # TopicName
        );
        $topic{partitions} = \@partitions;

        my $partitions_left = $data[ $i++ ];            # partitions array size
        while ( $partitions_left-- ) {
            my $partition = {
                Partition           => $data[ $i++ ],               # Partition
                ErrorCode           => $data[ $i++ ],               # ErrorCode
                HighwaterMarkOffset => _unpack64( $data[ $i++ ] ),  # HighwaterMarkOffset
            };

            my $message_set_size = $data[ $i++ ];       # MessageSetSize
            my @message_set;
            $partition->{MessageSet} = \@message_set;

            _decode_MessageSet_array( $response, $message_set_size, \$i, \@message_set );

            push @partitions, $partition;
        }

        push @topics, \%topic;
    }

    return \%fetch_response;
}
1092              
1093             # OFFSET Request ---------------------------------------------------------------
1094              
=head3 C<encode_offset_request( $Offset_Request )>

Encodes the argument and returns the encoded binary string
representing a Request buffer.

This function takes the following arguments:

=over 3

=item C<$Offset_Request>

C<$Offset_Request> is a reference to the hash representing
the structure of the OFFSET Request (examples see C<t/*_decode_encode.t>).

=back

=cut
$IMPLEMENTED_APIVERSIONS->{$APIKEY_OFFSET} = 1;
sub encode_offset_request {
    my ( $Offset_Request ) = @_;

    # 'template' and 'len' are filled in by the helpers below;
    # 'data' collects the values that are packed at the very end.
    my @data;
    my $request = { data => \@data };

    # Size, ApiKey, ApiVersion, CorrelationId, ClientId
    my $api_version = _encode_request_header( $request, $APIKEY_OFFSET, $Offset_Request );
    my $is_v1 = $api_version == 1;

    # Extends the pack template, the running length and the value list in one step.
    my $append = sub {
        my ( $template, $length, @values ) = @_;
        $request->{template} .= $template;
        $request->{len}      += $length;
        push @data, @values;
        return;
    };

    my $topics = $Offset_Request->{topics};
    $append->(
        $_OffsetRequest_header_template,
        $_OffsetRequest_header_length,
        $CONSUMERS_REPLICAID,               # ReplicaId
        scalar @$topics,                    # topics array size
    );

    # The per-partition layout depends on the version: v1 has no MaxNumberOfOffsets.
    my ( $body_template, $body_length ) = $is_v1
        ? ( $_OffsetRequest_body_template_v1, $_OffsetRequest_body_length_v1 )
        : ( $_OffsetRequest_body_template,    $_OffsetRequest_body_length );

    for my $topic_entry ( @$topics ) {
        $append->( q{s>}, 2 );                                    # string length
        _encode_string( $request, $topic_entry->{TopicName} );    # TopicName

        my $partitions = $topic_entry->{partitions};
        $append->( q{l>}, 4, scalar @$partitions );               # partitions array size

        for my $partition_entry ( @$partitions ) {
            my @values = (
                $partition_entry->{Partition},                    # Partition
                _pack64( $partition_entry->{Time} ),              # Time
            );
            push @values, $partition_entry->{MaxNumberOfOffsets}  # MaxNumberOfOffsets
                unless $is_v1;

            $append->( $body_template, $body_length, @values );
        }
    }

    return pack( $request->{template}, $request->{len}, @data );
}
1165              
1166             # OFFSET Response --------------------------------------------------------------
1167              
my $_decode_offset_response_template = join( q{},
    q{x[l]},                        # Size (skip)
    q{l>},                          # CorrelationId
    q{l>X[l]l>/(},                  # topics array size, then the topics array
        q{s>/a},                    #   TopicName
        q{l>X[l]l>/(},              #   PartitionOffsets array size, then the array
            q{l>},                  #     Partition
            q{s>},                  #     ErrorCode
            q{l>X[l]l>/(},          #     Offset array size, then the Offset array
                $_int64_template,   #       Offset
            q{)},
        q{)},
    q{)},
);

my $_decode_offset_response_template_v1 = join( q{},
    q{x[l]},                        # Size (skip)
    q{l>},                          # CorrelationId
    q{l>X[l]l>/(},                  # topics array size, then the topics array
        q{s>/a},                    #   TopicName
        q{l>X[l]l>/(},              #   PartitionOffsets array size, then the array
            q{l>},                  #     Partition
            q{s>},                  #     ErrorCode
            $_int64_template,       #     Timestamp
            $_int64_template,       #     Offset
        q{)},
    q{)},
);

=head3 C<decode_offset_response( $bin_stream_ref, $api_version )>

Decodes the argument and returns a reference to the hash representing
the structure of the OFFSET Response (examples see C<t/*_decode_encode.t>).

This function takes the following arguments:

=over 3

=item C<$bin_stream_ref>

C<$bin_stream_ref> is a reference to the encoded Response buffer. The buffer
must be a non-empty binary string.

=item C<$api_version>

Optional.

C<$api_version> selects the Response layout. For version 1 every
partition carries a single C<Timestamp> and C<Offset>; otherwise
C<Offset> is a reference to an array of offsets.
If it is not defined, the module default version is used.

=back

=cut
sub decode_offset_response {
    my ( $bin_stream_ref, $api_version ) = @_;

    $api_version //= $DEFAULT_APIVERSION;
    my $is_v1 = $api_version == 1;

    # The decoded values are consumed from the front, in wire order.
    my @fields = unpack(
        $is_v1 ? $_decode_offset_response_template_v1 : $_decode_offset_response_template,
        $$bin_stream_ref,
    );

    my @topics;
    my %offset_response = (
        CorrelationId => shift( @fields ),              # CorrelationId
        topics        => \@topics,
    );

    my $topics_left = shift( @fields );                 # topics array size
    while ( $topics_left-- ) {
        my @partition_offsets;
        my %topic = (
            TopicName        => shift( @fields ),       # TopicName
            PartitionOffsets => \@partition_offsets,
        );

        my $partitions_left = shift( @fields );         # PartitionOffsets array size
        while ( $partitions_left-- ) {
            my %partition_offset = (
                Partition => shift( @fields ),          # Partition
                ErrorCode => shift( @fields ),          # ErrorCode
            );

            if ( $is_v1 ) {
                $partition_offset{Timestamp} = _unpack64( shift( @fields ) );   # Timestamp (v1 only)
                $partition_offset{Offset}    = _unpack64( shift( @fields ) );   # Offset (v1 only)
            }
            else {
                my @offsets;
                $partition_offset{Offset} = \@offsets;

                my $offsets_left = shift( @fields );    # Offset array size
                while ( $offsets_left-- ) {
                    push @offsets, _unpack64( shift( @fields ) );               # Offset
                }
            }

            push @partition_offsets, \%partition_offset;
        }

        push @topics, \%topic;
    }

    return \%offset_response;
}
1276              
1277             # METADATA Request -------------------------------------------------------------
1278              
=head3 C<encode_metadata_request( $Metadata_Request )>

Encodes the argument and returns the encoded binary string
representing a Request buffer.

This function takes the following arguments:

=over 3

=item C<$Metadata_Request>

C<$Metadata_Request> is a reference to the hash representing
the structure of the METADATA Request (examples see C<t/*_decode_encode.t>).

=back

=cut
$IMPLEMENTED_APIVERSIONS->{$APIKEY_METADATA} = 0;
sub encode_metadata_request {
    my ( $Metadata_Request ) = @_;

    # 'template' and 'len' are filled in by the helpers below;
    # 'data' collects the values that are packed at the very end.
    my @data;
    my $request = { data => \@data };

    # Size, ApiKey, ApiVersion, CorrelationId, ClientId
    _encode_request_header( $request, $APIKEY_METADATA, $Metadata_Request );

    # Extends the pack template, the running length and the value list in one step.
    my $append = sub {
        my ( $template, $length, @values ) = @_;
        $request->{template} .= $template;
        $request->{len}      += $length;
        push @data, @values;
        return;
    };

    my $topic_names = $Metadata_Request->{topics};
    $append->( q{l>}, 4, scalar @$topic_names );        # topics array size

    for my $topic_name ( @$topic_names ) {
        $append->( q{s>}, 2 );                          # string length
        _encode_string( $request, $topic_name );        # TopicName
    }

    return pack( $request->{template}, $request->{len}, @data );
}
1327              
1328             # METADATA Response ------------------------------------------------------------
1329              
my $_decode_metadata_response_template = join( q{},
    q{x[l]},                    # Size (skip)
    q{l>},                      # CorrelationId

    q{l>X[l]l>/(},              # Broker array size, then the Broker array
        q{l>},                  #   NodeId
        q{s>/a},                #   Host
        q{l>},                  #   Port
    q{)},

    q{l>X[l]l>/(},              # TopicMetadata array size, then the TopicMetadata array
        q{s>},                  #   ErrorCode
        q{s>/a},                #   TopicName

        q{l>X[l]l>/(},          #   PartitionMetadata array size, then the array
            q{s>},              #     ErrorCode
            q{l>},              #     Partition
            q{l>},              #     Leader

            q{l>X[l]l>/(},      #     Replicas array size, then the Replicas array
                q{l>},          #       ReplicaId
            q{)},

            q{l>X[l]l>/(},      #     Isr array size, then the Isr array
                q{l>},          #       ReplicaId
            q{)},
        q{)},
    q{)},
);

=head3 C<decode_metadata_response( $bin_stream_ref )>

Decodes the argument and returns a reference to the hash representing
the structure of the METADATA Response (examples see C<t/*_decode_encode.t>).

The returned hash has the keys C<CorrelationId>, C<Broker> and
C<TopicMetadata>.

This function takes the following arguments:

=over 3

=item C<$bin_stream_ref>

C<$bin_stream_ref> is a reference to the encoded Response buffer. The buffer
must be a non-empty binary string.

=back

=cut
sub decode_metadata_response {
    my ( $bin_stream_ref ) = @_;

    # The decoded values are consumed from the front, in wire order.
    my @fields = unpack( $_decode_metadata_response_template, $$bin_stream_ref );

    my $correlation_id = shift( @fields );              # CorrelationId

    my @brokers;
    my $brokers_left = shift( @fields );                # Broker array size
    while ( $brokers_left-- ) {
        my ( $node_id, $host, $port ) = splice( @fields, 0, 3 );
        push @brokers, {
            NodeId => $node_id,                         # NodeId
            Host   => $host,                            # Host
            Port   => $port,                            # Port
        };
    }

    my @topics;
    my $topics_left = shift( @fields );                 # TopicMetadata array size
    while ( $topics_left-- ) {
        my ( $topic_error_code, $topic_name ) = splice( @fields, 0, 2 );

        my @partitions;
        my $partitions_left = shift( @fields );         # PartitionMetadata array size
        while ( $partitions_left-- ) {
            my ( $error_code, $partition_id, $leader ) = splice( @fields, 0, 3 );

            my @replicas;
            my $replicas_left = shift( @fields );       # Replicas array size
            while ( $replicas_left-- ) {
                push @replicas, shift( @fields );       # ReplicaId
            }

            my @isr;
            my $isr_left = shift( @fields );            # Isr array size
            while ( $isr_left-- ) {
                push @isr, shift( @fields );            # ReplicaId
            }

            push @partitions, {
                ErrorCode => $error_code,               # ErrorCode
                Partition => $partition_id,             # Partition
                Leader    => $leader,                   # Leader
                Replicas  => \@replicas,
                Isr       => \@isr,
            };
        }

        push @topics, {
            ErrorCode         => $topic_error_code,     # ErrorCode
            TopicName         => $topic_name,           # TopicName
            PartitionMetadata => \@partitions,
        };
    }

    return {
        CorrelationId => $correlation_id,
        Broker        => \@brokers,
        TopicMetadata => \@topics,
    };
}
1444              
1445             # OffsetCommit Request -------------------------------------------------------------
1446              
1447             =head3 C<encode_offsetcommit_request( $OffsetCommit_Request )>
1448              
1449             Encodes the argument and returns the encoded binary string
1450             representing a Request buffer.
1451              
1452             This function takes the following arguments:
1453              
1454             =over 3
1455              
1456             =item C<$OffsetCommit_Request>
1457              
1458             C<$OffsetCommit_Request> is a reference to the hash representing
1459             the structure of the OffsetCommit Request (examples see C<t/*_decode_encode.t>).
1460              
1461             =back
1462              
1463             =cut
1464             $IMPLEMENTED_APIVERSIONS->{$APIKEY_OFFSETCOMMIT} = 1;
1465             sub encode_offsetcommit_request {
1466 0     0 1 0 my ( $OffsetCommit_Request ) = @_;
1467              
1468 0         0 my @data;
1469 0         0 my $request = {
1470             # template => '...',
1471             # len => ...,
1472             data => \@data,
1473             };
1474              
1475 0         0 my $api_version = _encode_request_header( $request, $APIKEY_OFFSETCOMMIT, $OffsetCommit_Request );
1476             # Size
1477             # ApiKey
1478             # ApiVersion
1479             # CorrelationId
1480             # ClientId
1481              
1482 0         0 my $is_v1 = $api_version == 1;
1483              
1484 0         0 $request->{template} .= q{s>};
1485 0         0 $request->{len} += 2;
1486 0         0 _encode_string( $request, $OffsetCommit_Request->{GroupId} ); # GroupId
1487              
1488 0 0       0 if ($is_v1) {
1489 0         0 $request->{template} .= q{l>}; # GroupGenerationId
1490 0         0 $request->{len} += 4;
1491             ### WARNING TODO: we don't support consumer groups properly, so we set
1492             ### generation id to -1 and member id null (see
1493             ### https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol )
1494 0         0 push( @data, -1 );
1495              
1496 0         0 $request->{template} .= q{s>}; # string length
1497 0         0 $request->{len} += 2;
1498 0         0 _encode_string( $request, '' ); # MemberId
1499             }
1500              
1501 0         0 my $topics_array = $OffsetCommit_Request->{topics};
1502 0         0 push( @data, scalar( @$topics_array ) ); # topics array size
1503 0         0 $request->{template} .= q{l>};
1504 0         0 $request->{len} += 4;
1505              
1506 0         0 foreach my $topic ( @$topics_array ) {
1507 0         0 $request->{template} .= q{s>};
1508 0         0 $request->{len} += 2;
1509 0         0 _encode_string( $request, $topic->{TopicName} ); # TopicName
1510              
1511 0         0 my $partitions_array = $topic->{partitions};
1512 0         0 push( @data, scalar( @$partitions_array ) ); # partitions array size
1513 0         0 $request->{template} .= q{l>};
1514 0         0 $request->{len} += 4;
1515              
1516 0         0 foreach my $partition ( @$partitions_array ){
1517             push( @data,
1518             $partition->{Partition}, # Partition
1519             $partition->{Offset}, # Offset
1520 0         0 );
1521 0         0 $request->{template} .= qq{l>${_int64_template}};
1522 0         0 $request->{len} += 12;
1523              
1524 0 0       0 if ($is_v1) { # timestamp
1525             ### WARNING TODO: currently we hardcode "now"
1526 0         0 push( @data, time() * 1000);
1527 0         0 $request->{template} .= qq{${_int64_template}};
1528 0         0 $request->{len} += 8;
1529             }
1530              
1531 0         0 $request->{template} .= q{s>};
1532 0         0 $request->{len} += 2;
1533 0         0 _encode_string( $request, $partition->{Metadata} ), # Metadata
1534             }
1535             }
1536 0         0 return pack( $request->{template}, $request->{len}, @data );
1537             }
1538              
1539             # OFFSETCOMMIT Response --------------------------------------------------------------
1540              
1541             my $_decode_offsetcommit_response_template = qq{x[l]l>l>X[l]l>/(s>/al>X[l]l>/(l>s>))};
1542             # x[l] # Size (skip)
1543             # l> # CorrelationId
1544             # l> # topics array size
1545             # X[l]
1546             # l>/( # topics array
1547             # s>/a # TopicName
1548             # l> # partitions array size
1549             # X[l]
1550             # l>/( # partitions array
1551             # l> # Partition
1552             # s> # ErrorCode
1553             # )
1554             # )
1555              
1556             =head3 C<decode_offsetcommit_response( $bin_stream_ref )>
1557              
1558             Decodes the argument and returns a reference to the hash representing
1559             the structure of the OFFSETCOMMIT Response (examples see C<t/*_decode_encode.t>).
1560              
1561             This function takes the following arguments:
1562              
1563             =over 3
1564              
1565             =item C<$bin_stream_ref>
1566              
1567             C<$bin_stream_ref> is a reference to the encoded Response buffer. The buffer
1568             must be a non-empty binary string.
1569              
1570             =back
1571              
1572             =cut
1573             sub decode_offsetcommit_response {
1574 0     0 1 0 my ( $bin_stream_ref ) = @_;
1575              
1576 0         0 my @data = unpack( $_decode_offsetcommit_response_template, $$bin_stream_ref );
1577              
1578 0         0 my $i = 0;
1579 0         0 my $OffsetCommit_Response = {};
1580              
1581 0         0 $OffsetCommit_Response->{CorrelationId} = $data[ $i++ ]; #CorrelationId
1582              
1583 0         0 my $topics_array = $OffsetCommit_Response->{topics} = [];
1584 0         0 my $topics_array_size = $data[ $i++ ]; # topics array size
1585 0         0 while ( $topics_array_size-- ) {
1586 0         0 my $topic = {
1587             TopicName => $data[ $i++ ], # TopicName
1588             };
1589              
1590 0         0 my $Partition_array = $topic->{partitions} = [];
1591 0         0 my $Partition_array_size = $data[ $i++ ]; # Partitions array size
1592 0         0 while ( $Partition_array_size-- ) {
1593 0         0 my $Partition = {
1594             Partition => $data[ $i++ ], # Partition
1595             ErrorCode => $data[ $i++ ], # ErrorCode
1596             };
1597 0         0 push( @$Partition_array, $Partition);
1598             }
1599 0         0 push( @$topics_array, $topic);
1600             }
1601 0         0 return $OffsetCommit_Response;
1602             }
1603              
1604             # OffsetFetch Request -------------------------------------------------------------
1605              
1606             =head3 C<encode_offsetfetch_request( $OffsetFetch_Request )>
1607              
1608             Encodes the argument and returns the encoded binary string
1609             representing a Request buffer.
1610              
1611             This function takes the following arguments:
1612              
1613             =over 3
1614              
1615             =item C<$OffsetFetch_Request>
1616              
1617             C<$OffsetFetch_Request> is a reference to the hash representing
1618             the structure of the OffsetFetch Request (examples see C<t/*_decode_encode.t>).
1619              
1620             =back
1621              
1622             =cut
1623             $IMPLEMENTED_APIVERSIONS->{$APIKEY_OFFSETFETCH} = 1;
1624             sub encode_offsetfetch_request {
1625 0     0 1 0 my ( $OffsetFetch_Request ) = @_;
1626              
1627 0         0 my @data;
1628 0         0 my $request = {
1629             # template => '...',
1630             # len => ...,
1631             data => \@data,
1632             };
1633              
1634 0         0 _encode_request_header( $request, $APIKEY_OFFSETFETCH, $OffsetFetch_Request );
1635             # Size
1636             # ApiKey
1637             # ApiVersion
1638             # CorrelationId
1639             # ClientId
1640              
1641 0         0 $request->{template} .= q{s>};
1642 0         0 $request->{len} += 2;
1643 0         0 _encode_string( $request, $OffsetFetch_Request->{GroupId} ); # GroupId
1644              
1645 0         0 my $topics_array = $OffsetFetch_Request->{topics};
1646 0         0 push( @data, scalar( @$topics_array ) ); # topics array size
1647 0         0 $request->{template} .= q{l>};
1648 0         0 $request->{len} += 4;
1649              
1650 0         0 foreach my $topic ( @$topics_array ) {
1651 0         0 $request->{template} .= q{s>};
1652 0         0 $request->{len} += 2;
1653 0         0 _encode_string( $request, $topic->{TopicName} ); # TopicName
1654              
1655 0         0 my $partitions_array = $topic->{partitions};
1656 0         0 push( @data, scalar( @$partitions_array ) ); # partitions array size
1657 0         0 $request->{template} .= q{l>};
1658 0         0 $request->{len} += 4;
1659              
1660 0         0 foreach my $partition ( @$partitions_array ){
1661             push( @data,
1662             $partition->{Partition}, # Partition
1663 0         0 );
1664 0         0 $request->{template} .= qq{l>};
1665 0         0 $request->{len} += 4;
1666             }
1667             }
1668 0         0 return pack( $request->{template}, $request->{len}, @data );
1669             }
1670              
1671             # OFFSETFETCH Response --------------------------------------------------------------
1672              
1673             my $_decode_offsetfetch_response_template = qq{x[l]l>l>X[l]l>/(s>/al>X[l]l>/(l>${_int64_template}s>/as>))};
1674             # x[l] # Size (skip)
1675             # l> # CorrelationId
1676             # l> # topics array size
1677             # X[l]
1678             # l>/( # topics array
1679             # s>/a # TopicName
1680             # l> # partitions array size
1681             # X[l]
1682             # l>/( # partitions array
1683             # l> # Partition
1684             # $_int64_template # Offset
1685             # s>/a # Metadata
1686             # s> # ErrorCode
1687             # )
1688             # )
1689              
1690             =head3 C<decode_offsetfetch_response( $bin_stream_ref )>
1691              
1692             Decodes the argument and returns a reference to the hash representing
1693             the structure of the OFFSETFETCH Response (examples see C<t/*_decode_encode.t>).
1694              
1695             This function takes the following arguments:
1696              
1697             =over 3
1698              
1699             =item C<$bin_stream_ref>
1700              
1701             C<$bin_stream_ref> is a reference to the encoded Response buffer. The buffer
1702             must be a non-empty binary string.
1703              
1704             =back
1705              
1706             =cut
1707             sub decode_offsetfetch_response {
1708 0     0 1 0 my ( $bin_stream_ref ) = @_;
1709              
1710 0         0 my @data = unpack( $_decode_offsetfetch_response_template, $$bin_stream_ref );
1711              
1712 0         0 my $i = 0;
1713 0         0 my $OffsetFetch_Response = {};
1714              
1715 0         0 $OffsetFetch_Response->{CorrelationId} = $data[ $i++ ]; #CorrelationId
1716              
1717 0         0 my $topics_array = $OffsetFetch_Response->{topics} = [];
1718 0         0 my $topics_array_size = $data[ $i++ ]; # topics array size
1719 0         0 while ( $topics_array_size-- ) {
1720 0         0 my $topic = {
1721             TopicName => $data[ $i++ ], # TopicName
1722             };
1723              
1724 0         0 my $Partition_array = $topic->{partitions} = [];
1725 0         0 my $Partition_array_size = $data[ $i++ ]; # Partitions array size
1726 0         0 while ( $Partition_array_size-- ) {
1727 0         0 my $Partition = {
1728             Partition => $data[ $i++ ], # Partition
1729             Offset => $data[ $i++ ], # Offset
1730             Metadata => $data[ $i++ ], # Metadata
1731             ErrorCode => $data[ $i++ ], # ErrorCode
1732             };
1733 0         0 push( @$Partition_array, $Partition);
1734             }
1735 0         0 push( @$topics_array, $topic);
1736             }
1737 0         0 return $OffsetFetch_Response;
1738             }
1739              
1740             # SaslHandshake Request -------------------------------------------------------------
1741              
1742             =head3 C<encode_saslhandshake_request( $SaslHandshake_Request )>
1743              
1744             Encodes the argument and returns the encoded binary string
1745             representing a Request buffer.
1746              
1747             This function takes the following arguments:
1748              
1749             =over 3
1750              
1751             =item C<$SaslHandshake_Request>
1752              
1753             C<$SaslHandshake_Request> is a reference to the hash representing
1754             the structure of the SaslHandshake Request.
1755              
1756             =back
1757              
1758             =cut
1759             $IMPLEMENTED_APIVERSIONS->{$APIKEY_SASLHANDSHAKE} = 0;
1760             sub encode_saslhandshake_request {
1761 1     1 1 3 my ( $SaslHandshake_Request ) = @_;
1762              
1763 1         2 my @data;
1764 1         3 my $request = {
1765             # template => '...',
1766             # len => ...,
1767             data => \@data,
1768             };
1769              
1770 1         3 _encode_request_header( $request, $APIKEY_SASLHANDSHAKE, $SaslHandshake_Request );
1771             # Size
1772             # ApiKey
1773             # ApiVersion
1774             # CorrelationId
1775             # ClientId
1776              
1777              
1778 1         4 $request->{template} .= q{s>}; # string length
1779 1         2 $request->{len} += 2;
1780 1         3 _encode_string( $request, $SaslHandshake_Request->{Mechanism} ); # SASL Mechanism
1781              
1782 1         12 return pack( $request->{template}, $request->{len}, @data );
1783             }
1784              
1785             # SASLHANDSHAKE Response --------------------------------------------------------------
1786              
1787             my $_decode_saslhandshake_response_template = q{x[l]l>s>l>X[l]l>/(s>/a)};
1788             # x[l] # Size (skip)
1789             # l> # CorrelationId
1790             # s> # ErrorCode
1791             # l> # Enabled Mechanisms array size
1792             # X[l]
1793             # l>/( # Enabled Mechanisms
1794             # s>/a # Mechanism
1795             # )
1796              
1797             =head3 C<decode_saslhandshake_response( $bin_stream_ref )>
1798              
1799             Decodes the argument and returns a reference to the hash representing
1800             the structure of the SaslHandshake Response.
1801              
1802             This function takes the following arguments:
1803              
1804             =over 3
1805              
1806             =item C<$bin_stream_ref>
1807              
1808             C<$bin_stream_ref> is a reference to the encoded Response buffer. The buffer
1809             must be a non-empty binary string.
1810              
1811             =back
1812              
1813             =cut
1814             sub decode_saslhandshake_response {
1815 0     0 1 0 my ( $bin_stream_ref ) = @_;
1816              
1817 0         0 my @data = unpack( $_decode_saslhandshake_response_template, $$bin_stream_ref );
1818              
1819 0         0 my $i = 0;
1820 0         0 my $SaslHandshake_Response = {};
1821              
1822 0         0 $SaslHandshake_Response->{CorrelationId} = $data[ $i++ ]; # CorrelationId
1823 0         0 $SaslHandshake_Response->{ErrorCode} = $data[ $i++ ]; # ErrorCode
1824              
1825 0         0 my $Mechanisms_array = $SaslHandshake_Response->{Mechanisms} = [];
1826 0         0 my $Mechanisms_array_size = $data[ $i++ ]; # Mechanisms array size
1827 0         0 while ( $Mechanisms_array_size-- ) {
1828 0         0 push( @$Mechanisms_array, $data[ $i++ ] ); # Mechanism
1829             }
1830              
1831 0         0 return $SaslHandshake_Response;
1832             }
1833              
1834             #-- private functions ----------------------------------------------------------
1835              
1836             # Generates a template to encode the request header
1837             sub _encode_request_header {
1838 10282     10282   15416 my ( $request, $api_key, $request_ref ) = @_;
1839              
1840             # we need to find out which API version to use for the request (and the
1841             # response). $request_ref->{ApiVersion} can be specified by the end user,
1842             # or provided by Kafka::Connection ( see
1843             # Kafka::Connection::_get_api_versions() ). If not provided, we
1844             # default to $DEFAULT_APIVERSION
1845 10282   66     22963 my $api_version = $request_ref->{ApiVersion} // $DEFAULT_APIVERSION;
1846 10282         18405 @{ $request->{data} } = (
1847             # Size
1848             $api_key, # ApiKey
1849             $api_version, # ApiVersion
1850             $request_ref->{CorrelationId}, # CorrelationId
1851 10282         12270 );
1852 10282         13192 $request->{template} = $_Request_header_template;
1853 10282         10372 $request->{len} = $_Request_header_length;
1854 10282         18720 _encode_string( $request, $request_ref->{ClientId} ); # ClientId
1855              
1856 10282         10938 return $api_version;
1857             }
1858              
1859             # Generates a template to decode the fetch response body
1860             sub _decode_fetch_response_template {
1861 5021     5021   6131 my ( $response, $api_version ) = @_;
1862              
1863 5021         5149 my $is_v1 = $api_version == 1;
1864 5021         4821 my $is_v2 = $api_version == 2;
1865 5021         4247 my $is_v3 = $api_version == 3;
1866              
1867 5021 50 33     16095 if ($is_v1 || $is_v2 || $is_v3) {
      33        
1868 0         0 $response->{template} = $_FetchResponse_header_template_v1;
1869 0         0 $response->{stream_offset} = $_FetchResponse_header_length_v1; # bytes before topics array size
1870             # [l] Size
1871             # [l] CorrelationId
1872             # [l] throttle_time_ms
1873             } else {
1874 5021         6461 $response->{template} = $_FetchResponse_header_template;
1875 5021         5757 $response->{stream_offset} = $_FetchResponse_header_length; # bytes before topics array size
1876             # [l] Size
1877             # [l] CorrelationId
1878             }
1879             my $topics_array_size = unpack(
1880             q{x[}.$response->{stream_offset} . q{]}
1881             .q{l>}, # topics array size
1882 5021         7662 ${ $response->{bin_stream} }
  5021         10450  
1883             );
1884 5021         6035 $response->{stream_offset} += 4; # bytes before TopicName length
1885             # [l] topics array size
1886              
1887 5021         5256 my ( $TopicName_length, $partitions_array_size );
1888 5021         7277 while ( $topics_array_size-- ) {
1889             $TopicName_length = unpack(
1890             q{x[}.$response->{stream_offset}
1891             .q{]s>}, # TopicName length
1892 5021         5548 ${ $response->{bin_stream} }
  5021         7192  
1893             );
1894             $response->{stream_offset} += # bytes before partitions array size
1895 5021         5751 2 # [s] TopicName length
1896             + $TopicName_length # TopicName
1897             ;
1898             $partitions_array_size = unpack(
1899             q{x[}.$response->{stream_offset}
1900             .q{]l>}, # partitions array size
1901 5021         5415 ${ $response->{bin_stream} }
  5021         6154  
1902             );
1903 5021         5498 $response->{stream_offset} += 4; # bytes before Partition
1904             # [l] partitions array size
1905              
1906 5021         6431 $response->{template} .= $_FetchResponse_topic_body_template;
1907 5021         5074 $response->{stream_offset} += $_FetchResponse_topic_body_length; # (without TopicName and partitions array size)
1908             # bytes before MessageSetSize
1909             # TopicName
1910             # [l] # partitions array size
1911             # [l] Partition
1912             # [s] ErrorCode
1913             # [q] HighwaterMarkOffset
1914              
1915 5021         6566 _decode_MessageSet_template( $response );
1916             }
1917              
1918 5021         4960 return;
1919             }
1920              
1921             # Kafka uses a snappy-java compressor, with its own headers and frame formats
1922             my $XERIAL_SNAPPY_MAGIC_HEADER = "\x82SNAPPY\x00";
1923             my $XERIAL_SNAPPY_BLOCK_SIZE = 0x8000; # 32Kb
1924             my $XERIAL_SNAPPY_FILE_VERSION = 1;
1925              
1926             # https://github.com/xerial/snappy-java
1927             # https://github.com/kubo/snzip/blob/master/snappy-java-format.c
1928             sub _snappy_xerial_decompress {
1929 4     4   10 my ( $data ) = @_;
1930 4         6 my $uncompressed;
1931 4         8 my $raw_format_suspected = 1;
1932 4 50       13 if ( length($data) > 16 ) {
1933 4         18 my ( $header, $x_version, $x_compatversion) = unpack( q{a[8]L>L>}, $data );
1934 4 100       12 if ( $header eq $XERIAL_SNAPPY_MAGIC_HEADER ) {
1935 3         13 $raw_format_suspected = 0;
1936 3 50       10 _error( $ERROR_COMPRESSION, 'snappy: bad compatversion in snappy xerial header' ) unless $x_compatversion == $XERIAL_SNAPPY_FILE_VERSION;
1937 3 50       9 _error( $ERROR_COMPRESSION, 'snappy: bad version in snappy xerial header' ) unless $x_version == $XERIAL_SNAPPY_FILE_VERSION;
1938 3         9 $data = substr( $data, 16 );
1939 3         9 while ( length($data) > 0 ) {
1940 4   100     18 $uncompressed //= '';
1941 4 50       9 _error( $ERROR_COMPRESSION, 'snappy: bad frame length header' ) if length($data) < 4;
1942 4         9 my $compressed_frame_length = unpack( q{L>}, $data );
1943 4         10 $data = substr( $data, 4 );
1944 4 50       11 _error( $ERROR_COMPRESSION, 'snappy: partial frame ' ) if length($data) < $compressed_frame_length;
1945 4         11 my $compressed_frame = substr( $data, 0, $compressed_frame_length, '');
1946 4         122 my $uncompressed_frame = Compress::Snappy::decompress( $compressed_frame );
1947 4 50       14 _error( $ERROR_COMPRESSION, 'snappy: can\'t uncompress frame ' ) if not defined $uncompressed_frame;
1948 4         120 $uncompressed .= $uncompressed_frame;
1949             }
1950             }
1951             }
1952 4 100       10 if ( $raw_format_suspected ) {
1953 1         16 $uncompressed = Compress::Snappy::decompress( $data );
1954             }
1955 4         13 return $uncompressed;
1956             }
1957              
1958             sub _snappy_xerial_compress {
1959 3     3   586 my ( $data ) = @_;
1960 3         7 my $compressed_data;
1961 3         9 while ( length($data) ) {
1962 4         68 my $block = substr $data, 0, $XERIAL_SNAPPY_BLOCK_SIZE, '';
1963 4         310 my $compressed_block = Compress::Snappy::compress( $block );
1964 4 50       21 _error( $ERROR_COMPRESSION, 'snappy: can\'t compress frame!' ) if not defined $compressed_block;
1965 4   66     30 $compressed_data //= pack( q{a[8]L>L>}, $XERIAL_SNAPPY_MAGIC_HEADER, $XERIAL_SNAPPY_FILE_VERSION, $XERIAL_SNAPPY_FILE_VERSION );
1966 4         24 $compressed_data .= pack( q{L>}, length($compressed_block) ) . $compressed_block;
1967             }
1968 3         16 return $compressed_data;
1969             }
1970              
1971             sub _decompress_data {
1972 7     7   26 my ( $data, $compression_codec ) = @_;
1973 7 50 50     36 say STDERR format_message( '[%s] decompress_data request, compression_codec: %s, data: %s',
1974             scalar( localtime ),
1975             $compression_codec,
1976             unpack( 'H*', $data ),
1977             ) if Kafka::Protocol->debug_level // 0 >= 3;
1978              
1979 7         13 my $decompressed;
1980 7 100       31 if ( $compression_codec == $COMPRESSION_GZIP ) {
    100          
    50          
1981             try {
1982 3     3   421 $decompressed = gunzip( $data );
1983             } catch {
1984 1     1   20 _error( $ERROR_COMPRESSION, format_message( 'gunzip failed: %s', $_ ) );
1985 3         32 };
1986             } elsif ( $compression_codec == $COMPRESSION_SNAPPY ) {
1987 3   33     9 $decompressed = _snappy_xerial_decompress( $data )
1988             // _error( $ERROR_COMPRESSION, 'Unable to decompress snappy compressed data' );
1989             } elsif ( $compression_codec == $COMPRESSION_LZ4 ) {
1990             # https://cwiki.apache.org/confluence/display/KAFKA/KIP-57+-+Interoperable+LZ4+Framing
1991             # New 0.10 clients (proposed behavior) - produce and consume v1 messages w/ correct LZ4F checksum
1992 1 50       21 if ( Compress::LZ4Frame::looks_like_lz4frame( $data ) ) {
1993 0   0     0 $decompressed = Compress::LZ4Frame::decompress( $data ) // _error( $ERROR_COMPRESSION, 'Unable to decompress LZ4 compressed data' );
1994             } else {
1995 1         4 _error( $ERROR_COMPRESSION, 'Unable to decompress LZ4 compressed data. Frame is not valid' );
1996             }
1997              
1998             } else {
1999 0         0 _error( $ERROR_COMPRESSION, "Unknown compression codec $compression_codec" );
2000             }
2001 5 50       69 _error( $ERROR_COMPRESSION, 'Decompression produced empty result' ) unless defined $decompressed;
2002 5         13 return $decompressed;
2003             }
2004              
2005             sub _compress_data {
2006 4     4   11 my ( $data, $compression_codec ) = @_;
2007 4         5 my $compressed;
2008              
2009             # Compression
2010 4 100       16 if ( $compression_codec == $COMPRESSION_GZIP ) {
    50          
    0          
2011 2   33     426 $compressed = gzip( $data )
2012             // _error( $ERROR_COMPRESSION, 'Unable to compress gzip data' );
2013             } elsif ( $compression_codec == $COMPRESSION_SNAPPY ) {
2014 2   33     8 $compressed = _snappy_xerial_compress( $data )
2015             // _error( $ERROR_COMPRESSION, 'Unable to compress snappy data' );
2016             } elsif ( $compression_codec == $COMPRESSION_LZ4 ) {
2017             # https://cwiki.apache.org/confluence/display/KAFKA/KIP-57+-+Interoperable+LZ4+Framing
2018             # New 0.10 clients (proposed behavior) - produce and consume v1 messages w/ correct LZ4F checksum
2019 0   0     0 $compressed = Compress::LZ4Frame::compress_checksum( $data )
2020             // _error( $ERROR_COMPRESSION, 'Unable to compress LZ4 data' );
2021             } else {
2022 0         0 _error( $ERROR_COMPRESSION, "Unknown compression codec $compression_codec" );
2023             }
2024 4         36 return $compressed;
2025             }
2026              
2027              
2028             # Decodes MessageSet
2029             sub _decode_MessageSet_array {
2030 10056     10056   137556 my ( $response, $MessageSetSize, $i_ref, $MessageSet_array_ref, $_override_offset) = @_;
2031             # $_override_offset should not be set manually; it is used when recursing
2032             # to decode inner compressed messages, whose offsets start at 0, 1, etc.,
2033             # so that the offset of the outer wrapping message is used instead.
2034              
2035 10056         11028 my $data = $response->{data};
2036 10056         8214 my $data_array_size = scalar @{ $data };
  10056         12324  
2037              
2038             # NOTE: not all messages can be returned
2039 10056         10804 my ( $Message, $MessageSize, $Crc, $Key_length, $Value_length );
2040 10056   100     27619 while ( $MessageSetSize && $$i_ref < $data_array_size ) {
2041              
2042 25084         31817 my $message_offset = _unpack64( $data->[ $$i_ref++ ] );
2043 25084 100       31288 if (defined $_override_offset) {
2044 12         16 $message_offset = $_override_offset;
2045             }
2046             $Message = {
2047 25084         33756 Offset => $message_offset, # Offset
2048             };
2049              
2050 25084         25208 $MessageSize = $data->[ $$i_ref++ ]; # MessageSize
2051             # NOTE: The CRC is the CRC32 of the remainder of the message bytes.
2052             # This is used to check the integrity of the message on the broker and consumer:
2053             # MagicByte + Attributes + Key length + Key + Value length + Value
2054 25084         23741 $Crc = $data->[ $$i_ref++ ]; # Crc
2055             # NOTE: A message set is also the unit of compression in Kafka:
2056             # a message may wrap a compressed message set (batch compression).
2057             # Such wrapped sets are decompressed and decoded recursively below.
2058 25084         26659 $Message->{MagicByte} = $data->[ $$i_ref++ ]; # MagicByte
2059 25084         25607 my $is_v1_msg_format = $Message->{MagicByte} == 1;
2060 25084         26984 $Message->{Attributes} = $data->[ $$i_ref++ ]; # Attributes
2061 25084 50       28869 if ($is_v1_msg_format) {
2062 0         0 $Message->{Timestamp} = $data->[ $$i_ref++ ]; # Timestamp
2063             }
2064              
2065 25084         24413 $Key_length = $data->[ $$i_ref++ ]; # Key length
2066 25084 50       37018 $Message->{Key} = $Key_length == $NULL_BYTES_LENGTH ? q{} : $data->[ $$i_ref++ ]; # Key
2067 25084         22426 $Value_length = $data->[ $$i_ref++ ]; # Value length
2068 25084 50       38728 $Message->{Value} = $Value_length == $NULL_BYTES_LENGTH ? q{} : $data->[ $$i_ref++ ]; # Value
2069              
2070 25084 100       33024 if ( my $compression_codec = $Message->{Attributes} & $COMPRESSION_CODEC_MASK ) {
2071 7         18 my $decompressed = _decompress_data($Message->{Value}, $compression_codec);
2072 5         9 my @data;
2073 5         8 my $Value_length = length $decompressed;
2074 5         16 my $resp = {
2075             data => \@data,
2076             bin_stream => \$decompressed,
2077             stream_offset => 0,
2078             };
2079 5         12 _decode_MessageSet_sized_template( $Value_length, $resp );
2080 5         7 @data = unpack( $resp->{template}, ${ $resp->{bin_stream} } );
  5         30  
2081 5         10 my $i = 0; # i_ref
2082 5         8 my $size = length( $decompressed );
2083 5         20 _decode_MessageSet_array(
2084             $resp,
2085             $size, # message set size
2086             \$i, # i_ref
2087             $MessageSet_array_ref,
2088             $message_offset
2089             );
2090             } else {
2091 25077         29170 push( @$MessageSet_array_ref, $Message );
2092             }
2093              
2094 25082         50672 $MessageSetSize -= 12
2095             # [q] Offset
2096             # [l] MessageSize
2097             + $MessageSize # Message
2098             ;
2099             }
2100              
2101 10054         14094 return;
2102             }
2103              
# Encodes a single message and appends it to the request being built.
#
# Arguments:
#   $request    - hash ref with 'data' (array ref), 'template' and 'len';
#                 all three are extended in place
#   $Key        - message key; an empty or undefined key is written as
#                 $NULL_BYTES_LENGTH with no payload
#   $Value      - message value; handled the same way as the key
#   $Attributes - attributes byte, defaults to $COMPRESSION_NONE.
#                 According to the Apache Kafka documentation the lowest
#                 2 bits contain the compression codec used for the message
#                 and the other bits should be set to 0.
#   $Offset     - message offset, packed with _pack64
#   $v1_format  - true to emit the v1 layout (MagicByte 1, with Timestamp),
#                 supported since Kafka 0.10.0
#   $Timestamp  - timestamp for the v1 layout, defaults to -1
#
# Returns nothing.
sub _encode_Message {
    my ($request, $Key, $Value, $Attributes, $Offset, $v1_format, $Timestamp) = @_;

    my $timestamp = $Timestamp // -1;

    # MagicByte and Attributes always open the message body
    my $body_template = q{cc};
    my @body_fields   = (
        ( $v1_format ? 1 : 0 ),
        $Attributes // $COMPRESSION_NONE,
    );

    # Timestamp is present in the v1 layout only (when MagicByte > 0)
    if ( $v1_format ) {
        $body_template .= q{a[8]};
        push @body_fields, _pack64( $timestamp );
    }

    # Key first, then Value: a length followed by the bytes, or the null
    # length marker alone when there is nothing to send
    for my $payload ( $Key, $Value ) {
        $body_template .= q{l>};
        if ( my $payload_length = length( $payload ) ) {
            $body_template .= qq{a[$payload_length]};
            push @body_fields, $payload_length, $payload;
        }
        else {
            push @body_fields, $NULL_BYTES_LENGTH;
        }
    }

    my $message_body    = pack( $body_template, @body_fields );
    my $MessageBodySize = length( $message_body );
    my $MessageSize     = $MessageBodySize + 4;    # body + [l] Crc

    my $data = $request->{data};
    $request->{template} .= $_MessageSet_template . qq{l>a[$MessageBodySize]};    # crc + message_body
    $request->{len}      += $_MessageSet_length + $MessageSize;

    push @$data, (
        _pack64( $Offset ),
        $MessageSize,
        crc32( $message_body ),
        $message_body,
    );

    return;
}
2145              
# Encodes a MessageSet (an array of message hashes) into the request.
#
# Arguments:
#   $request              - hash ref with 'data', 'template' and 'len',
#                           extended in place
#   $MessageSet_array_ref - array ref of hashes with Key, Value, Offset and
#                           optionally Timestamp
#   $compression_codec    - when true, all messages are first encoded into an
#                           inner set, compressed with _compress_data, and
#                           sent as the Value of a single wrapper message
#
# Returns nothing.
sub _encode_MessageSet_array {
    my ( $request, $MessageSet_array_ref, $compression_codec ) = @_;

    # Formats are not mixed within one set: as soon as any message carries a
    # Timestamp the whole set is written in the v1 format.
    my $use_v1_format;
    for my $message ( @$MessageSet_array_ref ) {
        next unless $message->{Timestamp};
        $use_v1_format = 1;
        last;
    }

    if ( $compression_codec ) {
        my $inner = {
            data     => [],
            template => '',
            len      => 0,
        };
        my $wrapper_key;

        for my $message ( @$MessageSet_array_ref ) {
            # the wrapper message ends up with the Key of the last message
            $wrapper_key = $message->{Key};
            _encode_Message(
                $inner,
                $wrapper_key,
                $message->{Value},
                $COMPRESSION_NONE,
                $PRODUCER_ANY_OFFSET,
                $use_v1_format,
                $message->{Timestamp},
            );
        }

        my $packed_inner = pack( $inner->{template}, @{ $inner->{data} } );

        # From here on the set consists of the single wrapper message
        $MessageSet_array_ref = [
            {
                Offset => $PRODUCER_ANY_OFFSET,
                Key    => $wrapper_key,
                Value  => _compress_data( $packed_inner, $compression_codec ),
            },
        ];
    }

    my $outer = {
        data     => [],
        template => '',
        len      => 0,
    };
    my $attributes = $compression_codec // $COMPRESSION_NONE;

    for my $message ( @$MessageSet_array_ref ) {
        _encode_Message(
            $outer,
            $message->{Key},
            $message->{Value},
            $attributes,
            $message->{Offset},
            $use_v1_format,
            $message->{Timestamp},
        );
    }

    # The encoded set is prefixed with its own size ([l] MessageSetSize)
    my $data = $request->{data};
    $request->{template} .= q{l>} . $outer->{template};
    $request->{len}      += 4 + $outer->{len};
    push @$data, $outer->{len}, @{ $outer->{data} };

    return;
}
2215              
# Generates a template to decode a MessageSet.
#
# Reads the 4-byte MessageSetSize located at $response->{stream_offset} in
# the binary stream, records it in the template, advances the stream offset
# past it, and hands over to _decode_MessageSet_sized_template for the
# messages themselves.
sub _decode_MessageSet_template {
    my ( $response ) = @_;

    my $size_offset    = $response->{stream_offset};
    my $MessageSetSize = unpack(
        "x[${size_offset}]l>",    # skip to, then read MessageSetSize
        ${ $response->{bin_stream} }
    );

    $response->{template}      .= q{l>};    # MessageSetSize
    $response->{stream_offset} += 4;        # bytes before Offset

    return _decode_MessageSet_sized_template( $MessageSetSize, $response );
}
2230              
# Extends $response->{template} with the unpack template of every message
# that is completely present in a MessageSet of the given size.
#
# Arguments:
#   $MessageSetSize - number of bytes of the MessageSet still to be described
#   $response       - hash ref with 'bin_stream' (scalar ref to the raw
#                     data), 'stream_offset' and 'template'; the latter two
#                     are updated in place
#
# Layout of one message:
#   [q] Offset
#   [l] MessageSize
#   [l] Crc
#   [c] MagicByte
#   [c] Attributes
#   ( [q] Timestamp ) message format v1 only
#   [l] Key length   (followed by the Key unless the length is null)
#   [l] Value length (followed by the Value unless the length is null)
#
# Processing stops at the first message that is not completely available.
# Returns nothing.
sub _decode_MessageSet_sized_template {
    my ( $MessageSetSize, $response ) = @_;

    my $stream_ref    = $response->{bin_stream};
    my $stream_length = length $$stream_ref;

    CREATE_TEMPLATE:
    while ( $MessageSetSize ) {
        # Not the full MessageSet.
        # 22 is the minimal size of a v0 message up to and including the
        # Key length; a v1 message needs 22 + 8, which is checked below.
        last CREATE_TEMPLATE if $MessageSetSize < 22;

        my $message_template = q{};
        my $MessageSize;

        MESSAGE_SET:
        {
            $response->{stream_offset} += 8;    # the size of the offset data value

            my $header_offset = $response->{stream_offset};
            ( $MessageSize, my $MagicByte ) = unpack(
                "x[${header_offset}]"
                . q{l>}      # MessageSize
                . q{x[l]}    # Crc
                . q{c},      # MagicByte
                $$stream_ref
            );

            my $is_v1_msg_format = $MagicByte == 1;

            # Not the full MessageSet: the template is still empty here
            last MESSAGE_SET
                if $is_v1_msg_format && $MessageSetSize < ( 22 + 8 );

            $message_template .= $is_v1_msg_format
                ? $_Message_template_with_timestamp
                : $_Message_template;

            # MessageSize, Crc, MagicByte, Attributes (and Timestamp for v1)
            $response->{stream_offset} += $is_v1_msg_format ? 18 : 10;

            my $key_offset = $response->{stream_offset};
            my $Key_length = unpack( "x[${key_offset}]l>", $$stream_ref );
            my $has_key    = $Key_length != $NULL_BYTES_LENGTH;

            $response->{stream_offset} += 4;              # [l] Key length
            $response->{stream_offset} += $Key_length     # Key
                if $has_key;

            # The [l] Value length must still fit into the stream
            if ( $stream_length < $response->{stream_offset} + 4 ) {
                # Not the full MessageSet
                $message_template = q{};
                last MESSAGE_SET;
            }
            $message_template .= $_Key_or_Value_template if $has_key;
            $message_template .= q{l>};                   # Value length

            my $value_offset = $response->{stream_offset};
            my $Value_length = unpack( "x[${value_offset}]l>", $$stream_ref );
            my $has_value    = $Value_length != $NULL_BYTES_LENGTH;

            $response->{stream_offset} += 4;              # [l] Value length
            $response->{stream_offset} += $Value_length   # Value
                if $has_value;

            # The Value itself must fit into the stream as well
            if ( $stream_length < $response->{stream_offset} ) {
                # Not the full MessageSet
                $message_template = q{};
                last MESSAGE_SET;
            }
            $message_template .= $_Key_or_Value_template if $has_value;
        }

        # An empty template marks a message that could not be fully described
        last CREATE_TEMPLATE unless $message_template;

        $response->{template} .= $message_template;
        $MessageSetSize -= 12    # [q] Offset + [l] MessageSize
            + $MessageSize;      # Message
    }

    return;
}
2330              
# Generates the template and data needed to encode a string.
#
# Arguments:
#   $request - hash ref with 'data' (array ref), 'template' and 'len'
#   $string  - the string to encode
#
# For an empty string only a length of 0 is pushed onto the data; template
# and len are left untouched. Otherwise the length and the string itself are
# pushed, 'a*' is appended to the template and len grows by the string
# length. The template entry for the length value is not added here.
#
# Returns nothing.
sub _encode_string {
    my ( $request, $string ) = @_;

    my $data = $request->{data} //= [];

    if ( $string eq q{} ) {
        push @$data, 0;
        return;
    }

    my $string_length = length $string;
    push @$data, $string_length, $string;
    $request->{template} .= q{a*};    # string
    $request->{len}      += $string_length;

    return;
}
2346              
# Handler for errors: passes its arguments through throw_args and throws the
# result as a Kafka::Exception::Protocol exception.
sub _error {
    my @exception_args = throw_args( @_ );
    Kafka::Exception::Protocol->throw( @exception_args );

    # reached only if throw() returns
    return;
}
2353              
2354             1;
2355              
2356             __END__
2357              
2358             =head1 DIAGNOSTICS
2359              
2360             In order to achieve better performance, functions of this module do not perform
2361             argument validation.
2362              
2363             =head1 SEE ALSO
2364              
2365             The basic operation of the Kafka package modules:
2366              
2367             L<Kafka|Kafka> - constants and messages used by the Kafka package modules.
2368              
2369             L<Kafka::Connection|Kafka::Connection> - interface to connect to a Kafka cluster.
2370              
2371             L<Kafka::Producer|Kafka::Producer> - interface for producing client.
2372              
2373             L<Kafka::Consumer|Kafka::Consumer> - interface for consuming client.
2374              
2375             L<Kafka::Message|Kafka::Message> - interface to access Kafka message
2376             properties.
2377              
2378             L<Kafka::Int64|Kafka::Int64> - functions to work with 64 bit elements of the
2379             protocol on 32 bit systems.
2380              
2381             L<Kafka::Protocol|Kafka::Protocol> - functions to process messages in the
2382             Apache Kafka protocol.
2383              
2384             L<Kafka::IO|Kafka::IO> - low-level interface for communication with Kafka server.
2385              
2386             L<Kafka::Exceptions|Kafka::Exceptions> - module designated to handle Kafka exceptions.
2387              
2388             L<Kafka::Internals|Kafka::Internals> - internal constants and functions used
2389             by several package modules.
2390              
2391             A wealth of detail about Apache Kafka and the Kafka protocol:
2392              
2393             Main page at L<http://kafka.apache.org/>
2394              
2395             Kafka Protocol at L<https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol>
2396              
2397             =head1 SOURCE CODE
2398              
2399             Kafka package is hosted on GitHub:
2400             L<https://github.com/TrackingSoft/Kafka>
2401              
2402             =head1 AUTHOR
2403              
2404             Sergey Gladkov
2405              
2406             Please use GitHub project link above to report problems or contact authors.
2407              
2408             =head1 CONTRIBUTORS
2409              
2410             Alexander Solovey
2411              
2412             Jeremy Jordan
2413              
2414             Sergiy Zuban
2415              
2416             Vlad Marchenko
2417              
2418             Damien Krotkine
2419              
2420             Greg Franklin
2421              
2422             =head1 COPYRIGHT AND LICENSE
2423              
2424             Copyright (C) 2012-2017 by TrackingSoft LLC.
2425              
2426             This package is free software; you can redistribute it and/or modify it under
2427             the same terms as Perl itself. See I<perlartistic> at
2428             L<http://dev.perl.org/licenses/artistic.html>.
2429              
2430             This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
2431             without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
2432             PARTICULAR PURPOSE.
2433              
2434             =cut