Skip to content

Commit 78d1b8f

Browse files
authored
DRILL-8534: Update Kafka to Version 3.9 (#3020)
1 parent 3362348 commit 78d1b8f

3 files changed

Lines changed: 34 additions & 23 deletions

File tree

contrib/storage-kafka/pom.xml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,8 @@
3131
<name>Drill : Contrib : Storage : Kafka</name>
3232

3333
<properties>
34-
<kafka.version>2.8.2</kafka.version>
34+
<kafka.version>3.9.0</kafka.version>
35+
<kafka_scala.version>3.9.0</kafka_scala.version>
3536
<kafka.TestSuite>**/TestKafkaSuite.class</kafka.TestSuite>
3637
</properties>
3738

@@ -80,7 +81,7 @@
8081
<dependency>
8182
<groupId>org.apache.kafka</groupId>
8283
<artifactId>kafka_2.13</artifactId>
83-
<version>${kafka.version}</version>
84+
<version>${kafka_scala.version}</version>
8485
<scope>test</scope>
8586
<exclusions>
8687
<exclusion>

contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuite.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@
4242
import org.junit.runners.Suite.SuiteClasses;
4343
import org.slf4j.Logger;
4444
import org.slf4j.LoggerFactory;
45-
import scala.Option;
4645

4746
import java.util.Collections;
4847
import java.util.HashMap;
@@ -83,8 +82,8 @@ public static void initKafka() throws Exception {
8382
embeddedKafkaCluster = new EmbeddedKafkaCluster();
8483
zkClient = KafkaZkClient.apply(embeddedKafkaCluster.getZkServer().getConnectionString(),
8584
false, SESSION_TIMEOUT, CONN_TIMEOUT, 0, Time.SYSTEM,
86-
"kafka.server", "SessionExpireListener",
87-
Option.<String>empty(), Option.<ZKClientConfig>empty());
85+
"kafka.server", new ZKClientConfig(),
86+
"kafka.server", "SessionExpireListener", false, false);
8887
createTopicHelper(TestQueryConstants.JSON_TOPIC, 1);
8988
createTopicHelper(TestQueryConstants.AVRO_TOPIC, 1);
9089
KafkaMessageGenerator generator = new KafkaMessageGenerator(embeddedKafkaCluster.getKafkaBrokerList(), StringSerializer.class);

contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/cluster/EmbeddedKafkaCluster.java

Lines changed: 29 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -70,29 +70,28 @@ public EmbeddedKafkaCluster(Properties baseProps, int numberOfBrokers) throws IO
7070
}
7171

7272
this.props.put("metadata.broker.list", sb.toString());
73-
this.props.put(KafkaConfig.ZkConnectProp(), this.zkHelper.getConnectionString());
73+
this.props.put("zookeeper.connect", this.zkHelper.getConnectionString());
7474
logger.info("Initialized Kafka Server");
7575
this.closer = new KafkaAsyncCloser();
7676
}
7777

7878
private void addBroker(Properties props, int brokerID, int ephemeralBrokerPort) {
7979
Properties properties = new Properties();
8080
properties.putAll(props);
81-
properties.put(KafkaConfig.LeaderImbalanceCheckIntervalSecondsProp(), String.valueOf(1));
82-
properties.put(KafkaConfig.OffsetsTopicPartitionsProp(), String.valueOf(1));
83-
properties.put(KafkaConfig.OffsetsTopicReplicationFactorProp(), String.valueOf(1));
84-
properties.put(KafkaConfig.DefaultReplicationFactorProp(), String.valueOf(1));
85-
properties.put(KafkaConfig.GroupMinSessionTimeoutMsProp(), String.valueOf(100));
86-
properties.put(KafkaConfig.AutoCreateTopicsEnableProp(), Boolean.FALSE);
87-
properties.put(KafkaConfig.ZkConnectProp(), zkHelper.getConnectionString());
88-
properties.put(KafkaConfig.BrokerIdProp(), String.valueOf(brokerID + 1));
89-
properties.put(KafkaConfig.HostNameProp(), LOCAL_HOST);
90-
properties.put(KafkaConfig.AdvertisedHostNameProp(), LOCAL_HOST);
91-
properties.put(KafkaConfig.PortProp(), String.valueOf(ephemeralBrokerPort));
92-
properties.put(KafkaConfig.AdvertisedPortProp(), String.valueOf(ephemeralBrokerPort));
93-
properties.put(KafkaConfig.DeleteTopicEnableProp(), Boolean.TRUE);
94-
properties.put(KafkaConfig.LogDirsProp(), getTemporaryDir().getAbsolutePath());
95-
properties.put(KafkaConfig.LogFlushIntervalMessagesProp(), String.valueOf(1));
81+
properties.put("leader.imbalance.check.interval.seconds", String.valueOf(1));
82+
properties.put("offsets.topic.num.partitions", String.valueOf(1));
83+
properties.put("offsets.topic.replication.factor", String.valueOf(1));
84+
properties.put("default.replication.factor", String.valueOf(1));
85+
properties.put("group.min.session.timeout.ms", String.valueOf(100));
86+
properties.put("auto.create.topics.enable", Boolean.FALSE);
87+
properties.put("zookeeper.connect", zkHelper.getConnectionString());
88+
properties.put("broker.id", String.valueOf(brokerID + 1));
89+
properties.put("listeners", "PLAINTEXT://" + LOCAL_HOST + ":" + ephemeralBrokerPort);
90+
properties.put("advertised.listeners", "PLAINTEXT://" + LOCAL_HOST + ":" + ephemeralBrokerPort);
91+
properties.put("port", String.valueOf(ephemeralBrokerPort));
92+
properties.put("delete.topic.enable", Boolean.TRUE);
93+
properties.put("log.dirs", getTemporaryDir().getAbsolutePath());
94+
properties.put("log.flush.interval.messages", String.valueOf(1));
9695
brokers.add(getBroker(properties));
9796
}
9897

@@ -119,7 +118,7 @@ public void shutDownCluster() {
119118

120119
public void shutDownBroker(int brokerId) {
121120
brokers.stream()
122-
.filter(broker -> Integer.parseInt(broker.config().getString(KafkaConfig.BrokerIdProp())) == brokerId)
121+
.filter(broker -> Integer.parseInt(broker.config().getString("broker.id")) == brokerId)
123122
.findAny()
124123
.ifPresent(KafkaServer::shutdown);
125124
}
@@ -145,7 +144,19 @@ public ZookeeperHelper getZkServer() {
145144
public String getKafkaBrokerList() {
146145
return brokers.stream()
147146
.map(KafkaServer::config)
148-
.map(serverConfig -> serverConfig.hostName() + ":" + serverConfig.port())
147+
.map(serverConfig -> {
148+
// Try modern listeners first, fall back to legacy host.name/port
149+
try {
150+
String listeners = serverConfig.getString("listeners");
151+
// Extract host:port from listeners (format: PLAINTEXT://host:port)
152+
return listeners.replaceAll("^[A-Z]+://", "");
153+
} catch (Exception e) {
154+
// Fall back to legacy approach using advertised properties or default host/port
155+
String host = LOCAL_HOST;
156+
int port = serverConfig.getInt("port");
157+
return host + ":" + port;
158+
}
159+
})
149160
.collect(Collectors.joining(","));
150161
}
151162

0 commit comments

Comments
 (0)