Skip to content

Commit fef2ab8

Browse files
author
admitrov
committed
Migrate embedded-kafka to use ConfluentKafkaContainer and override containerIsStarting for advertised listeners configuration
Replace KafkaContainer with ConfluentKafkaContainer. Override containerIsStarting() instead of getBootstrapServers() to dynamically configure advertised listeners using Transferable script injection. Add CONTROLLER listener configuration for KRaft mode. Update all method signatures and type casts from KafkaContainer to ConfluentKafkaContainer.
1 parent 42a8e50 commit fef2ab8

1 file changed

Lines changed: 32 additions & 22 deletions

File tree

embedded-kafka/src/main/java/com/playtika/testcontainer/kafka/configuration/KafkaContainerConfiguration.java

Lines changed: 32 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.playtika.testcontainer.kafka.configuration;
22

3+
import com.github.dockerjava.api.command.InspectContainerResponse;
34
import com.playtika.testcontainer.common.utils.ContainerUtils;
45
import com.playtika.testcontainer.kafka.KafkaTopicsConfigurer;
56
import com.playtika.testcontainer.kafka.checks.KafkaStatusCheck;
@@ -23,7 +24,8 @@
2324
import org.springframework.core.env.MapPropertySource;
2425
import org.testcontainers.containers.GenericContainer;
2526
import org.testcontainers.containers.Network;
26-
import org.testcontainers.kafka.KafkaContainer;
27+
import org.testcontainers.images.builder.Transferable;
28+
import org.testcontainers.kafka.ConfluentKafkaContainer;
2729
import org.testcontainers.toxiproxy.ToxiproxyContainer;
2830
import org.testcontainers.utility.MountableFile;
2931

@@ -128,7 +130,7 @@ ToxiproxyClientProxy kafkaContainerSaslProxy(ToxiproxyClient toxiproxyClient,
128130
}
129131

