File Coverage

lib/Kafka/Message.pm
Criterion Covered Total %
statement 16 16 100.0
branch n/a
condition n/a
subroutine 6 6 100.0
pod 1 1 100.0
total 23 23 100.0


line stmt bran cond sub pod time code
1             package Kafka::Message;
2              
3             =head1 NAME
4              
5             Kafka::Message - Interface to the Kafka message properties.
6              
7             =head1 VERSION
8              
9             This documentation refers to C<Kafka::Message> version 1.08 .
10              
11             =cut
12              
13              
14              
15 7     7   124145 use 5.010;
  7         35  
16 7     7   38 use strict;
  7         21  
  7         161  
17 7     7   33 use warnings;
  7         14  
  7         847  
18              
19             our $VERSION = 'v1.08';
20              
21              
22              
23             our @_standard_fields = qw(
24             Attributes
25             Timestamp
26             error
27             HighwaterMarkOffset
28             key
29             MagicByte
30             next_offset
31             payload
32             offset
33             valid
34             );
35              
36             #-- constructor ----------------------------------------------------------------
37              
38             sub new {
39 15019     15019 1 20154 my ( $class, $self ) = @_;
40              
41 15019         15194 bless $self, $class;
42              
43 15019         29699 return $self;
44             }
45              
46             #-- public attributes ----------------------------------------------------------
47              
48             {
49 7     7   66 no strict 'refs'; ## no critic
  7         21  
  7         862  
50              
51             # getters
52             foreach my $method ( @_standard_fields )
53             {
54             *{ __PACKAGE__.'::'.$method } = sub {
55 30033     30033   107709 my ( $self ) = @_;
56 30033         45197 return $self->{ $method };
57             };
58             }
59             }
60              
61             #-- public methods -------------------------------------------------------------
62              
63             #-- private attributes ---------------------------------------------------------
64              
65             #-- private methods ------------------------------------------------------------
66              
67              
68              
69             1;
70              
71             __END__
72              
73             =head1 SYNOPSIS
74              
75             use 5.010;
76             use strict;
77             use warnings;
78              
79             use Kafka qw(
80             $DEFAULT_MAX_BYTES
81             );
82             use Kafka::Connection;
83             use Kafka::Consumer;
84              
85             #-- Connection
86             my $connection = Kafka::Connection->new( host => 'localhost' );
87              
88             #-- Consumer
89             my $consumer = Kafka::Consumer->new( Connection => $connection );
90              
91             # The Kafka consumer response has an ARRAY reference type.
92             # For the fetch response array has the class name Kafka::Message elements.
93              
94             # Consuming messages
95             my $messages = $consumer->fetch(
96             'mytopic', # topic
97             0, # partition
98             0, # offset
99             $DEFAULT_MAX_BYTES # Maximum size of MESSAGE(s) to receive
100             );
101             if ( $messages ) {
102             foreach my $message ( @$messages ) {
103             if ( $message->valid ) {
104             say 'key : ', $message->key;
105             say 'payload : ', $message->payload;
106             say 'offset : ', $message->offset;
107             say 'next_offset: ', $message->next_offset;
108             } else {
109             say 'error : ', $message->error;
110             }
111             }
112             }
113              
114             # Closes and cleans up
115             undef $consumer;
116             $connection->close;
117             undef $connection;
118              
119             =head1 DESCRIPTION
120              
121             This module is not intended to be used by the end user.
122              
123             L<Kafka::Message|Kafka::Message> class implements API for L<Kafka|Kafka> message.
124              
125             C<fetch> method of the L<Consumer|Kafka::Consumer> client returns reference to an array of objects of this class.
126              
127             The main features of the C<Kafka::Message> class are:
128              
129             =over 3
130              
131             =item *
132              
133             Represents Apache Kafka Message structure. Description of the structure
134             is available at L<https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Messagesets>
135              
136             =back
137              
138             =head2 CONSTRUCTOR
139              
140             =head3 C<new ( \%arg )>
141              
142             Creates a new C<Kafka::Message> object.
143             C<new()> takes an argument - HASH reference with the message attributes corresponding to
144             L<accessors|/METHODS>.
145              
146             =head2 METHODS
147              
148             =head3 C<payload>
149              
150             A simple message received from the Apache Kafka server.
151              
152             =head3 C<key>
153              
154             The key is an optional message key that was used for partition assignment.
155             The key can be an empty string.
156              
157             =head3 Timestamp
158              
159             Integer of BigInt on 32 bits platforms: the message timestamp ( might be -1 if
160             the message has no timestamp). Requires Kafka version > 0.10.0 and timestamp
161             enabled in the topic messages format.
162              
163             =head3 C<valid>
164              
165             Boolean value: indicates whether received message is valid or not.
166              
167             =head3 C<error>
168              
169             A description why message is invalid.
170              
171             =head3 C<offset>
172              
173             The offset of the message in the Apache Kafka server.
174              
175             =head3 C<next_offset>
176              
177             The offset of the next message in the Apache Kafka server.
178              
179             =head3 C<Attributes>
180              
181             This holds metadata attributes about the message.
182             The lowest 2 bits contain the compression codec used for the message.
183             The other bits are currently unused.
184              
185             =head3 C<HighwaterMarkOffset>
186              
187             The offset at the end of the log for this partition.
188             This can be used by the client to determine how many messages behind the end of the log they are.
189              
190             =head3 C<MagicByte>
191              
192             This is version id used to allow backwards compatible evolution of the message binary format.
193              
194             =head1 DIAGNOSTICS
195              
196             In order to achieve better performance, constructor of this module does not perform validation of
197             arguments.
198              
199             =head1 SEE ALSO
200              
201             The basic operation of the Kafka package modules:
202              
203             L<Kafka|Kafka> - constants and messages used by the Kafka package modules.
204              
205             L<Kafka::Connection|Kafka::Connection> - interface to connect to a Kafka cluster.
206              
207             L<Kafka::Producer|Kafka::Producer> - interface for producing client.
208              
209             L<Kafka::Consumer|Kafka::Consumer> - interface for consuming client.
210              
211             L<Kafka::Message|Kafka::Message> - interface to access Kafka message
212             properties.
213              
214             L<Kafka::Int64|Kafka::Int64> - functions to work with 64 bit elements of the
215             protocol on 32 bit systems.
216              
217             L<Kafka::Protocol|Kafka::Protocol> - functions to process messages in the
218             Apache Kafka's Protocol.
219              
220             L<Kafka::IO|Kafka::IO> - low-level interface for communication with Kafka server.
221              
222             L<Kafka::Exceptions|Kafka::Exceptions> - module designated to handle Kafka exceptions.
223              
224             L<Kafka::Internals|Kafka::Internals> - internal constants and functions used
225             by several package modules.
226              
227             A wealth of detail about the Apache Kafka and the Kafka Protocol:
228              
229             Main page at L<http://kafka.apache.org/>
230              
231             Kafka Protocol at L<https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol>
232              
233             =head1 SOURCE CODE
234              
235             Kafka package is hosted on GitHub:
236             L<https://github.com/TrackingSoft/Kafka>
237              
238             =head1 AUTHOR
239              
240             Sergey Gladkov
241              
242             Please use GitHub project link above to report problems or contact authors.
243              
244             =head1 CONTRIBUTORS
245              
246             Alexander Solovey
247              
248             Jeremy Jordan
249              
250             Sergiy Zuban
251              
252             Vlad Marchenko
253              
254             =head1 COPYRIGHT AND LICENSE
255              
256             Copyright (C) 2012-2017 by TrackingSoft LLC.
257              
258             This package is free software; you can redistribute it and/or modify it under
259             the same terms as Perl itself. See I<perlartistic> at
260             L<http://dev.perl.org/licenses/artistic.html>.
261              
262             This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
263             without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
264             PARTICULAR PURPOSE.
265              
266             =cut