# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
from typing import NamedTuple, Optional

from looseversion import LooseVersion
from ducktape.utils.util import wait_until

from ignitetest.services.utils.ducktests_service import DucktestsService
from ignitetest.services.utils.log_utils import monitor_log
from ignitetest.services.utils.path import PathAware


class KafkaSettings:
    """
    Settings for Kafka nodes.
    """
    def __init__(self, **kwargs):
        self.zookeeper_connection_string = kwargs.get("zookeeper_connection_string")
        self.port = kwargs.get("port", 9092)

        # Host and broker id are filled in per node by KafkaService.start_node
        # before the configuration is rendered.
        self.host = None
        self.broker_id = None

        version = kwargs.get("version")
        if version:
            if isinstance(version, str):
                version = LooseVersion(version)

            self.version = version
        else:
            self.version = LooseVersion("3.9.1")


class KafkaService(DucktestsService, PathAware):
    """
    Kafka service.
    """
    LOG_FILENAME = "kafka.log"

    def __init__(self, context, num_nodes, settings: KafkaSettings, start_timeout_sec=60):
        super().__init__(context, num_nodes)
        self.settings = settings
        self.start_timeout_sec = start_timeout_sec
        self.init_logs_attribute()

    @property
    def product(self):
        """
        :return: Product name with version, e.g. "kafka-3.9.1".
        """
        return f"kafka-{self.settings.version}"

    @property
    def globals(self):
        """
        :return: Test context globals.
        """
        return self.context.globals

    @property
    def log_config_file(self):
        """
        :return: Path to the log4j configuration file.
        """
        return os.path.join(self.config_dir, "log4j.properties")

    @property
    def config_file(self):
        """
        :return: Path to the broker configuration file.
        """
        return os.path.join(self.config_dir, "server.properties")

    def start(self, **kwargs):
        """
        Start all Kafka brokers and await cluster formation.
        """
        super().start(**kwargs)
        self.logger.info("Waiting for Kafka ...")

        for node in self.nodes:
            self.await_kafka(node, self.start_timeout_sec)

        self.logger.info("Kafka cluster is formed.")

    def start_node(self, node, **kwargs):
        """
        Start a Kafka broker on the given node.
        """
        idx = self.idx(node)

        self.logger.info("Starting Kafka broker %d on %s", idx, node.account.hostname)

        self.init_persistent(node)

        self.settings.host = node.account.externally_routable_ip
        self.settings.broker_id = idx

        config_file = self.render('server.properties.j2', settings=self.settings, data_dir=self.work_dir)
        node.account.create_file(self.config_file, config_file)
        self.logger.info("Kafka config %s", config_file)

        log_config_file = self.render('log4j.properties.j2', log_dir=self.log_dir)
        node.account.create_file(self.log_config_file, log_config_file)

        # The 'libs/*' wildcard normally fails to match as a shell glob (because of
        # the ':' suffix) and is passed through literally, so the JVM expands it as
        # a classpath wildcard. The broker logs to self.log_file via log4j; raw
        # stdout/stderr go to /tmp/log.
        start_cmd = f"nohup java -Dlog4j.configuration=file:{self.log_config_file} " \
                    f"-cp {os.path.join(self.home_dir, 'libs')}/*:{self.config_dir} " \
                    f"kafka.Kafka {self.config_file} >/tmp/log 2>&1 &"

        node.account.ssh(start_cmd)

    def wait_node(self, node, timeout_sec=20):
        """
        Await termination of the Kafka process on the node.
        :param node: Kafka service node.
        :param timeout_sec: Wait timeout.
        :return: True if the node has stopped.
        """
        wait_until(lambda: not self.alive(node), timeout_sec=timeout_sec)

        return not self.alive(node)

    def await_kafka(self, node, timeout):
        """
        Await the Kafka broker start on the node.
        :param node: Kafka service node.
        :param timeout: Wait timeout.
        """
        with monitor_log(node, self.log_file, from_the_beginning=True) as monitor:
            # Matches the broker readiness line, e.g. "[KafkaServer id=1] started".
            monitor.wait_until("KafkaServer.*started",
                               timeout_sec=timeout,
                               err_msg=f"Kafka cluster was not formed on {node.account.hostname}")

    def create_topic(self, name, partitions=1, replication_factor=1, retention_ms=None):
        """
        Create a Kafka topic.
        :param name: Topic name.
        :param partitions: Number of partitions.
        :param replication_factor: Replication factor.
        :param retention_ms: Retention in milliseconds.
        """
        create_topic_cmd = f"{os.path.join(self.home_dir, 'bin', 'kafka-topics.sh')} --create " \
                           f"--topic {name} --bootstrap-server {self.connection_string()} " \
                           f"--partitions {partitions} --replication-factor {replication_factor}"

        if retention_ms is not None:
            create_topic_cmd += f" --config retention.ms={retention_ms}"

        self.nodes[0].account.ssh(create_topic_cmd)
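
    # Example (sketch, with a hypothetical topic name): a topic with 4 partitions,
    # replication factor 2 and one minute of retention:
    #
    #     kafka.create_topic("ignite-events", partitions=4, replication_factor=2,
    #                        retention_ms=60_000)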

    def offsets(self, topics=None):
        """
        Return offset info for all consumer groups and partitions.
        :param topics: List of topics to process. Return info about all topics if None.
        """
        # Describe all consumer groups, drop empty and header lines, and squeeze
        # repeated spaces so each line can be split on single spaces.
        kafka_consumer_groups_cmd = \
            f"{os.path.join(self.home_dir, 'bin', 'kafka-consumer-groups.sh')} --describe " \
            f"--all-groups --bootstrap-server {self.connection_string()} 2>/dev/null | " \
            f"grep . | grep -v GROUP | sed -E 's/ +/ /g'"

        def callback(line):
            # Column layout: GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
            # CONSUMER-ID HOST CLIENT-ID, with '-' standing for an absent value.
            fields = line.strip().split(' ')

            return ConsumerOffsetInfo(
                group=fields[0],
                topic=fields[1],
                part=int(fields[2]),
                current_offset=int(fields[3]),
                log_end_offset=int(fields[4]),
                lag=int(fields[5]),
                consumer_id=fields[6] if fields[6] != '-' else None,
                host=fields[7] if fields[7] != '-' else None,
                client_id=fields[8] if fields[8] != '-' else None
            )

        offsets = self.nodes[0].account.ssh_capture(kafka_consumer_groups_cmd, callback=callback)

        return [o for o in offsets if not topics or (o.topic in topics)]
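
    # Example (sketch, with a hypothetical topic name): total consumer lag for one
    # topic, computed from the records returned above:
    #
    #     total_lag = sum(o.lag for o in kafka.offsets(topics=["ignite-events"]))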

    @property
    def log_file(self):
        """
        :return: Current log file of the node.
        """
        return os.path.join(self.log_dir, self.LOG_FILENAME)

    @staticmethod
    def java_class_name():
        """
        :return: The class name of the Kafka broker.
        """
        return "kafka.Kafka"

    def pids(self, node):
        """
        Get PIDs of the Kafka service node.
        :param node: Kafka service node.
        :return: List of PIDs.
        """
        return node.account.java_pids(self.java_class_name())

    def alive(self, node):
        """
        Check if the Kafka service node is alive.
        :param node: Kafka service node.
        :return: True if the node is alive.
        """
        return len(self.pids(node)) > 0

    def connection_string(self):
        """
        Form a connection string to the Kafka cluster.
        :return: Connection string.
        """
        return ','.join(f"{node.account.hostname}:{self.settings.port}" for node in self.nodes)
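
    # For a two-node cluster this yields e.g. "ducker01:9092,ducker02:9092"
    # (the hostnames are placeholders and depend on the ducktape cluster layout).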

    def stop_node(self, node, force_stop=True, **kwargs):
        """
        Stop the Kafka broker on the given node.
        :param node: Kafka service node.
        :param force_stop: Kill the process instead of a clean shutdown if True.
        """
        idx = self.idx(node)
        self.logger.info("Stopping %s node %d on %s", type(self).__name__, idx, node.account.hostname)

        allow_fail = kwargs.get("allow_fail", False)

        node.account.kill_process("kafka", clean_shutdown=not force_stop, allow_fail=allow_fail)

    def clean_node(self, node, **kwargs):
        """
        Clean the persistent state of the Kafka service node.
        """
        super().clean_node(node, **kwargs)

        self.logger.info("Cleaning Kafka node %d on %s", self.idx(node), node.account.hostname)
        node.account.ssh(f"rm -rf -- {self.persistent_root}", allow_fail=False)


class ConsumerOffsetInfo(NamedTuple):
    """
    Consumer offset record. Fields that kafka-consumer-groups.sh reports as '-'
    are set to None.
    """
    group: str
    topic: str
    part: int
    current_offset: int
    log_end_offset: int
    lag: int
    consumer_id: Optional[str]
    host: Optional[str]
    client_id: Optional[str]
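

# A minimal usage sketch (assumptions: this runs inside an ignitetest ducktape
# test, a ZooKeeper ensemble is already up, and "zk-1:2181" and the topic name
# below are placeholders):
#
#     settings = KafkaSettings(zookeeper_connection_string="zk-1:2181", version="3.9.1")
#     kafka = KafkaService(test_context, num_nodes=2, settings=settings)
#     kafka.start()
#     kafka.create_topic("ignite-events", partitions=4, replication_factor=2)
#     ...
#     kafka.stop()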