130132
@Bean(name = KAFKA_BEAN_NAME, destroyMethod = "stop")
131-
public KafkaContainer kafka(
133+
public ConfluentKafkaContainer kafka(
132134
KafkaStatusCheck kafkaStatusCheck,
133135
KafkaConfigurationProperties kafkaProperties,
134136
ZookeeperConfigurationProperties zookeeperProperties,
@@ -150,25 +152,30 @@ public KafkaContainer kafka(
150152
// All properties: https://docs.confluent.io/platform/current/installation/configuration/
151153
// Kafka Broker properties: https://docs.confluent.io/platform/current/installation/configuration/broker-configs.html
152154

153-
KafkaContainer kafka = new KafkaContainer(ContainerUtils.getDockerImageName(kafkaProperties)) {
155+
ConfluentKafkaContainer kafka = new ConfluentKafkaContainer(ContainerUtils.getDockerImageName(kafkaProperties)) {
154156
@Override
155-
public String getBootstrapServers() {
156-
List<String> servers = new ArrayList<>();
157-
servers.add("EXTERNAL_PLAINTEXT://" + getHost() + ":" + getMappedPort(kafkaExternalPort));
158-
servers.add("EXTERNAL_SASL_PLAINTEXT://" + getHost() + ":" + getMappedPort(saslPlaintextKafkaExternalPort));
159-
servers.add("INTERNAL_SASL_PLAINTEXT://" + KAFKA_HOST_NAME + ":" + saslPlaintextKafkaInternalPort);
160-
servers.add("INTERNAL_PLAINTEXT://" + KAFKA_HOST_NAME + ":" + kafkaInternalPort);
161-
162-
if (plainTextProxy != null) {
163-
servers.add("TOXIPROXY_INTERNAL_PLAINTEXT://" + getHost() + ":" + plainTextProxy.getProxyPort());
164-
}
165-
if (saslProxy != null) {
166-
servers.add("TOXIPROXY_INTERNAL_SASL_PLAINTEXT://" + getHost() + ":" + saslProxy.getProxyPort());
167-
}
168-
169-
return String.join(",", servers);
157+
protected void containerIsStarting(InspectContainerResponse containerInfo) {
158+
String hostname = containerInfo.getConfig().getHostName();
159+
List<String> advertisedListeners = new ArrayList<>();
160+
advertisedListeners.add("EXTERNAL_PLAINTEXT://" + getHost() + ":" + getMappedPort(kafkaExternalPort));
161+
advertisedListeners.add("EXTERNAL_SASL_PLAINTEXT://" + getHost() + ":" + getMappedPort(saslPlaintextKafkaExternalPort));
162+
advertisedListeners.add("INTERNAL_PLAINTEXT://" + KAFKA_HOST_NAME + ":" + kafkaInternalPort);
163+
advertisedListeners.add("INTERNAL_SASL_PLAINTEXT://" + KAFKA_HOST_NAME + ":" + saslPlaintextKafkaInternalPort);
164+
advertisedListeners.add("TOXIPROXY_INTERNAL_PLAINTEXT://" + (plainTextProxy != null
165+
? getHost() + ":" + plainTextProxy.getProxyPort()
166+
: KAFKA_HOST_NAME + ":" + toxiProxyKafkaInternalPort));
167+
advertisedListeners.add("TOXIPROXY_INTERNAL_SASL_PLAINTEXT://" + (saslProxy != null
168+
? getHost() + ":" + saslProxy.getProxyPort()
169+
: KAFKA_HOST_NAME + ":" + toxiProxySaslPlaintextKafkaInternalPort));
170+
advertisedListeners.add("BROKER://" + hostname + ":9092");
171+
172+
String command = "#!/bin/bash\n"
173+
+ "export KAFKA_ADVERTISED_LISTENERS=" + String.join(",", advertisedListeners) + "\n"
174+
+ "/etc/confluent/docker/run\n";
175+
copyFileToContainer(Transferable.of(command, 0777), "/tmp/testcontainers_start.sh");
170176
}
171177
}
178+
172179
.withCreateContainerCmdModifier(cmd -> cmd.withUser(kafkaProperties.getDockerUser()))
173180
.withCreateContainerCmdModifier(cmd -> cmd.withHostName(KAFKA_HOST_NAME))
174181
//see: https://stackoverflow.com/questions/41868161/kafka-in-kubernetes-cluster-how-to-publish-consume-messages-from-outside-of-kub
@@ -180,6 +187,7 @@ public String getBootstrapServers() {
180187
"INTERNAL_SASL_PLAINTEXT:SASL_PLAINTEXT," +
181188
"INTERNAL_PLAINTEXT:PLAINTEXT," +
182189
"BROKER:PLAINTEXT," +
190+
"CONTROLLER:PLAINTEXT," +
183191
"TOXIPROXY_INTERNAL_PLAINTEXT:PLAINTEXT," +
184192
"TOXIPROXY_INTERNAL_SASL_PLAINTEXT:SASL_PLAINTEXT"
185193
)
@@ -190,8 +198,10 @@ public String getBootstrapServers() {
190198
"INTERNAL_PLAINTEXT://0.0.0.0:" + kafkaInternalPort + "," +
191199
"TOXIPROXY_INTERNAL_PLAINTEXT://0.0.0.0:" + toxiProxyKafkaInternalPort + "," +
192200
"TOXIPROXY_INTERNAL_SASL_PLAINTEXT://0.0.0.0:" + toxiProxySaslPlaintextKafkaInternalPort + "," +
193-
"BROKER://0.0.0.0:9092"
201+
"BROKER://0.0.0.0:9092," +
202+
"CONTROLLER://0.0.0.0:9099"
194203
)
204+
.withEnv("KAFKA_CONTROLLER_QUORUM_VOTERS", "1@localhost:9099")
195205
.withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER")
196206
.withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", "1")
197207
.withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", String.valueOf(kafkaProperties.getOffsetsTopicReplicationFactor()))
@@ -215,12 +225,12 @@ public String getBootstrapServers() {
215225
kafkaFileSystemBind(kafkaProperties, kafka);
216226
zookeperFileSystemBind(zookeeperProperties, kafka);
217227

218-
kafka = (KafkaContainer) configureCommonsAndStart(kafka, kafkaProperties, log);
228+
kafka = (ConfluentKafkaContainer) configureCommonsAndStart(kafka, kafkaProperties, log);
219229
registerKafkaEnvironment(kafka, environment, kafkaProperties);
220230
return kafka;
221231
}
222232

223-
private void kafkaFileSystemBind(KafkaConfigurationProperties kafkaProperties, KafkaContainer kafka) {
233+
private void kafkaFileSystemBind(KafkaConfigurationProperties kafkaProperties, ConfluentKafkaContainer kafka) {
224234
KafkaConfigurationProperties.FileSystemBind fileSystemBind = kafkaProperties.getFileSystemBind();
225235
if (fileSystemBind.isEnabled()) {
226236
String currentTimestamp = LocalDateTime.now().format(DateTimeFormatter.ofPattern("HH-mm-ss-nnnnnnnnn"));
@@ -233,7 +243,7 @@ private void kafkaFileSystemBind(KafkaConfigurationProperties kafkaProperties, K
233243
}
234244
}
235245

236-
private void zookeperFileSystemBind(ZookeeperConfigurationProperties zookeeperProperties, KafkaContainer kafka) {
246+
private void zookeperFileSystemBind(ZookeeperConfigurationProperties zookeeperProperties, ConfluentKafkaContainer kafka) {
237247
ZookeeperConfigurationProperties.FileSystemBind zookeeperFileSystemBind = zookeeperProperties.getFileSystemBind();
238248
if (zookeeperFileSystemBind.isEnabled()) {
239249
String currentTimestamp = LocalDateTime.now().format(DateTimeFormatter.ofPattern("HH-mm-ss-nnnnnnnnn"));

0 commit comments

Comments
 (0)