diff --git a/lib/container/kafka_container.ex b/lib/container/kafka_container.ex index 9f4d757..ecd870a 100644 --- a/lib/container/kafka_container.ex +++ b/lib/container/kafka_container.ex @@ -1,159 +1,170 @@ defmodule Testcontainers.KafkaContainer do @moduledoc """ Provides functionality for creating and managing Kafka container configurations. + + This implementation uses the official `apache/kafka` Docker image which runs in KRaft mode + by default (no Zookeeper required). This makes Kafka deployment significantly simpler. + + ## Example + + config = KafkaContainer.new() + {:ok, container} = Testcontainers.start_container(config) + + # Get the bootstrap server address + bootstrap_servers = KafkaContainer.bootstrap_servers(container) + + ## With automatic topic creation + + config = + KafkaContainer.new() + |> KafkaContainer.with_topics(["my-topic", "other-topic"]) + + {:ok, container} = Testcontainers.start_container(config) + + ## Note on Port Binding + + This implementation uses a randomly selected fixed host port (between 29000-29999) for + the Kafka listener. This is necessary because the apache/kafka image requires knowing + the advertised listener address at startup time, before the container's dynamic port + mapping is known. + + If you need to use a specific port, you can set it with `with_kafka_port/2`. """ + alias Testcontainers.Container alias Testcontainers.Docker alias Testcontainers.KafkaContainer - alias Testcontainers.CommandWaitStrategy - - @default_image "confluentinc/cp-kafka" - @default_image_with_tag "#{@default_image}:7.4.3" - @default_kafka_port 9092 - @default_broker_port 29092 - @default_broker_id 1 - @default_zookeeper_port 2181 + alias Testcontainers.LogWaitStrategy + + @default_image "apache/kafka" + @default_tag "3.9.0" + @default_image_with_tag "#{@default_image}:#{@default_tag}" + @default_internal_kafka_port 9092 + @default_controller_port 9093 + @default_node_id 1 @default_wait_timeout 60_000 - @default_consensus_strategy :zookeeper_embedded - @default_topic_partitions 1 @default_cluster_id "4L6g3nShT-eMCtK--X86sw" - @start_file_path "tc-start.sh" - @enforce_keys [ :image, :kafka_port, - :broker_port, - :broker_id, - :zookeeper_port, - :zookeeper_host, + :internal_kafka_port, + :controller_port, + :node_id, :cluster_id, - :wait_timeout, - :consensus_strategy, - :default_topic_partitions, - :start_file_path + :wait_timeout ] defstruct [ :image, :kafka_port, - :broker_port, - :broker_id, + :internal_kafka_port, + :controller_port, + :node_id, :cluster_id, - :zookeeper_port, - :zookeeper_host, :wait_timeout, - :consensus_strategy, - :default_topic_partitions, - :start_file_path, + topics: [], reuse: false ] @doc """ Creates a new `KafkaContainer` struct with default configurations. + + A random port between 29000-29999 is selected for the Kafka listener. """ def new do + # Select a random port in a high range to minimize conflicts + kafka_port = Enum.random(29000..29999) + %__MODULE__{ image: @default_image_with_tag, - kafka_port: @default_kafka_port, - broker_port: @default_broker_port, - broker_id: @default_broker_id, - zookeeper_port: @default_zookeeper_port, + kafka_port: kafka_port, + internal_kafka_port: @default_internal_kafka_port, + controller_port: @default_controller_port, + node_id: @default_node_id, cluster_id: @default_cluster_id, wait_timeout: @default_wait_timeout, - consensus_strategy: @default_consensus_strategy, - zookeeper_host: nil, - default_topic_partitions: @default_topic_partitions, - start_file_path: @start_file_path + topics: [] } end @doc """ Overrides the default image used for the Kafka container. - Right now we support only confluentinc images. """ def with_image(%__MODULE__{} = config, image) when is_binary(image) do %{config | image: image} end @doc """ - Overrides the default kafka port used for the Kafka container. + Overrides the host port used for the Kafka container. + + This port will be used on the host machine and also as the advertised listener port. """ def with_kafka_port(%__MODULE__{} = config, kafka_port) when is_integer(kafka_port) do %{config | kafka_port: kafka_port} end @doc """ - Overrides the default kafka port used for the Kafka container. + Overrides the default controller port used for the Kafka container. """ - def with_broker_port(%__MODULE__{} = config, broker_port) when is_integer(broker_port) do - %{config | broker_port: broker_port} + def with_controller_port(%__MODULE__{} = config, controller_port) + when is_integer(controller_port) do + %{config | controller_port: controller_port} end @doc """ - Overrides the default broker id used for the Kafka container. + Overrides the default node id used for the Kafka container. """ - def with_broker_id(%__MODULE__{} = config, broker_id) when is_integer(broker_id) do - %{config | broker_id: broker_id} + def with_node_id(%__MODULE__{} = config, node_id) when is_integer(node_id) do + %{config | node_id: node_id} end @doc """ - Overrides the default consensus strategy used for the Kafka container. + Overrides the default cluster id used for the Kafka container. """ - def with_consensus_strategy(%__MODULE__{} = config, consensus_strategy) - when consensus_strategy in [:zookeeper_embedded, :zookeeper_external, :kraft] do - %{config | consensus_strategy: consensus_strategy} + def with_cluster_id(%__MODULE__{} = config, cluster_id) when is_binary(cluster_id) do + %{config | cluster_id: cluster_id} end @doc """ - Overrides the default zookeeper port used for the Kafka container. + Overrides the default wait timeout used for the Kafka container. """ - def with_zookeeper_port(%__MODULE__{consensus_strategy: strategy} = config, zookeeper_port) - when is_integer(zookeeper_port) and strategy in [:zookeeper_embedded, :zookeeper_external] do - %{config | zookeeper_port: zookeeper_port} + def with_wait_timeout(%__MODULE__{} = config, wait_timeout) when is_integer(wait_timeout) do + %{config | wait_timeout: wait_timeout} end @doc """ - Overrides the default zookeeper host used for the Kafka container. - Available only when consensus_strategy is external - """ - def with_zookeeper_host( - %__MODULE__{consensus_strategy: :zookeeper_external} = config, - zookeeper_host - ) - when is_binary(zookeeper_host) do - %{config | zookeeper_host: zookeeper_host} - end + Sets the topics to be created automatically when the container starts. - @doc """ - Overrides the default zookeeper host used for the Kafka container. - Available only when consensus_strategy is kraft + ## Example + + config = + KafkaContainer.new() + |> KafkaContainer.with_topics(["my-topic", "other-topic"]) """ - def with_cluster_id(%__MODULE__{consensus_strategy: :kraft} = config, cluster_id) - when is_binary(cluster_id) do - %{config | cluster_id: cluster_id} + def with_topics(%__MODULE__{} = config, topics) when is_list(topics) do + %{config | topics: topics} end @doc """ - Overrides the default wait timeout used for the Kafka container. + Set the reuse flag to reuse the container if it is already running. """ - def with_wait_timeout(%__MODULE__{} = config, wait_timeout) when is_integer(wait_timeout) do - %{config | wait_timeout: wait_timeout} + def with_reuse(%__MODULE__{} = config, reuse) when is_boolean(reuse) do + %__MODULE__{config | reuse: reuse} end @doc """ - Overrides the default topic + Returns the bootstrap servers string for connecting to the Kafka container. """ - def with_topic_partitions(%__MODULE__{} = config, topic_partitions) - when is_integer(topic_partitions) do - %{config | default_topic_partitions: topic_partitions} + def bootstrap_servers(%Container{} = container) do + port = Container.mapped_port(container, @default_internal_kafka_port) + "#{Testcontainers.get_host()}:#{port}" end @doc """ - Set the reuse flag to reuse the container if it is already running. + Returns the port on the host machine where the Kafka container is listening. """ - def with_reuse(%__MODULE__{} = config, reuse) when is_boolean(reuse) do - %__MODULE__{config | reuse: reuse} - end + def port(%Container{} = container), + do: Container.mapped_port(container, @default_internal_kafka_port) defimpl Testcontainers.ContainerBuilder do import Container @@ -161,15 +172,15 @@ defmodule Testcontainers.KafkaContainer do @impl true @spec build(%KafkaContainer{}) :: %Container{} def build(%KafkaContainer{} = config) do + host = Testcontainers.get_host() + new(config.image) - |> with_exposed_port(config.kafka_port) - |> with_listener_config(config) - |> with_topic_config(config) - |> with_startup_script(config) + |> with_fixed_port(config.internal_kafka_port, config.kafka_port) + |> with_kraft_config(config, host) |> with_reuse(config.reuse) |> with_waiting_strategy( - CommandWaitStrategy.new( - ["kafka-broker-api-versions", "--bootstrap-server", "localhost:#{config.kafka_port}"], + LogWaitStrategy.new( + ~r/Kafka Server started/, config.wait_timeout, 1000 ) @@ -177,137 +188,91 @@ defmodule Testcontainers.KafkaContainer do end @doc """ - Do stuff after container has started. - We now know both the host and the port of the container and we can - assign them to the config. + After the container starts, create any specified topics. """ @impl true - def after_start(config = %{start_file_path: start_file_path}, container, conn) do - script = build_startup_script(container, config) - Docker.Api.put_file(container.container_id, conn, "/", start_file_path, script) + def after_start(config, container, conn) do + # Create topics if specified + Enum.each(config.topics, fn topic -> + create_topic(container.container_id, conn, topic, config.internal_kafka_port) + end) + + :ok end - # ------------------Listeners------------------ - defp with_listener_config(container, config) do + # KRaft mode environment configuration + defp with_kraft_config(container, config, host) do container + |> with_environment(:KAFKA_NODE_ID, "#{config.node_id}") + |> with_environment(:KAFKA_PROCESS_ROLES, "broker,controller") + |> with_environment(:KAFKA_CONTROLLER_LISTENER_NAMES, "CONTROLLER") + |> with_environment(:KAFKA_INTER_BROKER_LISTENER_NAME, "PLAINTEXT") |> with_environment( :KAFKA_LISTENERS, - "BROKER://0.0.0.0:#{config.broker_port},OUTSIDE://0.0.0.0:#{config.kafka_port}" + "PLAINTEXT://:#{config.internal_kafka_port},CONTROLLER://:#{config.controller_port}" ) |> with_environment( :KAFKA_LISTENER_SECURITY_PROTOCOL_MAP, - "BROKER:PLAINTEXT,OUTSIDE:PLAINTEXT" + "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT" ) - |> with_environment(:KAFKA_INTER_BROKER_LISTENER_NAME, "BROKER") - end - - # ------------------Topics------------------ - defp with_topic_config(container, config) do - container - |> with_environment(:KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR, "1") |> with_environment( - :KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS, - "#{config.default_topic_partitions}" + :KAFKA_CONTROLLER_QUORUM_VOTERS, + "#{config.node_id}@localhost:#{config.controller_port}" ) + |> with_environment( + :KAFKA_ADVERTISED_LISTENERS, + "PLAINTEXT://#{host}:#{config.kafka_port}" + ) + |> with_environment(:KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR, "1") |> with_environment(:KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR, "1") |> with_environment(:KAFKA_TRANSACTION_STATE_LOG_MIN_ISR, "1") - |> with_environment(:KAFKA_AUTO_CREATE_TOPICS_ENABLE, "false") - end - - # ------------------Startup------------------ - defp with_startup_script(container, %{start_file_path: start_file_path}) do - with_cmd(container, [ - "sh", - "-c", - "while [ ! -f /#{start_file_path} ]; do echo 'ok' && sleep 0.1; done; sh /#{start_file_path};" - ]) - end - - # ------------------Startup Script------------------ - defp build_startup_script(container, config) do - container - |> init_script(config) - |> add_consensus_strategy(container, config) - |> add_run_command() - |> parse_script() + |> with_environment(:KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS, "0") end - defp add_consensus_strategy(script, container, config) do - case config.consensus_strategy do - :zookeeper_embedded -> embedded_zookeeper_script(script, config) - :zookeeper_external -> external_zookeeper_script(script, config) - :kraft -> kraft_script(script, container, config) - value -> raise "Consensus strategy #{inspect(value)} not implemented" - end + defp create_topic(container_id, conn, topic, kafka_port) do + cmd = [ + "/opt/kafka/bin/kafka-topics.sh", + "--bootstrap-server", + "localhost:#{kafka_port}", + "--create", + "--topic", + topic, + "--partitions", + "1", + "--replication-factor", + "1", + "--if-not-exists" + ] + + result = + case Docker.Api.start_exec(container_id, cmd, conn) do + {:ok, exec_id} -> + wait_for_exec(exec_id, conn) + + {:error, reason} -> + {:error, reason} + end + + # Wait for leader election to complete + Process.sleep(2000) + result end - defp embedded_zookeeper_script(script, config) do - """ - #{script} - export KAFKA_ZOOKEEPER_CONNECT='localhost:#{config.zookeeper_port}' - echo 'clientPort=#{config.zookeeper_port}' > zookeeper.properties - echo 'dataDir=/var/lib/zookeeper/data' >> zookeeper.properties - echo 'dataLogDir=/var/lib/zookeeper/log' >> zookeeper.properties - zookeeper-server-start zookeeper.properties & - """ - end + defp wait_for_exec(exec_id, conn) do + case Docker.Api.inspect_exec(exec_id, conn) do + {:ok, %{running: true}} -> + Process.sleep(100) + wait_for_exec(exec_id, conn) - defp external_zookeeper_script(script, config) do - """ - #{script} - export KAFKA_ZOOKEEPER_CONNECT='#{config.zookeeper_host}:#{config.zookeeper_port}' - """ - end + {:ok, %{running: false, exit_code: 0}} -> + :ok - # Currently we support only single node as QUORUM_VOTERS requires to know hostnames - # of all voters - defp kraft_script(script, container, config) do - listeners = Map.fetch!(container.environment, :KAFKA_LISTENERS) - protocol_map = Map.fetch!(container.environment, :KAFKA_LISTENER_SECURITY_PROTOCOL_MAP) - - """ - #{script} - export CLUSTER_ID=#{config.cluster_id} - export KAFKA_NODE_ID=#{config.broker_id} - export KAFKA_PROCESS_ROLES=broker,controller - export KAFKA_LISTENERS=#{listeners},CONTROLLER://0.0.0.0:9094 - export KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=#{protocol_map},CONTROLLER:PLAINTEXT - export KAFKA_INTER_BROKER_LISTENER_NAME=BROKER - export KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER - export KAFKA_CONTROLLER_QUORUM_VOTERS=1@$(hostname -i):9094 - sed -i '/KAFKA_ZOOKEEPER_CONNECT/d' /etc/confluent/docker/configure - echo 'kafka-storage format --ignore-formatted -t "#{config.cluster_id}" -c /etc/kafka/kafka.properties' >> /etc/confluent/docker/configure - """ - end + {:ok, %{running: false, exit_code: code}} -> + {:error, {:exec_failed, code}} - # ----------------------- Default ----------------------- - defp init_script(container, config) do - internal = "BROKER://$(hostname -i):#{config.broker_port}" - - external = - "OUTSIDE://#{Testcontainers.get_host()}:#{Container.mapped_port(container, config.kafka_port)}" - - """ - export KAFKA_BROKER_ID=#{config.broker_id} - export KAFKA_ADVERTISED_LISTENERS=#{internal},#{external} - echo '' > /etc/confluent/docker/ensure - """ - end - - defp add_run_command(script) do - """ - #{script} - /etc/confluent/docker/run - echo finished - """ - end - - defp parse_script(script) do - script - |> String.split("\n") - |> Enum.map(&String.trim/1) - |> Enum.reject(&(&1 == "")) - |> Enum.join("\n") + {:error, reason} -> + {:error, reason} + end end end end diff --git a/test/container/kafka_container_test.exs b/test/container/kafka_container_test.exs index 7465474..88634b6 100644 --- a/test/container/kafka_container_test.exs +++ b/test/container/kafka_container_test.exs @@ -1,10 +1,9 @@ defmodule Testcontainers.Container.KafkaContainerTest do - use ExUnit.Case, async: true + use ExUnit.Case, async: false import Testcontainers.ExUnit alias Testcontainers.Container alias Testcontainers.KafkaContainer - alias Test.ZookeeperContainer @moduletag timeout: 200_000 @@ -12,24 +11,23 @@ defmodule Testcontainers.Container.KafkaContainerTest do test "creates a new KafkaContainer struct with default configurations" do config = KafkaContainer.new() - assert config.image == "confluentinc/cp-kafka:7.4.3" - assert config.kafka_port == 9092 - assert config.broker_port == 29092 - assert config.zookeeper_port == 2181 + assert config.image == "apache/kafka:3.9.0" + assert config.kafka_port >= 29000 and config.kafka_port <= 29999 + assert config.internal_kafka_port == 9092 + assert config.controller_port == 9093 + assert config.node_id == 1 assert config.wait_timeout == 60_000 - assert config.consensus_strategy == :zookeeper_embedded assert config.cluster_id == "4L6g3nShT-eMCtK--X86sw" - assert config.zookeeper_host == nil - assert config.default_topic_partitions == 1 + assert config.topics == [] end end describe "with_image/2" do test "overrides the default image used for the Kafka container" do config = KafkaContainer.new() - new_config = KafkaContainer.with_image(config, "confluentinc/cp-kafka:6.2.0") + new_config = KafkaContainer.with_image(config, "apache/kafka:3.8.0") - assert new_config.image == "confluentinc/cp-kafka:6.2.0" + assert new_config.image == "apache/kafka:3.8.0" end test "raises if the image is not a binary" do @@ -52,122 +50,50 @@ defmodule Testcontainers.Container.KafkaContainerTest do end end - describe "with_broker_port/2" do - test "overrides the default broker port used for the Kafka container" do + describe "with_controller_port/2" do + test "overrides the default controller port used for the Kafka container" do config = KafkaContainer.new() - new_config = KafkaContainer.with_broker_port(config, 9095) + new_config = KafkaContainer.with_controller_port(config, 9095) - assert new_config.broker_port == 9095 + assert new_config.controller_port == 9095 end - test "raises if the broker port is not an integer" do + test "raises if the controller port is not an integer" do config = KafkaContainer.new() - assert_raise FunctionClauseError, fn -> KafkaContainer.with_broker_port(config, "9095") end - end - end - - describe "with_broker_id/2" do - test "overrides the default broker id used for the Kafka container" do - config = KafkaContainer.new() - new_config = KafkaContainer.with_broker_id(config, 2) - - assert new_config.broker_id == 2 - end - - test "raises if the broker id is not integer" do - config = KafkaContainer.new() - assert_raise FunctionClauseError, fn -> KafkaContainer.with_broker_id(config, "2") end - end - end - - describe "with_zookeeper_port/2" do - test "overrides the default zookeeper port used for the Kafka container" do - config = KafkaContainer.new() - new_config = KafkaContainer.with_zookeeper_port(config, 2182) - - assert new_config.zookeeper_port == 2182 - end - - test "raises if the zookeeper port is not an integer" do - config = KafkaContainer.new() |> KafkaContainer.with_consensus_strategy(:zookeeper_embedded) - - assert_raise FunctionClauseError, fn -> - KafkaContainer.with_zookeeper_port(config, "2182") - end - end - - test "raises if consensus strategy is not zookeeper" do - config = KafkaContainer.new() |> KafkaContainer.with_consensus_strategy(:kraft) assert_raise FunctionClauseError, fn -> - KafkaContainer.with_zookeeper_port(config, 2182) + KafkaContainer.with_controller_port(config, "9095") end end end - describe "with_zookeeper_host/2" do - test "overrides the default zookeeper host used for the Kafka container" do - config = KafkaContainer.new() |> KafkaContainer.with_consensus_strategy(:zookeeper_external) - new_config = KafkaContainer.with_zookeeper_host(config, "localhost") - - assert new_config.zookeeper_host == "localhost" - end - - test "raises if the zookeeper host is not an binary" do - config = KafkaContainer.new() |> KafkaContainer.with_consensus_strategy(:zookeeper_external) + describe "with_node_id/2" do + test "overrides the default node id used for the Kafka container" do + config = KafkaContainer.new() + new_config = KafkaContainer.with_node_id(config, 2) - assert_raise FunctionClauseError, fn -> - KafkaContainer.with_zookeeper_host(config, 123) - end + assert new_config.node_id == 2 end - test "raises if the zookeeper strategy is not an external" do - config = KafkaContainer.new() |> KafkaContainer.with_consensus_strategy(:zookeeper_embedded) - - assert_raise FunctionClauseError, fn -> - KafkaContainer.with_zookeeper_host(config, "localhost") - end + test "raises if the node id is not integer" do + config = KafkaContainer.new() + assert_raise FunctionClauseError, fn -> KafkaContainer.with_node_id(config, "2") end end end describe "with_cluster_id/2" do test "overrides the default cluster_id used for the Kafka container" do - config = KafkaContainer.new() |> KafkaContainer.with_consensus_strategy(:kraft) + config = KafkaContainer.new() new_config = KafkaContainer.with_cluster_id(config, "1234") assert new_config.cluster_id == "1234" end - test "raises if the cluster_id is not an binary" do - config = KafkaContainer.new() |> KafkaContainer.with_consensus_strategy(:kraft) - - assert_raise FunctionClauseError, fn -> - KafkaContainer.with_cluster_id(config, 123) - end - end - - test "raises if the consensus strategy is not an kraft" do - config = KafkaContainer.new() |> KafkaContainer.with_consensus_strategy(:zookeeper_embedded) - - assert_raise FunctionClauseError, fn -> - KafkaContainer.with_cluster_id(config, "localhost") - end - end - end - - describe "with_consensus_strategy/2" do - test "overrides the consensus strategy host used for the Kafka container" do - config = KafkaContainer.new() - new_config = KafkaContainer.with_consensus_strategy(config, :zookeeper_external) - - assert new_config.consensus_strategy == :zookeeper_external - end - - test "raises if the zookeeper strategy is invalid" do + test "raises if the cluster_id is not a binary" do config = KafkaContainer.new() assert_raise FunctionClauseError, fn -> - KafkaContainer.with_consensus_strategy(config, :host) + KafkaContainer.with_cluster_id(config, 123) end end end @@ -189,24 +115,24 @@ defmodule Testcontainers.Container.KafkaContainerTest do end end - describe "with_topic_partitions/2" do - test "overrides the default topic partitions used for the Kafka container" do + describe "with_topics/2" do + test "sets the topics to be created automatically" do config = KafkaContainer.new() - new_config = KafkaContainer.with_topic_partitions(config, 2) + new_config = KafkaContainer.with_topics(config, ["topic1", "topic2"]) - assert new_config.default_topic_partitions == 2 + assert new_config.topics == ["topic1", "topic2"] end - test "raises if the topic partitions is not an integer" do + test "raises if topics is not a list" do config = KafkaContainer.new() assert_raise FunctionClauseError, fn -> - KafkaContainer.with_topic_partitions(config, "2") + KafkaContainer.with_topics(config, "topic1") end end end - describe "with internal zookeeper" do + describe "kafka container" do container(:kafka, KafkaContainer.new()) test "provides a ready-to-use kafka container", %{kafka: kafka} do @@ -227,99 +153,45 @@ defmodule Testcontainers.Container.KafkaContainerTest do end end - describe "with external zookeeper" do - test "provides a ready-to-use kafka container" do - {:ok, zookeeper} = start_external_zookeeper() - {:ok, kafka} = start_kafka_with_external_zookeeper(zookeeper, 1, 9092) + describe "kafka container with automatic topic creation" do + container(:kafka, KafkaContainer.new() |> KafkaContainer.with_topics(["auto_topic"])) - worker_name = :worker - topic_name = "test_topic" + test "creates topics automatically", %{kafka: kafka} do + worker_name = :auto_worker + topic_name = "auto_topic" uris = [{"localhost", Container.mapped_port(kafka, 9092)}] {:ok, pid} = KafkaEx.create_worker(worker_name, uris: uris, consumer_group: "kafka_ex") on_exit(fn -> :ok = KafkaEx.stop_worker(pid) end) - :ok = create_topic(worker_name, topic_name, []) - - {:ok, _} = KafkaEx.produce(topic_name, 0, "hey", worker_name: worker_name, required_acks: 1) - stream = KafkaEx.stream(topic_name, 0, worker_name: worker_name) - [response] = Enum.take(stream, 1) - - assert response.value == "hey" - end - - @tag flaky: "this fails all the time" - test "with multiple connected nodes" do - {:ok, zookeeper} = start_external_zookeeper() - {:ok, kafka1} = start_kafka_with_external_zookeeper(zookeeper, 1, 9092) - {:ok, kafka2} = start_kafka_with_external_zookeeper(zookeeper, 2, 9093) - - writer_name = :writer - reader_name = :reader - topic_name = "test_topic" - uris_one = [{"localhost", Container.mapped_port(kafka1, 9092)}] - uris_two = [{"localhost", Container.mapped_port(kafka2, 9093)}] - - {:ok, pid} = KafkaEx.create_worker(writer_name, uris: uris_one, consumer_group: "kafka_ex") - on_exit(fn -> :ok = KafkaEx.stop_worker(pid) end) - - # Produce a message to the topic using client connected to kafka1 - :ok = create_topic(writer_name, topic_name, []) - {:ok, _} = KafkaEx.produce(topic_name, 0, "hey", worker_name: writer_name, required_acks: 1) - - # Consume the message from the topic using client connected to kafka2 - {:ok, pid} = KafkaEx.create_worker(reader_name, uris: uris_two, consumer_group: "kafka_ex") - on_exit(fn -> :ok = KafkaEx.stop_worker(pid) end) - stream = KafkaEx.stream(topic_name, 0, worker_name: reader_name) - [response] = Enum.take(stream, 1) - - assert response.value == "hey" - end - end + # Topic should already exist - refresh metadata and wait for leader + :timer.sleep(1000) + KafkaEx.metadata(worker_name: worker_name) + :timer.sleep(1000) - describe "with raft mode" do - container(:kafka, KafkaContainer.new() |> KafkaContainer.with_consensus_strategy(:kraft)) + # Try produce with retries for leader election + {:ok, _} = produce_with_retry(topic_name, "auto_message", worker_name, 5) - test "provides a ready-to-use kafka container", %{kafka: kafka} do - worker_name = :worker - topic_name = "test_topic" - uris = [{"localhost", Container.mapped_port(kafka, 9092)}] - - {:ok, pid} = KafkaEx.create_worker(worker_name, uris: uris, consumer_group: "kafka_ex") - on_exit(fn -> :ok = KafkaEx.stop_worker(pid) end) - - :ok = create_topic(:worker, topic_name, []) - - {:ok, _} = KafkaEx.produce(topic_name, 0, "hey", worker_name: worker_name, required_acks: 1) stream = KafkaEx.stream(topic_name, 0, worker_name: worker_name) [response] = Enum.take(stream, 1) - assert response.value == "hey" + assert response.value == "auto_message" end end - defp start_external_zookeeper do - {:ok, zookeeper} = Testcontainers.start_container(%ZookeeperContainer{}) - on_exit(fn -> Testcontainers.stop_container(zookeeper.container_id) end) - {:ok, zookeeper} - end - - defp start_kafka_with_external_zookeeper(zookeeper, broker_id, port) do - broker_port = String.to_integer("2#{port}") - - {:ok, kafka} = - Testcontainers.start_container( - KafkaContainer.new() - |> KafkaContainer.with_kafka_port(port) - |> KafkaContainer.with_broker_port(broker_port) - |> KafkaContainer.with_broker_id(broker_id) - |> KafkaContainer.with_consensus_strategy(:zookeeper_external) - |> KafkaContainer.with_zookeeper_host(zookeeper.ip_address) - ) + describe "helper functions" do + container(:kafka, KafkaContainer.new()) - on_exit(fn -> Testcontainers.stop_container(kafka.container_id) end) + test "bootstrap_servers returns the correct connection string", %{kafka: kafka} do + bootstrap = KafkaContainer.bootstrap_servers(kafka) + assert bootstrap =~ ~r/^localhost:\d+$/ + end - {:ok, kafka} + test "port returns the mapped port", %{kafka: kafka} do + port = KafkaContainer.port(kafka) + assert is_integer(port) + assert port > 0 + end end # After creating a topic, we need to wait for a short period of time for the topic to be created and @@ -337,4 +209,23 @@ defmodule Testcontainers.Container.KafkaContainerTest do :ok end + + # Retry producing a message if leader is not available yet + defp produce_with_retry(topic_name, message, worker_name, retries) when retries > 0 do + case KafkaEx.produce(topic_name, 0, message, worker_name: worker_name, required_acks: 1) do + {:ok, _} = result -> + result + + :leader_not_available -> + :timer.sleep(1000) + produce_with_retry(topic_name, message, worker_name, retries - 1) + + error -> + error + end + end + + defp produce_with_retry(_topic_name, _message, _worker_name, 0) do + {:error, :leader_not_available_after_retries} + end end