I'm trying to create a ConfluentKafkaContainer with an additional listener. The problem is that this listener must use SASL_PLAINTEXT rather than PLAINTEXT. I see that KafkaHelper adds PLAINTEXT for this listener to the security protocol map, even when I explicitly define it as SASL_PLAINTEXT. Is there a workaround for this, or are there plans to support it?
https://github.com/testcontainers/testcontainers-java/blob/6e104da1d6e8a4188451f556e9c7cb4ce8eb3164/modules/kafka/src/main/java/org/testcontainers/kafka/KafkaHelper.java#L81C11-L81C59
// Network is an interface, so it is created through its factory method.
Network network = Network.newNetwork();
var kafka = new ConfluentKafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.6.0"))
    .withNetwork(network)
    .withNetworkAliases(HOST_ALIAS)
    .withExposedPorts(9092, 9093, 9095)
    .withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER")
    // SASL/PLAIN settings for the built-in PLAINTEXT listener
    .withEnv("KAFKA_LISTENER_NAME_PLAINTEXT_SASL_ENABLED_MECHANISMS", "PLAIN")
    .withEnv("KAFKA_LISTENER_NAME_PLAINTEXT_PLAIN_SASL_JAAS_CONFIG", "org.apache.kafka.common.security.plain.PlainLoginModule required " +
        "username=\"admin\" " +
        "password=\"admin-secret\" " +
        "user_admin=\"admin-secret\";")
    // Same JAAS config for the additional listener (TC-0)
    .withEnv("KAFKA_LISTENER_NAME_TC-0_PLAIN_SASL_JAAS_CONFIG", "org.apache.kafka.common.security.plain.PlainLoginModule required " +
        "username=\"admin\" " +
        "password=\"admin-secret\" " +
        "user_admin=\"admin-secret\";")
    .withEnv("KAFKA_SASL_ENABLED_MECHANISMS", "PLAIN")
    .withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "true")
    .withEnv("KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL", "PLAIN")
    .withListener("kafka:9095")
    // Manual mapping of TC-0 to SASL_PLAINTEXT; this is the entry that does not take effect
    .withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:SASL_PLAINTEXT,CONTROLLER:PLAINTEXT,TC-0:SASL_PLAINTEXT");
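To make the conflict concrete, here is a minimal, self-contained sketch of the behavior I think I'm seeing. It is a simplified model and not the actual KafkaHelper source: the class name ListenerMapSketch and the method appendListenerProtocols are hypothetical, and it assumes the helper simply appends a TC-<n>:PLAINTEXT entry for each extra listener to whatever map is already set.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Simplified model of the observed behavior, NOT the real KafkaHelper code.
final class ListenerMapSketch {

    // Assumption: every additional listener i is named "TC-<i>" and is always
    // appended with the PLAINTEXT protocol, regardless of existing entries.
    static String appendListenerProtocols(String userDefinedMap, int extraListeners) {
        Set<String> entries = new LinkedHashSet<>();
        for (String entry : userDefinedMap.split(",")) {
            entries.add(entry.trim());
        }
        for (int i = 0; i < extraListeners; i++) {
            entries.add("TC-" + i + ":PLAINTEXT");
        }
        return String.join(",", entries);
    }

    public static void main(String[] args) {
        String manual =
            "BROKER:PLAINTEXT,PLAINTEXT:SASL_PLAINTEXT,CONTROLLER:PLAINTEXT,TC-0:SASL_PLAINTEXT";
        // The result holds two entries for TC-0 with different protocols.
        System.out.println(appendListenerProtocols(manual, 1));
    }
}
```

Under that assumption the final map ends up with both TC-0:SASL_PLAINTEXT and TC-0:PLAINTEXT, so the manually defined protocol cannot be relied on.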