File Coverage

blib/lib/Metabrik/Server/Kafka.pm
Criterion Covered Total %
statement 9 107 8.4
branch 0 46 0.0
condition 0 6 0.0
subroutine 3 13 23.0
pod 1 10 10.0
total 13 182 7.1


line stmt bran cond sub pod time code
1             #
2             # $Id$
3             #
4             # server::kafka Brik
5             #
6             package Metabrik::Server::Kafka;
7 1     1   771 use strict;
  1         3  
  1         30  
8 1     1   5 use warnings;
  1         1  
  1         39  
9              
10 1     1   7 use base qw(Metabrik::Shell::Command Metabrik::System::Package);
  1         2  
  1         1289  
11              
12             sub brik_properties {
13             return {
14 0     0 1   revision => '$Revision$',
15             tags => [ qw(unstable) ],
16             author => 'GomoR ',
17             license => 'http://opensource.org/licenses/BSD-3-Clause',
18             attributes => {
19             datadir => [ qw(datadir) ],
20             conf_file => [ qw(file) ],
21             conf_file_zookeeper => [ qw(file) ],
22             },
23             attributes_default => {
24             conf_file => 'server.properties',
25             conf_file_zookeeper => 'zookeeper.properties',
26             },
27             commands => {
28             install => [ ],
29             generate_conf => [ ],
30             generate_conf_zookeeper => [ ],
31             start => [ ],
32             status => [ ],
33             stop => [ ],
34             start_zookeeper => [ ],
35             status_zookeeper => [ ],
36             stop_zookeeper => [ ],
37             },
38             require_modules => {
39             'Metabrik::Devel::Git' => [ ],
40             'Metabrik::File::Text' => [ ],
41             'Metabrik::System::File' => [ ],
42             'Metabrik::System::Process' => [ ],
43             },
44             require_binaries => {
45             },
46             optional_binaries => {
47             },
48             need_packages => {
49             ubuntu => [ qw(gradle openjdk-8-jdk) ],
50             freebsd => [ qw(gradle openjdk8) ],
51             },
52             };
53             }
54              
55             sub install {
56 0     0 0   my $self = shift;
57              
58 0 0         $self->SUPER::install(@_) or return;
59              
60 0           my $datadir = $self->datadir;
61 0           my $url = 'https://github.com/apache/kafka.git';
62              
63 0 0         my $dg = Metabrik::Devel::Git->new_from_brik_init($self) or return;
64 0           $dg->datadir($datadir);
65              
66 0           my $output_dir = "$datadir/kafka";
67 0 0         my $repo = $dg->update_or_clone($url, $output_dir) or return;
68              
69             #
70             # Then build with gradle:
71             #
72             # cd ~/metabrik/server-kakfa/kafka
73             # gradle
74             # ./gradlew jar
75             #
76              
77 0           return $repo;
78             }
79              
80             sub generate_conf {
81 0     0 0   my $self = shift;
82 0           my ($conf_file) = @_;
83              
84 0   0       $conf_file ||= $self->conf_file;
85 0 0         $self->brik_help_set_undef_arg('generate_conf', $conf_file) or return;
86              
87 0           my $datadir = $self->datadir;
88 0           my $basedir = "$datadir/kafka";
89 0           $conf_file = "$basedir/config/$conf_file";
90              
91             # https://kafka.apache.org/documentation/#configuration
92              
93 0           my $conf =<
94             # The id of the broker. This must be set to a unique integer for each broker.
95             broker.id=1
96              
97             # Increase the message size limit
98             message.max.bytes=20000000
99             replica.fetch.max.bytes=30000000
100              
101             # Retention time
102             log.retention.hours=24
103              
104             log.dirs=$datadir/log
105             listeners=PLAINTEXT://127.0.0.1:9092
106              
107             zookeeper.connect=localhost:2181
108              
109             # For single instance of Kakfa (no cluster), default RF on creation
110             offsets.topic.replication.factor=1
111              
112             # So we can actually delete topics
113             delete.topic.enable=true
114             EOF
115             ;
116              
117 0 0         my $ft = Metabrik::File::Text->new_from_brik_init($self) or return;
118 0           $ft->append(0);
119 0           $ft->overwrite(1);
120              
121 0 0         $ft->write($conf, $conf_file) or return;
122              
123 0 0         my $sf = Metabrik::System::File->new_from_brik_init($self) or return;
124 0 0         $sf->mkdir("$datadir/log") or return;
125              
126 0           return $conf_file;
127             }
128              
129             sub generate_conf_zookeeper {
130 0     0 0   my $self = shift;
131 0           my ($conf_file) = @_;
132              
133 0   0       $conf_file ||= $self->conf_file_zookeeper;
134 0 0         $self->brik_help_set_undef_arg('generate_conf_zookeeper', $conf_file) or return;
135              
136 0           my $datadir = $self->datadir;
137 0           my $basedir = "$datadir/kafka";
138 0           $conf_file = "$basedir/config/$conf_file";
139              
140             # https://kafka.apache.org/documentation/#configuration
141              
142 0           my $conf =<
143             # the directory where the snapshot is stored.
144             dataDir=$datadir/zookeeper
145             # the port at which the clients will connect
146             clientPort=2181
147             # disable the per-ip limit on the number of connections since this is a non-production config
148             maxClientCnxns=0
149             maxClientCnxns=0
150             server.1=localhost:2888:3888
151             initLimit=5
152             syncLimit=2
153             EOF
154             ;
155              
156 0 0         my $ft = Metabrik::File::Text->new_from_brik_init($self) or return;
157 0           $ft->append(0);
158 0           $ft->overwrite(1);
159              
160 0 0         $ft->write($conf, $conf_file) or return;
161              
162 0 0         my $sf = Metabrik::System::File->new_from_brik_init($self) or return;
163 0 0         $sf->mkdir("$datadir/zookeeper") or return;
164              
165 0           return $conf_file;
166             }
167              
168             sub start {
169 0     0 0   my $self = shift;
170              
171 0           my $datadir = $self->datadir;
172 0           my $basedir = "$datadir/kafka";
173 0           my $conf_file = $self->conf_file;
174              
175 0           my $cmd = "$basedir/bin/kafka-server-start.sh $basedir/config/$conf_file";
176              
177 0           return $self->system_in_background($cmd);
178             }
179              
180             sub start_zookeeper {
181 0     0 0   my $self = shift;
182              
183 0           my $datadir = $self->datadir;
184 0           my $basedir = "$datadir/kafka";
185 0           my $conf_file = $self->conf_file_zookeeper;
186              
187 0           my $cmd = "$basedir/bin/zookeeper-server-start.sh $basedir/config/$conf_file";
188              
189 0           return $self->system_in_background($cmd);
190             }
191              
192             sub status {
193 0     0 0   my $self = shift;
194              
195 0           my $conf_file = $self->conf_file;
196              
197 0           my $datadir = $self->datadir;
198 0           my $basedir = "$datadir/kafka";
199 0           $conf_file = "$basedir/config/$conf_file";
200              
201 0 0         my $sp = Metabrik::System::Process->new_from_brik_init($self) or return;
202              
203 0           my $string = "kafka.Kafka $conf_file";
204 0 0         if ($sp->is_running_from_string($string)) {
205 0           $sp->verbose_process_is_running;
206 0           return 1;
207             }
208              
209 0           $sp->verbose_process_is_not_running;
210 0           return 0;
211             }
212              
213             sub status_zookeeper {
214 0     0 0   my $self = shift;
215              
216 0           my $conf_file = $self->conf_file_zookeeper;
217              
218 0           my $datadir = $self->datadir;
219 0           my $basedir = "$datadir/kafka";
220 0           $conf_file = "$basedir/config/$conf_file";
221              
222 0 0         my $sp = Metabrik::System::Process->new_from_brik_init($self) or return;
223              
224 0           my $string = "org.apache.zookeeper.server.quorum.QuorumPeerMain $conf_file";
225 0 0         if ($sp->is_running_from_string($string)) {
226 0           $sp->verbose_process_is_running;
227 0           return 1;
228             }
229              
230 0           $sp->verbose_process_is_not_running;
231 0           return 0;
232             }
233              
234             sub stop {
235 0     0 0   my $self = shift;
236              
237 0 0         if (! $self->status) {
238 0           return $self->info_process_is_not_running;
239             }
240              
241 0           my $conf_file = $self->conf_file;
242              
243 0           my $datadir = $self->datadir;
244 0           my $basedir = "$datadir/kafka";
245 0           $conf_file = "$basedir/config/$conf_file";
246              
247 0 0         my $sp = Metabrik::System::Process->new_from_brik_init($self) or return;
248              
249 0           my $string = "kafka.Kafka $conf_file";
250 0 0         my $pid = $sp->get_pid_from_string($string) or return;
251              
252 0           return $sp->kill($pid);
253             }
254              
255             sub stop_zookeeper {
256 0     0 0   my $self = shift;
257              
258 0 0         if (! $self->status) {
259 0           return $self->info_process_is_not_running;
260             }
261              
262 0           my $conf_file = $self->conf_file_zookeeper;
263              
264 0           my $datadir = $self->datadir;
265 0           my $basedir = "$datadir/kafka";
266 0           $conf_file = "$basedir/config/$conf_file";
267              
268 0 0         my $sp = Metabrik::System::Process->new_from_brik_init($self) or return;
269              
270 0           my $string = "org.apache.zookeeper.server.quorum.QuorumPeerMain $conf_file";
271 0 0         my $pid = $sp->get_pid_from_string($string) or return;
272              
273 0           return $sp->kill($pid);
274             }
275              
276             1;
277              
278             __END__