-
-
Notifications
You must be signed in to change notification settings - Fork 345
feat: Add Apache Kafka (KRaft) builder (#1347) #1348
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Closed
IbrayevRamil
wants to merge
4
commits into
testcontainers:develop
from
IbrayevRamil:feature/add-apache-kafka-kraft-support
Closed
Changes from all commits
Commits
Show all changes
4 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,58 @@ | ||
| using Testcontainers.Kafka; | ||
|
|
||
| namespace Testcontainers.ApacheKafka; | ||
|
|
||
| /// <inheritdoc cref="ContainerBuilder{TBuilderEntity, TContainerEntity, TConfigurationEntity}" /> | ||
| [PublicAPI] | ||
| public sealed class ApacheKafkaBuilder : BaseKafkaBuilder<ApacheKafkaBuilder> | ||
| { | ||
| public override string KafkaImage => "apache/kafka:3.9.0"; | ||
| protected override string ReadyMessage => ".*Transitioning from RECOVERY to RUNNING.*"; | ||
|
|
||
| /// <summary> | ||
| /// Initializes a new instance of the <see cref="ApacheKafkaBuilder" /> class. | ||
| /// </summary> | ||
| public ApacheKafkaBuilder() : this(new KafkaConfiguration()) | ||
| { | ||
| DockerResourceConfiguration = Init().DockerResourceConfiguration; | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Initializes a new instance of the <see cref="ApacheKafkaBuilder" /> class. | ||
| /// </summary> | ||
| /// <param name="resourceConfiguration">The Docker resource configuration.</param> | ||
| private ApacheKafkaBuilder(KafkaConfiguration resourceConfiguration) : base(resourceConfiguration) | ||
| { | ||
| DockerResourceConfiguration = resourceConfiguration; | ||
| } | ||
|
|
||
| /// <inheritdoc /> | ||
| protected override KafkaConfiguration DockerResourceConfiguration { get; } | ||
|
|
||
| /// <inheritdoc /> | ||
| protected override ApacheKafkaBuilder Init() | ||
| { | ||
| return base.Init() | ||
| .WithEnvironment("KAFKA_PROCESS_ROLES", "broker,controller") | ||
| .WithEnvironment("KAFKA_CONTROLLER_LISTENER_NAMES", "CONTROLLER"); | ||
| } | ||
|
|
||
| protected override string GetStartupScript(KafkaContainer container) | ||
| { | ||
| var additionalAdvertisedListeners = string.Join(",", container.AdvertisedListeners ?? Array.Empty<string>()); | ||
| var kafkaListener = $"PLAINTEXT://{container.Hostname}:{container.GetMappedPublicPort(KafkaConfiguration.KafkaPort)}"; | ||
| var brokerListener = $"BROKER://{container.IpAddress}:{KafkaConfiguration.BrokerPort}"; | ||
| return $""" | ||
| #!/bin/bash | ||
| export KAFKA_ADVERTISED_LISTENERS={kafkaListener},{brokerListener},{additionalAdvertisedListeners} | ||
| echo '' > /etc/kafka/docker/ensure | ||
| exec /etc/kafka/docker/run | ||
| """; | ||
| } | ||
|
|
||
| /// <inheritdoc /> | ||
| protected override ApacheKafkaBuilder Merge(KafkaConfiguration oldValue, KafkaConfiguration newValue) | ||
| { | ||
| return new ApacheKafkaBuilder(new KafkaConfiguration(oldValue, newValue)); | ||
| } | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,113 @@ | ||
| namespace Testcontainers.Kafka; | ||
|
|
||
| /// <inheritdoc cref="ContainerBuilder{TBuilderEntity, TContainerEntity, TConfigurationEntity}" /> | ||
| [PublicAPI] | ||
| public abstract class BaseKafkaBuilder<TBuilderEntity> : ContainerBuilder<TBuilderEntity, KafkaContainer, KafkaConfiguration> | ||
| where TBuilderEntity : BaseKafkaBuilder<TBuilderEntity> | ||
| { | ||
| public abstract string KafkaImage { get; } | ||
| public const string StartupScriptPath = "/testcontainers.sh"; | ||
|
|
||
| protected abstract string ReadyMessage { get; } | ||
|
|
||
| protected abstract string GetStartupScript(KafkaContainer container); | ||
|
|
||
| private const string ProtocolPrefix = "TC"; | ||
|
|
||
| /// <summary> | ||
| /// Initializes a new instance of the <see cref="BaseKafkaBuilder{TBuilderEntity}" /> class. | ||
| /// </summary> | ||
| /// <param name="resourceConfiguration">The Docker resource configuration.</param> | ||
| protected BaseKafkaBuilder(KafkaConfiguration resourceConfiguration) : base(resourceConfiguration) | ||
| { | ||
| } | ||
|
|
||
| /// <inheritdoc /> | ||
| public override KafkaContainer Build() | ||
| { | ||
| Validate(); | ||
| return new KafkaContainer(DockerResourceConfiguration); | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Adds a listener to the Kafka configuration in the format <c>host:port</c>. | ||
| /// </summary> | ||
| /// <remarks> | ||
| /// The host will be included as a network alias, allowing additional connections | ||
| /// to the Kafka broker within the same container network. | ||
| /// | ||
| /// This method is useful for registering custom listeners beyond the default ones, | ||
| /// enabling specific connection points for Kafka brokers. | ||
| /// | ||
| /// Default listeners include: | ||
| /// - <c>PLAINTEXT://:9092</c> | ||
| /// - <c>BROKER://:9093</c> | ||
| /// - <c>CONTROLLER://:9094</c> | ||
| /// </remarks> | ||
| /// <param name="kafka">The MsSql database.</param> | ||
| /// <returns>A configured instance of <see cref="BaseKafkaBuilder{TBuilderEntity}" />.</returns> | ||
| public TBuilderEntity WithListener(string kafka) | ||
| { | ||
| var index = DockerResourceConfiguration.Listeners?.Count() ?? 0; | ||
| var protocol = $"{ProtocolPrefix}-{index}"; | ||
| var listener = $"{protocol}://{kafka}"; | ||
| var listenerSecurityProtocolMap = $"{protocol}:PLAINTEXT"; | ||
|
|
||
| var listeners = new[] { listener }; | ||
| var listenersSecurityProtocolMap = new[] { listenerSecurityProtocolMap }; | ||
|
|
||
| var host = kafka.Split(':')[0]; | ||
|
|
||
| var updatedListeners = DockerResourceConfiguration.Environments["KAFKA_LISTENERS"] | ||
| .Split(',') | ||
| .Concat(listeners); | ||
|
|
||
| var updatedListenersSecurityProtocolMap = DockerResourceConfiguration.Environments["KAFKA_LISTENER_SECURITY_PROTOCOL_MAP"] | ||
| .Split(',') | ||
| .Concat(listenersSecurityProtocolMap); | ||
|
|
||
| return Merge(DockerResourceConfiguration, new KafkaConfiguration(listeners, listeners)) | ||
| .WithEnvironment("KAFKA_LISTENERS", string.Join(",", updatedListeners)) | ||
| .WithEnvironment("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", string.Join(",", updatedListenersSecurityProtocolMap)) | ||
| .WithNetworkAliases(host); | ||
| } | ||
|
|
||
| /// <inheritdoc /> | ||
| protected override TBuilderEntity Init() | ||
| { | ||
| return base.Init() | ||
| .WithImage(KafkaImage) | ||
| .WithPortBinding(KafkaConfiguration.KafkaPort, true) | ||
| .WithPortBinding(KafkaConfiguration.BrokerPort, true) | ||
| .WithEnvironment("KAFKA_LISTENERS", | ||
| $"PLAINTEXT://:{KafkaConfiguration.KafkaPort},BROKER://:{KafkaConfiguration.BrokerPort},CONTROLLER://:{KafkaConfiguration.ControllerPort}") | ||
| .WithEnvironment("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT") | ||
| .WithEnvironment("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER") | ||
| .WithEnvironment("KAFKA_BROKER_ID", "1") | ||
| .WithEnvironment("KAFKA_NODE_ID", "1") | ||
| .WithEnvironment("KAFKA_CONTROLLER_QUORUM_VOTERS", "1@localhost:" + KafkaConfiguration.ControllerPort) | ||
| .WithEnvironment("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1") | ||
| .WithEnvironment("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", "1") | ||
| .WithEnvironment("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1") | ||
| .WithEnvironment("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1") | ||
| .WithEnvironment("KAFKA_LOG_FLUSH_INTERVAL_MESSAGES", long.MaxValue.ToString()) | ||
| .WithEnvironment("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "0") | ||
| .WithEntrypoint("/bin/sh", "-c") | ||
| .WithCommand("while [ ! -f " + StartupScriptPath + " ]; do sleep 0.1; done; " + StartupScriptPath) | ||
| .WithWaitStrategy(Wait.ForUnixContainer().UntilMessageIsLogged(ReadyMessage)) | ||
| .WithStartupCallback((container, ct) => | ||
| container.CopyAsync(Encoding.Default.GetBytes(GetStartupScript(container)), StartupScriptPath, Unix.FileMode755, ct)); | ||
| } | ||
|
|
||
| /// <inheritdoc /> | ||
| protected override TBuilderEntity Clone(IResourceConfiguration<CreateContainerParameters> resourceConfiguration) | ||
| { | ||
| return Merge(DockerResourceConfiguration, new KafkaConfiguration(resourceConfiguration)); | ||
| } | ||
|
|
||
| /// <inheritdoc /> | ||
| protected override TBuilderEntity Clone(IContainerConfiguration resourceConfiguration) | ||
| { | ||
| return Merge(DockerResourceConfiguration, new KafkaConfiguration(resourceConfiguration)); | ||
| } | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.